ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

canal同步mysql实战

2022-08-18 15:32:07  阅读:154  来源: 互联网

标签:canal 实战 同步 kafka XX master mysql root


环境

mysql 5.6.41

canal 1.15

1.16测试过后,一直报错canal_config表不存在,更换版本后正常

目的 : 同步一个数据库中的二个表

1、创建表

CREATE TABLE `user01` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

 

CREATE TABLE `user02` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

 

 

 2下载安装包

服务端

https://github.com/alibaba/canal/releases

下载deploy的server包,还有adapter的client包

 

 直接解压,我的配置文件如下

 

最重要的是

canal.instance.master.address=172.20.70.34:3308

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

canal.instance.filter.regex=hx_erp\\.user01

sh bin/startup.sh启动

其中看服务器内存资源,我这里是修改了XMS和xmx,xmn为256,减少内存消耗

JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

 

3客户端

 

[root@master canaladapter]# cat conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null

canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

srcDataSources:
defaultDS:
url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
username: xxx
password: xxxx
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
jdbc.username: xxx
jdbc.password: xxx

 

其中最重要的是src和outerAdapters源和目标数据库的url账号密码

注意点:

  1. 其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
  2. adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件

RDB文件

[root@master canaladapter]# cat conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: hx_erp
table: user01
targetTable: user02
targetPk:
id: id
mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: "where c_time>={}"
commitBatch: 3000 # 批量提交的大小

 

从user01表同步到user02表

 

JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

 

同样修改启动脚本

echo CLASSPATH :$CLASSPATH
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>/dev/null 2>&1 &
echo $! > $base/bin/adapter.pid

echo "cd to $current_path for continue"
cd $current_path
[root@master canaladapter]# ./bin/startup.sh

 

 同事存在客户端和客户端

查看服务端日志

 

 这样就是正常的

客户端日志

 

 

现在来验证下,想user01表插入数据

 

 

查看客户端日志

 

 这样就是成功的。

 

现在我们测试下,如果服务端挂了,期间user01有DDL,服务端启动以后,user02是否会同步

停止服务

插入数据

 

 

 

 

启动服务

[root@master canalserver]# ./bin/startup.sh

结果是没有自动同步过去

客户端还在报错

重启客户端

[root@master canaladapter]# ./bin/restart.sh

 

 

 

 

现在可以了,自动同步OK了

这就表示,即使服务端因为异常挂掉了,重启服务端和客户端也可以自动同步

canal也可以同步restfulapi的方式进行管理

查询所有同步实例状态

[root@master canalserver]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}][root@master canalserver]#

 

curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

查询单个实例同步状态

curl http://127.0.0.1:8081/syncSwitch/example

查询key为mysql1的rdb实例的数据量

[root@master canalserver]# curl http://127.0.0.1:8081/count/rdb/mysql1/mytest_user.yml
{"count":3,"targetTable":"`user02`"}[root@master canalserver]

手动全量同步

[root@master canalserver]# curl http://127.0.0.1:8081/etl/rdb/mysql1/mytest_user.yml -X POST
{"succeeded":true,"resultMessage":"导入RDB 数据:4 条"}[root@master canalserver]#

如果出现上述canalserver服务端挂掉了,也可以手动全量同步来处理一遍

 

标签:canal,实战,同步,kafka,XX,master,mysql,root
来源: https://www.cnblogs.com/whitelittle/p/16598857.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有