标签:00 01 hudi TIMESTAMP flinksql 写入 1970 t1
flinksql写入hudi
测试环境:
Flink 1.11.1
hudi 0.8.0
Hadoop 3.0.0
Hive 2.1.1
准备工作:
1.安装flink 1.11.1,要下载带hadoop版本的;
2.下载hudi-flink-bundle_2.1?.jar,并放入$FLINK_HOME/lib下。下载地址:
https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink-bundle_2.11/
具体实施步骤:
Batch模式:
启动flink sql-client:
bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.11-0.8.0.jar shell
创建hudi表:
-- 设置一下查询模式为tableau。
set execution.result-mode=tableau;
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://nameservice/hudi/t1',
'table.type' = 'MERGE_ON_READ'
);
插入数据:
-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
查询数据:
-- query from the hudi table
select * from t1;
更新数据:
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
Streaming模式:
建表并插入数据:
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- 这里将 table option read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据;
'read.streaming.start-commit' = '20210316134557', -- opiton read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;
'read.streaming.check-interval' = '4' -- option table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读.
);
流式模式查询数据:
-- Then query the table in stream mode
select * from t1;
踩坑:
1.报错如下:
问题解决:flink中有个jar包冲突,不知其他版本会不会有这样的问题。
测试总结:
目前flink写入使用的是同步合并,默认五次提交一合并,导致会有log文件没有生成parquet文件,log文件中的数据暂没有查到,还请大神指教。
标签:00,01,hudi,TIMESTAMP,flinksql,写入,1970,t1 来源: https://blog.csdn.net/weixin_49218925/article/details/115511022
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。