ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

DataX全量和增量mysqltomysql

2022-02-20 19:00:26  阅读:193  来源: 互联网

标签:mysqltomysql mysql job datax time table DataX where 全量


全量mysqltomysql

进入目录编写json

cd /usr/local/datax/job
vi zabbixmysql2mysql.json

写入的表结构要和reader的表结构一样,先建立好
编写json文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "test",
                        "password": "123",
                        "column": [
                            "itemid",
                            "clock",
                            "timestamp",
                            "source",
                            "severity",
                            "value",
                            "logeventid",
                            "ns" 
                        ],
                        "splitPk": "itemid",
                        "connection": [
                            {
                                "table": [
                                    "history_log"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.3.89:3306/zabbix"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "test",
                        "password": "123",
                        "column": [
                            "itemid",
                            "clock",
                            "timestamp",
                            "source",
                            "severity",
                            "value",
                            "logeventid",
                            "ns"    
                        ],
                        "preSql": [
                            "truncate history_log_copy1"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.3.89:3306/chenzhenhua2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "history_log_copy1"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

注意:“writeMode”: “insert”,也可以为update,update更加稳妥一点
“preSql”: [ “truncate history_log_copy1” ], 在写入前提前清空表清空表

如果写入的数据库为mysql8以上版本,必须修改mysql-connector-java的插件

cd /usr/local/datax/plugin/writer/mysqlwriter/libs
mv mysql-connector-java-5.1.34.jar mysql-connector-java-5.1.34.jar-bak

我这边上传的为mysql-connector-java-8.0.16.jar,下载地址https://static.runoob.com/download/mysql-connector-java-8.0.16.jar

增量同步

Datax需要解决的另一个难题在于增量更新。

首先需要说明, Datax本身在大部分reader插件中提供了where配置项,用于做增量更新。例如mysqlerader md文件说明如下:

* **where**

	* 描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。<br />

          where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

	* 必选:否 <br />

	* 默认值:无 <br />

* **querySql**

	* 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id <br />

	 `当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置`,querySql优先级大于table、column、where选项。

	* 必选:否 <br />

	* 默认值:无 <br />

示例:
新建json

vi  new.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123",
                        "where": "created_at > FROM_UNIXTIME(${create_time}) and created_at  < FROM_UNIXTIME(${end_time})",
                        "column": [
                            "id",
                            "rpt_date",
                            "rpt_hour",
                            "unit_id",
                            "build_id",
                            "num",
                            "run_state",
                            "created_at"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "rpt_warning_hour"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.5.11:3306/smart_fire"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "test",
                        "password": "123",
                        "column": [
                            "id",
                            "rpt_date",
                            "rpt_hour",
                            "unit_id",
                            "build_id",
                            "num",
                            "run_state",
                            "created_at"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.3.89:3306/chenzhenhua2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "rpt_warning_hour"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

上面需要注意的事情为FROM_UNIXTIME将表里面的时间格式转换为时间戳格式,如果表里默认为时间戳不需要转换。
${…}就是将变量传入,上次更新{create_time}上次更新时间,{end_time}为现在本地时间。

然后再编写一个python脚本可以将参数传入json即可,vi dataxScheduler.py

import time,os,sys

print "going to execute"

configFilePath = sys.argv[1]
logFilePath = sys.argv[2]
lastTimeExecuteRecord = sys.argv[3]
lastExecuteTime=""

try:
    fo = open(lastTimeExecuteRecord, "r")
    lastExecuteTime = fo.read()
    print lastExecuteTime
except IOError:
    lastExecuteTime = int(1)
lastExecuteTime = int(lastExecuteTime)

print("last time execute time:  " + str(lastExecuteTime))

currentTime = int(time.time())
print("currentTime is        :"+ str(currentTime))


#os.system("python /usr/local/datax/bin/datax.py " + configFilePath + " --lastTime" +  lastExecuteTime + " --currentTime" + currentTime + " >> " + logFilePath)

script2execute  = "python /usr/local/datax/bin/datax.py %s -p \"-Dcreate_time=%s -Dend_time=%s\" >> %s"%(configFilePath,lastExecuteTime,currentTime,logFilePath)
print("to be excute script:"+script2execute)
os.system(script2execute)

print("script execute ending")

# update timestamp to file
fo = open(lastTimeExecuteRecord, "w+")
fo.write(str(currentTime))
fo.close()

print("ending---",lastTimeExecuteRecord)

运行

python /usr/local/datax/job/dataxScheduler.py  '/usr/local/datax/job/new.json'  '/usr/local/datax/job/test_job.log'   '/usr/local/datax/job/test_job.record'

测试,增加数据后再次运行,数据对应增加了,加入到定时任务执行即可完成增量同步。
但这个写脚本的方式还是非常笨拙的,下一篇介绍的datax-web会更好的去解决增量同步的问题。

标签:mysqltomysql,mysql,job,datax,time,table,DataX,where,全量
来源: https://blog.csdn.net/jy8655790/article/details/123025019

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有