
Flink in Practice (86): Flink SQL (13) - Using Flink with Hive (5): Hive Streaming




Note: This blog series is organized from SGG's training videos and is well suited for getting started.


0 Hive Streaming

A typical Hive job is scheduled to run periodically, so there is significant latency.

Flink supports writing to, reading from, and joining Hive tables in a streaming fashion.

1 Streaming Writing

Hive tables support streaming writes, based on the filesystem streaming sink.

The Hive streaming sink re-uses the filesystem streaming sink and integrates Hadoop's OutputFormat/RecordWriter for streaming writes. Hadoop RecordWriters are bulk-encoded formats, and bulk formats roll files on every checkpoint.
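Because bulk formats roll files on every checkpoint, written data only becomes visible once a checkpoint completes, so checkpointing must be enabled for the streaming sink. A minimal sketch (the interval value is an assumption, and how this option is set may differ per deployment):

-- Sketch: enable periodic checkpoints so the sink can roll and commit files.
SET execution.checkpointing.interval=1min;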

By default, only a renaming committer is available, which means the S3 filesystem cannot support exactly-once writes. If you want to use the Hive streaming sink on S3, you can set the following parameter to false in TableConfig to use Flink's native writers (which only work for Parquet and ORC); note that this parameter affects all sinks of the job:

Key:         table.exec.hive.fallback-mapred-writer
Default:     true
Type:        Boolean
Description: If false, Flink's native writers are used to write Parquet and ORC files; if true, Hadoop's mapred record writers are used.
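For illustration (this SET statement mirrors the dialect switches in the example below; setting the key programmatically on TableConfig works as well):

-- Sketch: prefer Flink's native Parquet/ORC writers over Hadoop's mapred
-- record writers, e.g. when writing to S3 where renaming is not exactly-once.
SET table.exec.hive.fallback-mapred-writer=false;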

The example below shows how the streaming sink can be used to write data from Kafka into a Hive table with partition commit, and how a batch query can then read that data back out.


SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';


2 Streaming Reading

To improve the real-time capability of Hive reads, Flink supports streaming reads of Hive tables:

  • Partitioned tables: Flink monitors the creation of new partitions and reads them incrementally.
  • Non-partitioned tables: Flink monitors the creation of new files in the folder and reads them incrementally.

You can even use a 10-minute-level partition strategy, combining Flink's Hive streaming read with Hive streaming write, to bring a Hive data warehouse to quasi-real-time, minute-level latency.
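A hedged sketch of such a layout (the table name, the mi bucket column, and the minute-bucketing expression are assumptions, not from the original example):

SET table.sql-dialect=hive;
CREATE TABLE hive_table_10min (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='10 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
-- Streaming insert; mi floors the minute to a 10-minute bucket: '00', '10', ..., '50'.
INSERT INTO TABLE hive_table_10min
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
  DATE_FORMAT(log_ts, 'HH'),
  LPAD(CAST(FLOOR(MINUTE(log_ts) / 10) * 10 AS STRING), 2, '0')
FROM kafka_table;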

Key:         streaming-source.enable
Default:     false
Type:        Boolean
Description: Whether to enable the streaming source. NOTE: make sure each partition/file is written atomically, otherwise the reader may see incomplete data.

Key:         streaming-source.monitor-interval
Default:     1 m
Type:        Duration
Description: Time interval for continuously monitoring partitions/files.

Key:         streaming-source.consume-order
Default:     create-time
Type:        String
Description: The consume order of the streaming source; supported values are create-time and partition-time. create-time compares the partition/file creation time, which is not the partition creation time in the Hive metastore but the folder/file modification time in the filesystem. partition-time compares the time represented by the partition name; if a partition folder is somehow updated, e.g. a new file is added to it, this can affect how the data is consumed. For non-partitioned tables, this value should always be 'create-time'.

Key:         streaming-source.consume-start-offset
Default:     1970-00-00
Type:        String
Description: Start offset for streaming consumption. How offsets are parsed and compared depends on the consume order. For create-time and partition-time, the value should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, the partition time extractor is used to extract the time from the partition name.

Note:

  • The current monitoring strategy scans all directories/files under the location path. If there are too many partitions, there will be performance problems.
  • Streaming reads of non-partitioned tables require that each file be put atomically into the target directory.
  • Streaming reads of partitioned tables require that each partition be added atomically from the Hive metastore's point of view. This means that new data added to an existing partition will not be consumed.
  • Streaming reads do not support watermark syntax in Flink DDL, so they cannot be used for window operators.

The example below shows how to read a Hive table incrementally.

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
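A further hedged variant (the option values are assumptions): consume a partition-committed table in partition-time order from a given point in time:

SELECT * FROM hive_table /*+ OPTIONS(
  'streaming-source.enable'='true',
  'streaming-source.consume-order'='partition-time',
  'streaming-source.consume-start-offset'='2020-05-20 12:00:00'
) */;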

3 Hive Table As Temporal Tables

You can use a Hive table as a temporal table and join streaming data against it; a hedged sketch of such a join follows the option table below.

When performing the join, the Hive table is cached in TaskManager memory, and each record from the stream is looked up in the Hive table to decide whether a match is found. You don't need any extra settings to use a Hive table as a temporal table, but you can optionally configure the TTL of the Hive table cache with the following property. After the cache expires, the Hive table is scanned again to load the latest data.

Key:         lookup.join.cache.ttl
Default:     60 min
Type:        Duration
Description: The cache TTL (e.g. 10min) for the build table in a lookup join. By default the TTL is 60 minutes.
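A minimal sketch of such a lookup join (the orders table, its schema, and the TTL value are assumptions for illustration): the probe side needs a processing-time attribute, and the cache TTL can be overridden per query with a dynamic-options hint:

-- Probe side: a streaming table with a processing-time attribute (assumed schema).
CREATE TABLE orders (
  order_id STRING,
  product_id STRING,
  amount DOUBLE,
  proc_time AS PROCTIME()
) WITH (...);

-- Lookup join against the Hive table; the cached copy is reloaded once the TTL expires.
SELECT o.order_id, o.amount, d.product_name
FROM orders AS o
JOIN hive_dim_table /*+ OPTIONS('lookup.join.cache.ttl'='12 h') */
  FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.product_id = d.product_id;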

Note:

  1. Each joining subtask needs to keep its own cache of the Hive table. Please make sure the Hive table fits into the memory of a TaskManager task slot.
  2. You should set a relatively large value for lookup.join.cache.ttl. You will probably have performance issues if your Hive table needs to be updated and reloaded too frequently.
  3. Currently, the whole Hive table is simply reloaded whenever the cache needs refreshing; there is no way to differentiate new data from old.

Source: https://blog.51cto.com/u_9928699/2892526