datax 启动流程
组件
datax 采集流程
- Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
- Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
- Transformer:在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作
Job
:Job
是 DataX 用以描述从一个源头到一个目的端的同步作业,是 DataX 数据同步的最小业务单元。Task
:Task
是把Job
拆分得到的最小执行单元。JobContainer
:Job
执行器,负责Job
全局拆分、调度、前置语句和后置语句等工作的工作单元。TaskGroupContainer
:TaskGroup
执行器,负责执行一组Task
的工作单元。TaskGroup
: 描述的是一组Task
集合。在同一个TaskGroupContainer
执行下的Task
集合称之为TaskGroup
参数
CoreConstant 中会提取 datax.home 这个环境变量供全局使用,拼接成 core.json,plugin.json 的地址
- 一些环境变量
- mode:standalone, local, distribute 选择作业运行模式
- jobid:在 local 与 distribute 模式下运行的作业唯一 id
- job:作业配置文件路径
- classpath
Standalone
: 单进程运行,没有外部依赖。Local
: 单进程运行,统计信息、错误信息汇报到集中存储。Distrubuted
: 分布式多进程运行,依赖DataX Service
服务。
运行流程
引擎启动后 jobContainer 启动流程
作业配置加载
- 通过作业配置文件路径(-job 参数)来加载作业配置文件。
- CoreConstant 通过环境变量获取 core 配置文件路径(datax.home 拼接),加载 core 配置。
- 通过
job.content[0].reader/writer.name
读取该作业的插件名,通过job.preHandler.pluginName
/job.postHandler.pluginName
读取该作业的前置或后置处理插件名。通过 CoreConstant 获取以上所有读取到的插件名的绝对路径。 - 通过路径来加载插件配置文件内容。插件的配置文件按如下约束。
- 在
Configuration.from(String json)
读取任意配置文件时都会将${xxx}
或$xxx
占位符替换成 xxx 对应的环境变量。即-Dlast=123
使配置文件中${last}
替换成123
, 该逻辑存在StrUtil.replaceVariable(json)
中 - 将 core,job,plugin 配置合并,生成全局使用的配置
Configuration
。 - 最后做过滤输出和检查配置
引擎启动
- 从 common 取出需要的转换格式
yyyy-MM-dd
或编码UTF-8
,用于 String 与 Date 或 Bytes 的互相转换 - 将配置
Configuration
传入LoadUtil
Jar 加载器,后面会使用 LoadUtil 进行插件 Jar 的动态加载。包括对每个插件的加载隔离机制和加载器缓冲的实现。 - 根据
core.container.model
判断使用TaskGroupContainer
还是使用JobContainer
,默认使用JobContainer
- PerfTrace 初始化,默认不使用 PerfTrace,获取
job.JobInfo
默认无该配置项, - 容器启动
JobContainer
其中加载操作中的加载插件时:
类加载器隔离
- preHandle 前置处理器:根据
job.preHandler.pluginName
加载已存在的插件,并执行插件的preHandler
方法 - init 初始化:
- 根据
job.content[0].reader/writer.name
插件名来加载 reader 和 writer 插件,并保存reader/wirter PluginName
, - 赋值
Configuration
,赋值Job插件
本身和对端插件
的配置job.content[0].reader/writer.parameter
与对端的插件名子。并且执行他们的 init 方法。
- 根据
- prepare 准备:执行 reader/writer 的
prepare
方法 - split 切分任务:
- 根据
job.setting.speed.byte
,core.transport.channel.speed.byte
和job.setting.speed.record
,core.transport.channel.speed.record
的值计算出并发 task 数needChannelNumber
,具体算法
- 根据
- 执行 reader 和 writer 的
split
方法,获取经过split
每个 Task 的 reader 和 writer 的配置。
切分方式
- 获取作业的
transformer
配置,每个 Task 的 reader 和 writer 配置再加上该transformer
的配置合并。将原本的job.content
替换。即原本只有单个 content,经过 split 后产生多个 content,并为其设置递增的 taskId
- schedule 调度:
parseAndGetResourceMarkAndTaskIdMap
:以reader.parameter.loadBalanceResourceMark
资源名做分组。得出一个 资源名称 --> taskId(List) 的 map 映射关系。在 split 阶段,会对插件的loadBalanceResourceMark
进行设置,通常是使用 JDBC 连接的 host- doAssign:根据
parseAndGetResourceMarkAndTaskIdMap
的结果,将需要运行 Task 按一个特定的规则分配到 taskGroup 中。每个 TaskGroup 都将获得一份Configuration
克隆,设置每个 taskConfiguration 的 content 中的core.container.taskGroup.id
。并且修正job.content
,使他的配置文件回到单 content 状态
- adjustChannelNumPerTaskGroup:修正因为无法平均分配的少一个 task 的 taskGroup 的
core.container.taskGroup.channel
的更改 - 为每个 taskGroup 修正
core.container.job.mode
为 standalone - StandAloneScheduler#registerCommunication:为每个 taskGroup 注册 Communication(状态及统计信息交互)
- StandAloneScheduler#startAllTaskGroup:为每个 taskGroup 创建
TaskGroupContainer
并代理到TaskGroupContainerRunner
启动TaskGroupContainer
。其中动态加载 transfomer,数据采集就在这个步骤之内。
- post:执行 reader 和 writer 的
post
方法 - postHandle:根据
job.postHandler.pluginName
加载已存在的插件,并执行插件的postHandler
方法 - invokeHooks:根据
/hook
目录调用外部 hook
TaskGroupContainer
类图
reader 与 writer 的数据传输