ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

解读Datax mysql reader配置

2021-08-01 22:34:28  阅读:320  来源: 互联网

标签:配置 splitPk reader Datax mysql table DataX channel eachTableShouldSplittedNumber


datax里所有的关系型数据库都走通用的处理Reader,com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader,当进行split的时候会进行切分获取channel的个数,

public List<Configuration> split(Configuration originalConfig,
                                         int adviceNumber) {
            return ReaderSplitUtil.doSplit(originalConfig, adviceNumber);
}

继续进入doSplit方法com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil 此处会去判断是否是table配置的模式

if (isTableMode) {
            // adviceNumber这里是channel数量大小, 即datax并发task数量
            // eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
            eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                    adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
}

如果前面计算的adviceNumber=3 配置了一个table 则每个表分到的channel是3/1=3,

 
// 说明是配置的 table 方式
if (isTableMode) {
    // 已在之前进行了扩展和`处理,可以直接使用
    List<String> tables = connConf.getList(Key.TABLE, String.class);
 
    Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");
 
    String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);
 
     /**
      * 1-判断是否配置了splitPk,如果没有配置则每个table都会当成一个任务,生成一个配置文件给任务运行使用
      * 2-如果配置了splitPk,如果只配了一个table,则重新计算eachTableShouldSplittedNumber=eachTableShouldSplittedNumber * 5;
      * 3-如果配置了多个table,eachTableShouldSplittedNumber不变,然后循环对每个表进行切分splitSingleTable
     */
    //最终切分份数不一定等于 eachTableShouldSplittedNumber
    boolean needSplitTable = eachTableShouldSplittedNumber > 1
            && StringUtils.isNotBlank(splitPk);
    if (needSplitTable) {
        if (tables.size() == 1) {
            //原来:如果是单表的,主键切分num=num*2+1
            // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
            //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾
             
            //考虑其他比率数字?(splitPk is null, 忽略此长尾)
            eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
        }
        // 尝试对每个表,切分为eachTableShouldSplittedNumber 份
        for (String table : tables) {
            tempSlice = sliceConfig.clone();
            tempSlice.set(Key.TABLE, table);
 
            List<Configuration> splittedSlices = SingleTableSplitUtil
                    .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
 
            splittedConfigs.addAll(splittedSlices);
        }
    } else {
        for (String table : tables) {
            tempSlice = sliceConfig.clone();
            tempSlice.set(Key.TABLE, table);
            String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
            tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
            splittedConfigs.add(tempSlice);
        }
    }
} else {
    // 说明是配置的 querySql 方式
    List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);
 
    // TODO 是否check 配置为多条语句??
    for (String querySql : sqls) {
        tempSlice = sliceConfig.clone();
        tempSlice.set(Key.QUERY_SQL, querySql);
        splittedConfigs.add(tempSlice);
    }
}

此处主要分为两种,一种是table模式配置,一种是querysql模式配置, querySql模式相对简单,配置了几个sql,就会生成几个任务的配置文件。

我们主要关注table模式,主要的流程:

 /**
 * 1-判断是否配置了splitPk,如果没有配置则每个table都会当成一个任务,生成一个配置文件给任务运行使用
 * 2-如果配置了splitPk,如果只配了一个table,则重新计算eachTableShouldSplittedNumber=eachTableShouldSplittedNumber * 5;
 * 3-如果配置了多个table,eachTableShouldSplittedNumber不变,然后循环对每个表进行切分splitSingleTable
*/

接下来关注splitSingleTable方法

大体流程是:
首先会根据 Configuration configuration, int adviceNum 配置文件信息和需要切分的个数进行切分会计算出splitPk的最大最小值,然后按照adviceNum进行分割,然后生成具体的sql

如果配置了15个channel,单表, 拆分,则此时通过上面的流程最后可以计算出task数目为75,在我的mysql表中有3089条数据,此时他会返回下面的配置总共75个有效配置,主要是看querySql,每个配置的条件都会不同。这只是其中一个

{
"column": "id,username,telephone",
"columnList": ["id", "username", "telephone"],
"fetchSize": -2147483648,
"isTableMode": true,
"jdbcUrl": "jdbc:mysql://localhost:3306/datax?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true",
"loadBalanceResourceMark": "localhost",
"password": "root123",
"pkType": "pkTypeLong",
"querySql": "select id,username,telephone from user  where  (547 <= id AND id < 588) ",
"splitPk": "id",
"table": "user",
"tableNumber": 1,
"username": "root"
}
第一个sql:
"querySql": "select id,username,telephone from user  where  (1 <= id AND id < 43) "
最后一个sql是:
"querySql": "select id,username,telephone from user  where  id IS NULL"
最后一个有效sql是:
"querySql": "select id,username,telephone from user  where  (3048 <= id AND id <= 3089) "
 
可以看到和表中的数据一致,这里主要是把数据量过大的表,按照配置计算出的channel进行切分,
然后生成分段的sql,一个sql对应一个任务,进行执行。

后面会根据taskGroup的个数,来对着75个任务进行分组,然后提交到线程池中并发执行任务.

