开发之前
当你打开这篇文档,想必已经不用在此解释什么是 DataX
了,如果你还不清楚 Datax 是什么,请移步我之前的博客: https://www.xiaohugg.top/articles/2023/05/05/1683269906580.html 或者官网 https://github.com/alibaba/DataX
这里主要介绍如何二次开发新插件接入 Datax
我为什么需要二次开发 Datax
前一段时间,因为接到公司的业务需求,需要将 Hive 数据同步到另外一个 Hive 数据,并且接入公司自研产品,通过公司自研产品,可视化创建或者直接通过导入 JSON 文件创建任务,经过调研最终选择了开源的 Datax,以下是需求背景:
需求背景
1、核心集群为 Hadoop2,未开启 kerberos,前置集群为 Hadoop3,开启 Kerberos
2、前置与核心存在 100+ 张表的数据拉取与推送,均存储在 HDFS
3、前置集群、核心集群的表存储格式、压缩格式:
目前前置核心同步表表结构是一致,有存储格式三种, 1、textfile 2、textfile+gz 3、parquet+snappy
4、核心集群日均接收来自 31 省的 343.7T 数据,每个前置集群日均共享数据约 11T
5、现场已存在的 DataX 集群部署在核心,共 59 台服务器。DataX 服务器进行了分组,每个 DataX 任务可以指派到不同的组,在组内进行轮询
6、通过 DataX 完成数据交换后需刷新核心集群的 hive 表分区、调度核心集群 ADMA 的数据牌翻牌接口以便触发后续任务提交运行
7、数据处理:无
8、脏数据处理:无数据处理场景所以无脏数据处理场景
9、增量同步场景:无
经过需求分析,我们很容易得出,大部分是需要从 HDFS 到 HDFS,虽然官网的 Datax 是支持 Hdfs to hdfs 但是官网的 Datax 的 hdfs 不支持 parquet+snappy 格式,并且无法满足第 6 点功能,也就是无法同步数据后无法自动刷新 hive 表分区,hive 和 hdfs 密不可分,这里就不过多介绍 hive,有时间我会写篇什么是 hive
熟悉源代码
通过熟悉源代码,大概了解到 datax 自定义扩展插件,首先,插件的入口类必须扩展 Reader
或 Writer
抽象类,并且实现分别实现 Job
和 Task
两个内部抽象类,Job
和 Task
的实现必须是 内部类 的形式,需要一个 reader 和一个 writer,并且实现相应的扩展接口
为了不影响官网自带的 hdfsreader 和 hdfswriter,选择重新创建单独的模块
新版 hdfsReader
public class SomeReader extends Reader {
public static class Job extends Reader.Job {
@Override
public void init() {
}
@Override
public void prepare() {
}
@Override
public List<Configuration> split(int adviceNumber) {
return null;
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
@Override
public void init() {
}
@Override
public void prepare() {
}
@Override
public void startRead(RecordSender recordSender) {
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
}
hdfsreader 支持 parquet 代码
@Override
public void startRead(RecordSender recordSender) {
LOG.info("read start");
for (String sourceFile : this.sourceFiles) {
LOG.info(String.format("reading file : [%s]", sourceFile));
if(specifiedFileType.equalsIgnoreCase(Constant.TEXT)
|| specifiedFileType.equalsIgnoreCase(Constant.CSV)) {
InputStream inputStream = dfsUtil.getInputStream(sourceFile);
UnstructuredStorageReaderUtil.readFromStream(inputStream, sourceFile, this.taskConfig,
recordSender, this.getTaskPluginCollector());
}else if(specifiedFileType.equalsIgnoreCase(Constant.ORC)){
dfsUtil.orcFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
}else if(specifiedFileType.equalsIgnoreCase(Constant.SEQ)){
dfsUtil.sequenceFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
}else if(specifiedFileType.equalsIgnoreCase(Constant.RC)) {
dfsUtil.rcFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
}else if (specifiedFileType.equalsIgnoreCase(Constant.PARQUET)) {
dfsUtil.parquetFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
}else {
String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC,PARQUET 六种格式的文件," +
"请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE 或者 RC";
throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
}
if(recordSender != null){
recordSender.flush();
}
}
LOG.info("end read source files...");
}
public void parquetFileStartRead(String sourceParquetFilepath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read parquetfile [%s].", sourceParquetFilepath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
boolean isReadAllColumns = false;
int columnIndexMax = -1;
// 判断是否读取所有列
if (null == column || column.size() == 0) {
int allColumnsCount = getAllColumnsCountByparquet(sourceParquetFilepath);
columnIndexMax = allColumnsCount - 1;
isReadAllColumns = true;
} else {
columnIndexMax = getMaxIndex(column);
}
for (int i = 0; i <= columnIndexMax; i++) {
allColumns.append("col");
allColumnTypes.append("string");
if (i != columnIndexMax) {
allColumns.append(",");
allColumnTypes.append(":");
}
}
if (columnIndexMax >= 0) {
JobConf conf = new JobConf(hadoopConf);
Path parquetFilePath = new Path(sourceParquetFilepath);
Properties p = new Properties();
p.setProperty("columns", allColumns.toString());
p.setProperty("columns.types", allColumnTypes.toString());
try {
ParquetHiveSerDe serde = new ParquetHiveSerDe();
serde.initialize(conf, p);
StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?, ?> in = new MapredParquetInputFormat();
FileInputFormat.setInputPaths(conf, parquetFilePath.toString());
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multy threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
}
transportOneRecord(column, recordFields, recordSender, taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
} catch (Exception e) {
String message = String.format("从parquetfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceParquetFilepath);
LOG.error("解析parquet异常,错误原因 ", e);
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
} else {
String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
}
}
新版 hdfswriter
同理,自定义一个 writer 实现 writer 抽象类以及 writer.job
public void startWrite(RecordReceiver lineReceiver) {
LOG.info("begin do write...");
LOG.info(String.format("write to file : [%s]", this.fileName));
if (fileType.equalsIgnoreCase("TEXT")) {
//写TEXT FILE
hdfsHelper.textFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
this.getTaskPluginCollector());
} else if (fileType.equalsIgnoreCase("ORC")) {
//写ORC FILE
hdfsHelper.orcFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
this.getTaskPluginCollector());
} else if (fileType.equalsIgnoreCase("PARQUET")) {
hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
}
LOG.info("end do write");
}
hive 自动刷新分区
支持单分区和多分区自动刷新,用户填写的 path 路径在 hdfs 没有相应的文件夹将会自动创建
JSON 启动入口
代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?在每个插件的项目中,都有一个 plugin.json
文件,这个文件定义了插件的相关信息,包括入口类。例如:
{
"name": "mysqlwriter",
"class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
"description": "Use Jdbc connect to database, execute insert sql.",
"developer": "alibaba"
}
name
: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。class
: 入口类的全限定名称,框架通过反射插件入口类的实例。十分重要 。description
: 描述信息。developer
: 开发人员。
需要修改这几个文件
接入公司自研系统
当 datax 二次开发完后,需要接入公司自研系统,需要通过 spi 机制,加载实现任务插件(Datax)
分为两种方式:可视化、JSON
- 可视化
-
JSON
如果想通过 json,开启自定义模板即可
创建好任务,底层代码将会自动创建 datax 所需要的 JSON 文件,将会运行 datax
运行日志
{
"content":[
{
"reader":{
"name":"dtswHdfsreader",
"parameter":{
"haveKerberos":false,
"hadoopConfigPath":"/data/hadoop_conf/node-2",
"column":[
{
"index":0,
"type":"int"
},
{
"index":1,
"type":"string"
},
{
"index":2,
"type":"int"
},
{
"index":3,
"type":"int"
},
{
"index":4,
"type":"string"
}
],
"path":"/tsp/dm/tmp_table1/p_provincecode=440000/p_date=2023-10-12/p_hour=6/",
"fieldDelimiter":",",
"fileType":"parenet",
"encoding":"UTF-8"
}
},
"writer":{
"name":"dtswHdfswriter",
"parameter":{
"haveKerberos":false,
"path":"/tsp/dm/tmp_table2/p_provincecode=440000/p_date=2023-10-12/p_hour=6/",
"fieldDelimiter":",",
"fileType":"text",
"encoding":"UTF-8",
"writeMode":"append",
"compress":"parquet",
"fileName":"tmp_test_datax",
"haveHive":true,
"hadoopConfigPath":"/data/hadoop_conf/node-2",
"userName":"hdfs",
"tableName":"tmp_table2",
"partitionField":"[\"p_provincecode\",\"p_date\",\"p_hour\"]",
"jdbcUrl":"jdbc:hive2://xxxxx:10000/xxxxx",
"column":[
{
"name":"province",
"type":"int"
},
{
"name":"day",
"type":"string"
},
{
"name":"hour",
"type":"int"
},
{
"name":"userid",
"type":"int"
},
{
"name":"imsi",
"type":"string"
}
]
}
}
}
],
"setting":{
"speed":{
"channel":1,
"record":1000
},
"errorLimit":{
"record":0,
"percentage":0
}
}
}
至此自研系统接入 datax 二次开发基本完成