ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink elasticsearch source table 集成elasticsearch-hadoop connector开发

2021-07-01 08:35:29  阅读:224  来源: 互联网

标签:flink dynamic hadoop json elasticsearch table


flink elasticsearch source table 集成 connector开发

flink 官方集成了 connector 的sink 整体上完成度很高

source 这方面比较冷门,没有官方的方案

因为es本身有自已的query/aggs的dsl语法,新版本官方也集成了sql并可以免费应用

source 的方案并不多

因为flink本身有对hadoop的兼容性支持

个人也较深度的使用过elasticsearch-hadoop,大量的读写应用,及一些源码级的feature添加

对elasticsearch-hadoop较为熟悉

完全从无到有开发一个flink的elasticsearch source connector 成本较高

就想能否结合elasticsearch-hadoop 实现es的source

最终验证可行,各方面特性良好,其间也有很多坑,临时代码很多,最后都精简了,保留了最简单的核心代码

开发过程有以下几个坑

1 flink connector 支持两种表,table和dynamic table,目前大量的connector都改用了dynamic table,只保留了很少一部分的table

起初尝试使用dynamic table实现source,但因为dynamic table的一个方法较难重载(方法提供可操作的入口不多),方法要求提供<rowdata,?>参数,而flink的hadoop兼容层,无法提供

因此放弃使用最新的dynamic table,而是用table的方式实现

2 elasticsearch-hadoop 默认可以提供两种outputformt Text和Map,都是hadoop的writable实现类,但是此Map flink却不能直接解析,必须自已把Map转为java的标准map才可用

试过实现,mapWritable到java.map的转换,不完美,基本放弃了使用mapwritable

试过实现Text转json,再转为map,也不完美,但这时想到,flink本身支持一些序列化,包括json,就想能否用flink-json内的方法实现

3 json序列化,简而言之,json序列化是和dynamic table方法深度集成的,加载创建json序列化类时,需要依赖dynamic table 一些方法的上下文,最直接的是context对象,但是因为问题1 ,我是按table来实现,并没有可用的入口

4 查看dynamic table,json序列化相关的代码,比较幸运的,能脱离context,把json序列化工具实例的构造提取出来

json序列化主要供dynamic table使用,dynamic table的行类是rowdata,而因此官方集成的的序列化工具类(包括json)返序列化出来的是个 rowdata对象

5 rowdata对象 table使用异常,因为table需要的是row对象

因此需要实现rowdata 对象到row对象的转换

这是个坑,思维定势了,按以上的步骤一步步来,转换也想着自已来,又是看源码,又是写代码,又是测试的,搞到半夜1点多,总算把rowdata 到row的转换写完了,但这有个缺陷

方式是遍历rowdata的每一个列,对每一列,判断列的实际类型,判断flink的声名类型,做类型转换后再构造新的row出来,但我只转换了json的最外层结构,对嵌套结构只能以String的方法展示,先去休息了

第二天起床转念一想,rowdata和row和转换是应该是比较通用的方法,flink同时支持table/dynamic table, row/rowdata 会不会官方代码内部就有转换代码呢?

一查果真有,而且嵌套解析完美

三行代码完全替换掉了我自已实现的转换类

以上几个大问题,中间阶段还有不少的小问题,比如参数构造,参数传递,参数用途...

本来预期会有此问题,只能做到可用,但多少有些应用受限,意外的感觉完成度还挺高

包装了elasticsearch-hadoop,可用elasticsearch-hadoop的所有特性,并实现了source/table,基于table的sql应用良好

同时结合官方sink实现,同时做读写

对es这么应用有些不轮不类,部分场景低效,但还是有一些应用场景的


https://github.com/cclient/flink-connector-elasticsearch-source

标签:flink,dynamic,hadoop,json,elasticsearch,table
来源: https://www.cnblogs.com/zihunqingxin/p/14957182.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有