文章 62
浏览 15135
datax启动流程

datax启动流程

datax 启动流程

组件

datax 采集流程

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
  • Transformer:在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作
  • Job: Job 是 DataX 用以描述从一个源头到一个目的端的同步作业,是 DataX 数据同步的最小业务单元。
  • Task: Task 是把 Job 拆分得到的最小执行单元。
  • JobContainer: Job 执行器,负责 Job 全局拆分、调度、前置语句和后置语句等工作的工作单元。
  • TaskGroupContainer: TaskGroup 执行器,负责执行一组 Task 的工作单元。
  • TaskGroup: 描述的是一组 Task 集合。在同一个 TaskGroupContainer 执行下的 Task 集合称之为 TaskGroup

参数

CoreConstant 中会提取 datax.home 这个环境变量供全局使用,拼接成 core.json,plugin.json 的地址

  • 一些环境变量
  • mode:standalone, local, distribute 选择作业运行模式
  • jobid:在 local 与 distribute 模式下运行的作业唯一 id
  • job:作业配置文件路径
  • classpath
  • Standalone: 单进程运行,没有外部依赖。
  • Local: 单进程运行,统计信息、错误信息汇报到集中存储。
  • Distrubuted: 分布式多进程运行,依赖 DataX Service 服务。

运行流程

引擎启动后 jobContainer 启动流程

作业配置加载

  • 通过作业配置文件路径(-job 参数)来加载作业配置文件。
  • CoreConstant 通过环境变量获取 core 配置文件路径(datax.home 拼接),加载 core 配置。
  • 通过 job.content[0].reader/writer.name 读取该作业的插件名,通过 job.preHandler.pluginName/job.postHandler.pluginName 读取该作业的前置或后置处理插件名。通过 CoreConstant 获取以上所有读取到的插件名的绝对路径。
  • 通过路径来加载插件配置文件内容。插件的配置文件按如下约束。
  • Configuration.from(String json) 读取任意配置文件时都会将 ${xxx}$xxx 占位符替换成 xxx 对应的环境变量。即 -Dlast=123 使配置文件中 ${last} 替换成 123, 该逻辑存在 StrUtil.replaceVariable(json)
  • 将 core,job,plugin 配置合并,生成全局使用的配置 Configuration
  • 最后做过滤输出和检查配置

引擎启动

  • 从 common 取出需要的转换格式 yyyy-MM-dd 或编码 UTF-8,用于 String 与 Date 或 Bytes 的互相转换
  • 将配置 Configuration 传入 LoadUtil Jar 加载器,后面会使用 LoadUtil 进行插件 Jar 的动态加载。包括对每个插件的加载隔离机制和加载器缓冲的实现。
  • 根据 core.container.model 判断使用 TaskGroupContainer 还是使用 JobContainer,默认使用 JobContainer
  • PerfTrace 初始化,默认不使用 PerfTrace,获取 job.JobInfo 默认无该配置项,
  • 容器启动

JobContainer

其中加载操作中的加载插件时:

类加载器隔离

  • preHandle 前置处理器:根据 job.preHandler.pluginName 加载已存在的插件,并执行插件的 preHandler 方法
  • init 初始化:
    1. 根据 job.content[0].reader/writer.name 插件名来加载 reader 和 writer 插件,并保存 reader/wirter PluginName
    2. 赋值 Configuration,赋值 Job插件 本身和 对端插件 的配置 job.content[0].reader/writer.parameter 与对端的插件名子。并且执行他们的 init 方法。

  • prepare 准备:执行 reader/writer 的 prepare 方法
  • split 切分任务:
    1. 根据 job.setting.speed.bytecore.transport.channel.speed.bytejob.setting.speed.recordcore.transport.channel.speed.record 的值计算出并发 task 数 needChannelNumber,具体算法
  1. 执行 reader 和 writer 的 split 方法,获取经过 split 每个 Task 的 reader 和 writer 的配置。

切分方式

  1. 获取作业的 transformer 配置,每个 Task 的 reader 和 writer 配置再加上该 transformer 的配置合并。将原本的 job.content 替换。即原本只有单个 content,经过 split 后产生多个 content,并为其设置递增的 taskId
  • schedule 调度:
    1. parseAndGetResourceMarkAndTaskIdMap:以 reader.parameter.loadBalanceResourceMark 资源名做分组。得出一个 资源名称 --> taskId(List) 的 map 映射关系。在 split 阶段,会对插件的 loadBalanceResourceMark 进行设置,通常是使用 JDBC 连接的 host
    2. doAssign:根据 parseAndGetResourceMarkAndTaskIdMap 的结果,将需要运行 Task 按一个特定的规则分配到 taskGroup 中。每个 TaskGroup 都将获得一份 Configuration 克隆,设置每个 taskConfiguration 的 content 中的 core.container.taskGroup.id。并且修正 job.content,使他的配置文件回到单 content 状态
  1. adjustChannelNumPerTaskGroup:修正因为无法平均分配的少一个 task 的 taskGroup 的 core.container.taskGroup.channel 的更改
  2. 为每个 taskGroup 修正 core.container.job.mode 为 standalone
  3. StandAloneScheduler#registerCommunication:为每个 taskGroup 注册 Communication(状态及统计信息交互)
  4. StandAloneScheduler#startAllTaskGroup:为每个 taskGroup 创建 TaskGroupContainer 并代理到 TaskGroupContainerRunner 启动 TaskGroupContainer。其中动态加载 transfomer,数据采集就在这个步骤之内。
  • post:执行 reader 和 writer 的 post 方法
  • postHandle:根据 job.postHandler.pluginName 加载已存在的插件,并执行插件的 postHandler 方法
  • invokeHooks:根据 /hook 目录调用外部 hook

TaskGroupContainer

类图

reader 与 writer 的数据传输


标题:datax启动流程
作者:xiaohugg
地址:https://xiaohugg.top/articles/2023/10/13/1697179008145.html

人民有信仰 民族有希望 国家有力量