标签:watermark -- px Flink qty offer id security DECIMAL
1. Flink 设置watermak
这里说下这个时间时间的取值,本来我kafka的数据是clickhouse 查询时间特意处理成时间戳。然后使用 TO_TIMESTAMP(date_time) 来设置watermark。 阿里云官网 blink 是支持的,但是这个实际中却不支持。
真的有点狗了。。。。
解决办法如下写法。
public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE tableName (\n" +
"`date_time` BIGINT ,\n" +
"`hs_security_id` VARCHAR ,\n" +
"`security_id` VARCHAR ,\n" +
"`pre_close_px` DECIMAL,\n" +
"`open_px` DECIMAL,\n" +
"`high_px` DECIMAL ,\n" +
"`low_px` DECIMAL,\n" +
"`last_px` DECIMAL,\n" +
"`num_trades` DECIMAL,\n" +
"`volume` BIGINT,\n" +
"`amount` DECIMAL,\n" +
"`phase_code` BIGINT,\n" +
"bid_price VARCHAR,\n" +
"bid_qty VARCHAR,\n" +
"offer_price VARCHAR,\n" +
"offer_qty VARCHAR,\n" +
"ts AS TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
" WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
")WITH (\n" +
" 'connector' = 'kafka', \n" +
" 'topic'='xxx',\n" +
" 'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092', \n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
"'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'" +
")";
2.设置开窗
public class OfflineDataAggregationTableApi implements Serializable {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
String sourceDDL = CustomTable.SOURCE_KAFKA_SNAPSHOT;
// String sinkDDL = CustomTable.SNAPSHOT_PRINT;
//注册source和sink
tableEnv.executeSql(sourceDDL);
// tableEnv.executeSql(sinkDDL);
Table sourceTable = tableEnv.from("snapshot");
Table timeTable = tableEnv.sqlQuery("select \n" +
"TUMBLE_START(ts, INTERVAL '15' SECOND), \n" +
" hs_security_id,\n" +
" security_id,\n" +
" MAX(pre_close_px) as pre_close_px, \n" +
" MAX(open_px) as open_px, \n" +
" MAX(high_px) as high_px, \n" +
" FIRST_VALUE(phase_code) as phase_code, \n" +
" FIRST_VALUE(bid_price) as bid_price, \n" +
" FIRST_VALUE(bid_qty) as bid_qty, \n" +
" FIRST_VALUE(offer_price) as offer_price, \n" +
" FIRST_VALUE(offer_qty) as offer_qty \n" +
" from " +
sourceTable
+ " group by TUMBLE(ts, INTERVAL '15' SECOND),hs_security_id,security_id");
TableResult tableResult = tableEnv.executeSql(" select * from " + timeTable);
tableResult.print();
env.execute("快照数据读取");
}
}
标签:watermark,--,px,Flink,qty,offer,id,security,DECIMAL 来源: https://blog.csdn.net/weixin_43975771/article/details/120852044
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。