如果没有SPLIT_PK(目前splitPk仅支持整形、字符串数据切分),则为每个表生成一个任务

在DataX中,mysqlreader配置有两种模式,一种是table模式,另外一种是querySql模式,两种模式使用起来略有差别。

table模式

在table模式下, channel个数决定了reader和writer的个数上限,假设为m个:如果指定了splitPk字段,DataX会将mysql表中数据按照splitPk切分成n段,n大致为5倍的channel个数。

splitPk的字段限制了必需是整型或者字符串类型。由于DataX的实现方式是按照spliPk字段分段查询数据库表,那么spliPk字段的选取应该尽可能的选择分布均匀且有索引的字段,比如主键id、唯一键等字段。DataX会启动m个reader线程,消费DataX切分好的n个查询sql语句(task), 对应的会有m个writer线程将查询出来的数据写入目标数据源中,并行度为m(也就是配置的channel个数),如果不指定splitPk字段,DataX将不会进行数据的切分,并行度直接退化成1。

需要指出的是,table模式下,如果用户指定了spliPk将数据切分成了n段,由于这些task不是在同一个事务下进行select,那么最终取出的全量数据很有可能是不一致的。为了拿到一致性数据,要么不要配置spliPk使用单线程,要么确保mysql中要导出的数据不会再发生变化。

querySql模式

querySql模式一般用于有条件的数据导出,

 "connection": [
                            {
                                "querySql": [ #指定执行的SQL语句
                                    "select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' "
                                ],
                                "jdbcUrl": ["jdbc:mysql://10.10.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc连接串
                                ]
                            }
                        ]

在此模式下,DataX不会再按照指定的column、table参数进行sql的拼接,而是会直接略过这些配置(如果有),直接执行querySql语句,task数量总是1,因此在此模式下channel的配置不再有多线程的效果。

性能调优

有人肯定会有疑问,有什么办法可以尽可能加速数据的导出呢?

一般来说,大家首先想到的是提高并发度。在DataX中channel的个数决定了并发数量,但是要使channel参数生效,并不是简单配一下channel就完事了。在MySQL导入Tablestore表的场景下,channel生效仅在能够split出多个SQL语句的场景下,也就是table模式+spliPk下有用。

DataX的数据同步涉及三部分:

1.数据读取
2.数据交换
3.数据写入

对于以上三个环节,都有不同的优化方式,分析如下。

1.数据读取
对于数据源读取,导出的两种模式:table模式和sqlQuery模式前面做了阐述,这里不再重复。

2. 数据交换
对于数据交换,前面提到,发送给MySQL数据库SQL语句后会得到查询的数据集,缓存在DataX的buffer中;除此之外,每个channel也维护了自己的record队列,如果存在并发,channel的个数越多,也会需要更多的内存。因此首先需要考虑的是jvm的内存大小参数, 这个在启动jvm进程的时候配置。

除此之外,有几个控制channel的关键参数

 
 "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": -1,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },

以上配置位于conf/core.json中:capacity限制了channel中队列的大小(也就是最多缓存record的个数)byteCapacity限制了record占用的内存大小,core.json中的默认配置是64MB,若不指定将会被配置为8MB

这两个参数决定了每个channel能buffer的记录数量和内存占用情况,如果有需要调整,用户应该按照DataX实际的运行环境予以配置。例如MySQL中每个record都比较大,那么可以考虑适当调高byteCapacity,当然调整这个参数还要考虑机器的内存情况。

bufferSize指定了BufferedRecordExchanger的缓存,reader读了多少个往channel放。

一般情况下,channel队列本身配置的调整并不会很常见,但是对于另外几个流控参数,在使用DataX的时候应该注意。有两个常用的流控参数:

a. byte 限制通道的默认传输速率, -1表示不限制
b. record 限制通道的传输记录数,-1表示不限制

这两个参数都是在flowControlInterval间隔里采样后根据采样值来决定是否流控的。

 
{
    "core": {  #定义了全局的系统参数,不指定会使用默认值
        "transport": {
            "channel": {
                "speed": {     
                    "record": 5000,
                    "byte": 102400
                }
            }
        }
    },


    "job": {
        "setting": {
            "speed": {  #定义了单个channel的控制参数
                "record": 10000,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                      .....#省略
                },

                "writer": {
                    .....#省略
                }

            }
        ]
    }
}

3.数据写入

适当的提高批量写入的批次大小(batchWriteCount),也可以有效地提高吞吐率。相关关键配置如下:

{
    "job": {
        "setting": {
           ....#省略
        },
        "content": [
            {
                "reader": {
                      .....#省略
                },

                "writer": {
                    "name": "otswriter",
                    "parameter": {
                            .......
                        "writeMode":"UpdateRow",
                        "batchWriteCount":100
                    }
                }

            }
        ]
    }
}

 

标签:配置,splitPk,reader,Datax,mysql,table,DataX,channel,eachTableShouldSplittedNumber
来源: https://www.cnblogs.com/gentlescholar/p/15087878.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有