
Using Flink CDC




There are many CDC products these days. I have used canal and Flink CDC (which embeds Debezium). Comparing the two, Flink CDC is the more powerful option with a lot of features, though it also has plenty of pitfalls and iterates quickly; it rides on Flink as a distributed compute framework, so the data is processed in a distributed way.

1. canal

You install the server side and write the client yourself (some adapters are provided as well); I previously built a custom client component for it.

https://github.com/alibaba/canal
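
For reference, a minimal sketch of such a hand-written canal client (the host, port 11111 and destination "example" are the stock canal defaults; the entry parsing is left as a placeholder to adapt to your own sink):

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClientDemo {

    public static void main(String[] args) {
        // Connect to a single canal server instance; "example" is the default destination
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");   // subscribe to every database and table
            connector.rollback();             // re-deliver anything fetched but never acked
            while (true) {
                Message message = connector.getWithoutAck(100);   // fetch up to 100 entries
                long batchId = message.getId();
                if (batchId != -1 && !message.getEntries().isEmpty()) {
                    // parse message.getEntries() into row changes and hand them to your own sink here
                    System.out.println("got " + message.getEntries().size() + " entries");
                }
                connector.ack(batchId);       // confirm the batch so the server can move forward
            }
        } finally {
            connector.disconnect();
        }
    }
}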

2. flink cdc

Official documentation

https://ververica.github.io/flink-cdc-connectors/master/

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tableapi/

Flink CDC development is mostly about writing SQL (Flink SQL). With Flink's wide range of connectors you can quickly sync data and sink it just about anywhere, which is genuinely convenient, and with checkpointing you get exactly-once semantics for the operations (which is great).
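
As a minimal sketch of what such a job looks like (the connection values are placeholders and the print sink just stands in for a real one; mysql-cdc is the Flink CDC SQL connector):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MySqlCdcSqlDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is what backs the exactly-once behaviour of the pipeline
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // CDC source table over one MySQL table (placeholder connection values)
        tEnv.executeSql(
                "CREATE TABLE orders_src (" +
                "  id BIGINT," +
                "  amount DECIMAL(20, 4)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '******'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'orders'" +
                ")");

        // Print sink just to show the shape of a sync job; swap in kafka/jdbc/elasticsearch connectors as needed
        tEnv.executeSql(
                "CREATE TABLE orders_sink (" +
                "  id BIGINT," +
                "  amount DECIMAL(20, 4)" +
                ") WITH ('connector' = 'print')");

        tEnv.executeSql("INSERT INTO orders_sink SELECT id, amount FROM orders_src").await();
    }
}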

Flink SQL is easy to pick up, but there is one big pitfall: once you run many jobs, they eat a lot of database connections and read the same binlog over and over.

Community article: Alibaba's internal answer is likewise source merging plus whole-database synchronization. (The source-merging / whole-database sync in Alibaba Cloud's Flink CDC is not open source, since they sell a dedicated data synchronization service.)

https://flink-learning.org.cn/article/detail/da710dd3cdfb9b430af405725ad27784

Tencent Cloud solves it by merging and reusing sources:

When the Flink CDC connector is used for data synchronization, every table needs its own database connection; with many tables or whole-database sync this puts a lot of pressure on the database instance, so Oceanus introduced a multi-source reuse optimization to address this.

https://cloud.tencent.com/document/product/849/76650

3. Source merging without buying a cloud service

Versions: Flink 1.14.5, Flink CDC 2.1

The approach follows:

http://www.dlink.top/docs/extend/practice_guide/cdc_kafka_multi_source_merge

Flink Table API Kafka connector, Debezium format:

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/debezium/

For the JSON event structure, see the Debezium documentation:

https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium
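
What lands in Kafka (and what the debezium-json format below expects) is a Debezium-style change envelope. A simplified example of an UPDATE event in the shape produced by the custom deserializer further down (the field values are made up):

{
  "before": { "id": 1, "agent_id": 10, "usable_amount": 100.0 },
  "after":  { "id": 1, "agent_id": 10, "usable_amount": 88.0 },
  "source": { "db": "test", "table": "t_user" },
  "op": "u"
}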

4. Code

  • datastream
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

/**
 * Reads MySQL binlog changes with Flink CDC and writes them to Kafka as Debezium-style JSON.
 *
 * @Date 2022/8/26 16:04
 */
public class TestCdcKafka {

    public static void main(String[] args) throws Exception {

        //1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Enable checkpointing; the CDC source relies on it for its exactly-once guarantees
        String checkpointDir = "file:///D:/checkpoint";

        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
        //1.1 Checkpoint and state backend tuning
        //omitted

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("")
                .port(3306)
                .username("")
                .password("")
                .databaseList("XXSX")              // databases to capture
                //.tableList("XXSX.table_name")    // leave tableList unset to capture every table in the listed databases
                .deserializer(new CustomerDeserialization())                // custom deserialization schema, defined below
                //.deserializer(new StringDebeziumDeserializationSchema()) // the default deserialization schema
                .startupOptions(StartupOptions.latest())
                .build();

        //2. Build the Flink CDC source and read the change stream

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "MySQL Source");

        //3. Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "testcdc";
        streamSource.addSink(getKafkaProducer("XXX:9092",sinkTopic));

        //4. Launch the job
        env.execute("FlinkCDC");



    }

    // Kafka producer; this convenience constructor runs with the default at-least-once semantic
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }
}
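
The convenience constructor above gives at-least-once delivery. If the exactly-once semantics mentioned earlier are wanted end to end, the Semantic-aware constructor can be used instead; a sketch of a drop-in replacement for getKafkaProducer (Kafka brokers cap transactions at transaction.max.timeout.ms, 15 minutes by default, so the producer's transaction timeout is lowered accordingly):

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public static FlinkKafkaProducer<String> getExactlyOnceKafkaProducer(String brokers, String topic) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", brokers);
    // keep the transaction timeout within the broker's default 15 minute limit
    props.setProperty("transaction.timeout.ms", "900000");
    return new FlinkKafkaProducer<>(
            topic,
            (KafkaSerializationSchema<String>) (element, timestamp) ->
                    new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8)),
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
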
  • Custom deserialization schema
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
/**
 * Turns Flink CDC SourceRecords into Debezium-style JSON strings (before/after/source/op).
 *
 * @Date 2022/8/26 16:14
 */
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1. JSON object that will hold the final record
        JSONObject result = new JSONObject();

        //2. Extract the database and table name from the topic and put them under "source"
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        JSONObject source = new JSONObject();
        source.put("db",database);
        source.put("table",tableName);

        Struct value = (Struct) sourceRecord.value();
        //3. Extract the "before" row image
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4. Extract the "after" row image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5. Map the operation type (CREATE/UPDATE/DELETE) to Debezium's single-letter op codes
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type)) {
            type = "c";
        }
        if ("update".equals(type)) {
            type = "u";
        }
        if ("delete".equals(type)) {
            type = "d";
        }
        if ("create".equals(type)) {
            type = "c";
        }

        //6. Assemble the output JSON
        result.put("source", source);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);

        //7. Emit the record
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
  • flink sql
- |
  CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  `id` BIGINT,
  `agent_id` BIGINT,
  `usable_amount` decimal(20,4)
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'testcdc',
  'properties.bootstrap.servers' = 'XXXXX:9092',
  'properties.group.id' = 'testGroup11',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
  )
- |
  CREATE TABLE t_user_copy(
  id BIGINT,
  agent_id BIGINT,
  usable_amount decimal(20,4),
  PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://XXXXX:9200',
  'index' = 'user_cdc1'
  )
- SELECT * FROM KafkaTable WHERE origin_database = 'XXXX' AND origin_table = 'XXXX'
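
Note: the SELECT above only previews the merged change stream. To actually write into the Elasticsearch table, wrap it in an insert, e.g. INSERT INTO t_user_copy SELECT id, agent_id, usable_amount FROM KafkaTable WHERE origin_database = 'XXXX' AND origin_table = 'XXXX'.
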
Bonus

Real-time data warehouse

Flink CDC + Kafka + Doris is fairly cheap to run compared with standing up the whole Hadoop stack.

Doris documentation; it can be used much like MySQL:

https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/binlog-load-manual

Data warehouse articles about Doris:

https://blog.csdn.net/weixin_46141936/article/details/121846412

https://blog.csdn.net/weixin_43320999/article/details/111599512

https://blog.csdn.net/dajiangtai007/article/details/123501210

New Oriental case study:

https://blog.csdn.net/m0_54252387/article/details/125739846

https://cloud.tencent.com/developer/article/1925453?from=article.detail.1807913

Data warehousing:

https://cloud.tencent.com/developer/article/1938194

https://segmentfault.com/a/1190000040686141

https://www.daimajiaoliu.com/daima/7b7448559360801

https://blog.csdn.net/qq_37067752/article/details/107474369

https://tech.meituan.com/2020/04/09/doris-in-meituan-waimai.html

https://developer.aliyun.com/article/985042

https://yangshibiao.blog.csdn.net/article/details/118687344

Source: https://www.cnblogs.com/lyc88/p/16629996.html
