文章 62
浏览 15135
二次开发Datax

二次开发Datax

image.png

开发之前

当你打开这篇文档,想必已经不用在此解释什么是 DataX 了,如果你还不清楚 Datax 是什么,请移步我之前的博客: https://www.xiaohugg.top/articles/2023/05/05/1683269906580.html 或者官网 https://github.com/alibaba/DataX

这里主要介绍如何二次开发新插件接入 Datax

我为什么需要二次开发 Datax

前一段时间,因为接到公司的业务需求,需要将 Hive 数据同步到另外一个 Hive 数据,并且接入公司自研产品,通过公司自研产品,可视化创建或者直接通过导入 JSON 文件创建任务,经过调研最终选择了开源的 Datax,以下是需求背景:

需求背景

1、核心集群为 Hadoop2,未开启 kerberos,前置集群为 Hadoop3,开启 Kerberos
2、前置与核心存在 100+ 张表的数据拉取与推送,均存储在 HDFS
3、前置集群、核心集群的表存储格式、压缩格式:
目前前置核心同步表表结构是一致,有存储格式三种, 1、textfile 2、textfile+gz 3、parquet+snappy
4、核心集群日均接收来自 31 省的 343.7T 数据,每个前置集群日均共享数据约 11T
5、现场已存在的 DataX 集群部署在核心,共 59 台服务器。DataX 服务器进行了分组,每个 DataX 任务可以指派到不同的组,在组内进行轮询
6、通过 DataX 完成数据交换后需刷新核心集群的 hive 表分区、调度核心集群 ADMA 的数据牌翻牌接口以便触发后续任务提交运行
7、数据处理:无
8、脏数据处理:无数据处理场景所以无脏数据处理场景
9、增量同步场景:无

经过需求分析,我们很容易得出,大部分是需要从 HDFS 到 HDFS,虽然官网的 Datax 是支持 Hdfs to hdfs 但是官网的 Datax 的 hdfs 不支持 parquet+snappy 格式,并且无法满足第 6 点功能,也就是无法同步数据后无法自动刷新 hive 表分区,hive 和 hdfs 密不可分,这里就不过多介绍 hive,有时间我会写篇什么是 hive

熟悉源代码

通过熟悉源代码,大概了解到 datax 自定义扩展插件,首先,插件的入口类必须扩展 ReaderWriter 抽象类,并且实现分别实现 JobTask 两个内部抽象类,JobTask 的实现必须是 内部类 的形式,需要一个 reader 和一个 writer,并且实现相应的扩展接口

为了不影响官网自带的 hdfsreader 和 hdfswriter,选择重新创建单独的模块

新版 hdfsReader

public class SomeReader extends Reader {
    public static class Job extends Reader.Job {

        @Override
        public void init() {
        }

		@Override
		public void prepare() {
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return null;
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }

    }

    public static class Task extends Reader.Task {

        @Override
        public void init() {
        }

		@Override
		public void prepare() {
        }

        @Override
        public void startRead(RecordSender recordSender) {
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }
    }
}

