ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Canal增量同步Mysql数据到ES

2022-02-11 10:31:12  阅读:248  来源: 互联网

标签:Canal canal jdbc name kafka es key Mysql ES


对应版本: Mysql 5.7 Es 7.4 Canal 1.1.5

常见报错原因

https://blog.csdn.net/qq_24950043/article/details/122463372

下载如下:https://github.com/alibaba/canal/releases

在这里插入图片描述

修改deployer配置文件

上面配置过不需要配置

修改adapter配置文件

修改/conf/application.yml

server:
  port: 8081 #adapter 服务端口
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # tcp - canal server读取模式, kafka rocketMQ
  flatMessage: true
  zookeeperHosts:  # adapter集群环境配置,主备锁。
  syncBatchSize: 1 #批处理条数
  retries: 0 #重试次数
  timeout:  #同步超时
  accessKey:
  secretKey:
  consumerProperties: # 消息服务消费端地址,根据mode读取
    # canal tcp consumer
    canal.tcp.server.host: {canalServer}:11111 # cancal地址
    canal.tcp.zookeeper.hosts: # 如果配置了canalServerHost, 则以canalServerHost为准
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
 
  srcDataSources:
    defaultDS: #支持多数据源
      url: jdbc:mysql://{ip}:3306/test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # 同步适配器列表,同步到哪各数据源
  - instance: example # canal 实例名或者 MQ topic 名
    groups: # 适配器组,支持多个不同入库数据源
    - groupId: g1 # 分组id, 如果是MQ模式将用到该值
      outerAdapters: # 分组内适配器列表
      - name: logger # 打印获取到的消息数据日志
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7 #用于匹配Adapter,该名称对应数据源文件名称
        key: es-user #必须有key,否则是比es里面的配置
        hosts: {ip}:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # 9300对应transport or 9200对应rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

配置数据源

在这里插入图片描述

修改yml文件

dataSourceKey: defaultDS #源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: es-user # 对应application.yml中es配置的key
destination: example # cannal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user # es 的索引名称
  _type: _doc # es 的type名称, es7下无需配置此项
  _id: id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  upsert: true # 支持不存在新增操作
#  pk: id
  sql: "select t.id,t.username,t.phone from t_user t" #需要更新的所有字段都需写上
#  objFields:
#    _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
  etlCondition: "where a.c_time>={}" # etl 的条件参数,通过服务的etl接口传入。

修改完成启动报错

 java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

原因:common和escore的druid包冲突。

需要修改源码解决,打开刚刚下载的源码

1, 修改canal-canal-1.1.5\client-adapter\escore\pom.xml
在这里插入图片描述

2, 修改canal-canal-1.1.5\pom.xml

添加 true 主要是打包跳过测试,避免报错

		    <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven-surefire.version}</version>
                <configuration>
                    <useSystemClassLoader>true</useSystemClassLoader>
                    <forkMode>once</forkMode>
                    <argLine>${argline} ${jacocoArgLine}</argLine>
                    <systemProperties>
                        <!-- common shared -->
                    </systemProperties>
                    <skipTests>true</skipTests>    <!--默认关掉单元测试 -->
                </configuration>
            </plugin>

Maven重新打包

在这里插入图片描述

执行完后用canal-canal-1.1.5\client-adapter\es7x\target\client-adapter.es7x-1.1.5-jar-with-dependencies.jar下面包替换conf/plugins/client-adapter.es7x-1.1.5-jar-with-dependencies.jar
之后重启

重启之后ES可能会报错。

NoNodeAvailableException[None of the configured nodes are available: 

修改ES的elasticsearch.yml文件:

在这里插入图片描述

注意:cluster.name 与 adapter下 /conf/application.yml文件的cluster.name名称相同

标签:Canal,canal,jdbc,name,kafka,es,key,Mysql,ES
来源: https://blog.csdn.net/qq_36971119/article/details/122875022

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有