hdfsreader 支持 parquet 代码

   @Override
        public void startRead(RecordSender recordSender) {

            LOG.info("read start");
            for (String sourceFile : this.sourceFiles) {
                LOG.info(String.format("reading file : [%s]", sourceFile));

                if(specifiedFileType.equalsIgnoreCase(Constant.TEXT)
                        || specifiedFileType.equalsIgnoreCase(Constant.CSV)) {

                    InputStream inputStream = dfsUtil.getInputStream(sourceFile);
                    UnstructuredStorageReaderUtil.readFromStream(inputStream, sourceFile, this.taskConfig,
                            recordSender, this.getTaskPluginCollector());
                }else if(specifiedFileType.equalsIgnoreCase(Constant.ORC)){

                    dfsUtil.orcFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
                }else if(specifiedFileType.equalsIgnoreCase(Constant.SEQ)){

                    dfsUtil.sequenceFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
                }else if(specifiedFileType.equalsIgnoreCase(Constant.RC)) {

                    dfsUtil.rcFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
                }else if (specifiedFileType.equalsIgnoreCase(Constant.PARQUET)) {
                    dfsUtil.parquetFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
                }else {

                    String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC,PARQUET 六种格式的文件," +
                            "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE 或者 RC";
                    throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
                }

                if(recordSender != null){
                    recordSender.flush();
                }
            }

            LOG.info("end read source files...");
        }

 public void parquetFileStartRead(String sourceParquetFilepath, Configuration readerSliceConfig,
                                     RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read parquetfile [%s].", sourceParquetFilepath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCountByparquet(sourceParquetFilepath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path parquetFilePath = new Path(sourceParquetFilepath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                ParquetHiveSerDe serde = new ParquetHiveSerDe();
                serde.initialize(conf, p);
                StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new MapredParquetInputFormat();
                FileInputFormat.setInputPaths(conf, parquetFilePath.toString());

                //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // 获取列信息
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();

                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender, taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
            } catch (Exception e) {
                String message = String.format("从parquetfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                        , sourceParquetFilepath);
                LOG.error("解析parquet异常,错误原因 ", e);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

新版 hdfswriter

同理,自定义一个 writer 实现 writer 抽象类以及 writer.job

image.png

   public void startWrite(RecordReceiver lineReceiver) {
            LOG.info("begin do write...");
            LOG.info(String.format("write to file : [%s]", this.fileName));
            if (fileType.equalsIgnoreCase("TEXT")) {
                //写TEXT FILE
                hdfsHelper.textFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            } else if (fileType.equalsIgnoreCase("ORC")) {
                //写ORC FILE
                hdfsHelper.orcFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            } else if (fileType.equalsIgnoreCase("PARQUET")) {
                hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
            }

            LOG.info("end do write");
        }

hive 自动刷新分区

支持单分区和多分区自动刷新,用户填写的 path 路径在 hdfs 没有相应的文件夹将会自动创建

image.png

JSON 启动入口

代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?在每个插件的项目中,都有一个 plugin.json 文件,这个文件定义了插件的相关信息,包括入口类。例如:

{
    "name": "mysqlwriter",
    "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
    "description": "Use Jdbc connect to database, execute insert sql.",
    "developer": "alibaba"
}
  • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要
  • class: 入口类的全限定名称,框架通过反射插件入口类的实例。十分重要
  • description: 描述信息。
  • developer: 开发人员。

需要修改这几个文件

image.png

接入公司自研系统

当 datax 二次开发完后,需要接入公司自研系统,需要通过 spi 机制,加载实现任务插件(Datax)

分为两种方式:可视化、JSON

image.png

  • 可视化

image.png

image.png

  • JSON
    如果想通过 json,开启自定义模板即可

    image.png

创建好任务,底层代码将会自动创建 datax 所需要的 JSON 文件,将会运行 datax

运行日志

{
	"content":[
		{
			"reader":{
				"name":"dtswHdfsreader",
				"parameter":{
					"haveKerberos":false,
					"hadoopConfigPath":"/data/hadoop_conf/node-2",
					"column":[
						{
							"index":0,
							"type":"int"
						},
						{
							"index":1,
							"type":"string"
						},
						{
							"index":2,
							"type":"int"
						},
						{
							"index":3,
							"type":"int"
						},
						{
							"index":4,
							"type":"string"
						}
					],
					"path":"/tsp/dm/tmp_table1/p_provincecode=440000/p_date=2023-10-12/p_hour=6/",
					"fieldDelimiter":",",
					"fileType":"parenet",
					"encoding":"UTF-8"
				}
			},
			"writer":{
				"name":"dtswHdfswriter",
				"parameter":{
					"haveKerberos":false,
					"path":"/tsp/dm/tmp_table2/p_provincecode=440000/p_date=2023-10-12/p_hour=6/",
					"fieldDelimiter":",",
					"fileType":"text",
					"encoding":"UTF-8",
					"writeMode":"append",
					"compress":"parquet",
					"fileName":"tmp_test_datax",
					"haveHive":true,
					"hadoopConfigPath":"/data/hadoop_conf/node-2",
					"userName":"hdfs",
					"tableName":"tmp_table2",
					"partitionField":"[\"p_provincecode\",\"p_date\",\"p_hour\"]",
					"jdbcUrl":"jdbc:hive2://xxxxx:10000/xxxxx",
					"column":[
						{
							"name":"province",
							"type":"int"
						},
						{
							"name":"day",
							"type":"string"
						},
						{
							"name":"hour",
							"type":"int"
						},
						{
							"name":"userid",
							"type":"int"
						},
						{
							"name":"imsi",
							"type":"string"
						}
					]
				}
			}
		}
	],
	"setting":{
		"speed":{
			"channel":1,
			"record":1000
		},
		"errorLimit":{
			"record":0,
			"percentage":0
		}
	}
}

image.png

至此自研系统接入 datax 二次开发基本完成

具体想了解看代码:https://github.com/2447850100/datax_parquet


标题:二次开发Datax
作者:xiaohugg
地址:https://xiaohugg.top/articles/2023/10/12/1697100998680.html

人民有信仰 民族有希望 国家有力量