
Flink Table API and SQL: Watermarks and Window Time Attributes (Part 2)

2021-10-19 19:03:25 · Views: 292 · Source: Internet



1. Setting the watermark in Flink

A note on the event-time column. In my Kafka data, the ClickHouse query time had deliberately been converted to an epoch timestamp in milliseconds, and I first tried TO_TIMESTAMP(date_time) to define the watermark. Alibaba Cloud's Blink documentation says this works, but open-source Flink rejects it in practice, presumably because its TO_TIMESTAMP only accepts a string argument. Rather frustrating.

The workaround is the following DDL: convert the epoch value to a string with FROM_UNIXTIME first, then pass that string to TO_TIMESTAMP.

    public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE `snapshot` (\n" +
            "  `date_time` BIGINT,\n" +
            "  `hs_security_id` VARCHAR,\n" +
            "  `security_id` VARCHAR,\n" +
            "  `pre_close_px` DECIMAL,\n" +
            "  `open_px` DECIMAL,\n" +
            "  `high_px` DECIMAL,\n" +
            "  `low_px` DECIMAL,\n" +
            "  `last_px` DECIMAL,\n" +
            "  `num_trades` DECIMAL,\n" +
            "  `volume` BIGINT,\n" +
            "  `amount` DECIMAL,\n" +
            "  `phase_code` BIGINT,\n" +
            "  `bid_price` VARCHAR,\n" +
            "  `bid_qty` VARCHAR,\n" +
            "  `offer_price` VARCHAR,\n" +
            "  `offer_qty` VARCHAR,\n" +
            "  `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`date_time` / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
            "  WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'xxx',\n" +
            "  'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092',\n" +
            "  'format' = 'json',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'json.fail-on-missing-field' = 'false',\n" +
            "  'json.ignore-parse-errors' = 'true'\n" +
            ")";

2. Defining the window

import java.io.Serializable;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OfflineDataAggregationTableApi implements Serializable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        String sourceDDL = CustomTable.SOURCE_KAFKA_SNAPSHOT;
//        String sinkDDL = CustomTable.SNAPSHOT_PRINT;
        // register the source (and, if needed, the sink) table
        tableEnv.executeSql(sourceDDL);
//        tableEnv.executeSql(sinkDDL);

        Table sourceTable = tableEnv.from("snapshot");

        Table timeTable = tableEnv.sqlQuery("select \n" +
                " TUMBLE_START(ts, INTERVAL '15' SECOND) as window_start, \n" +
                " hs_security_id,\n" +
                " security_id,\n" +
                " MAX(pre_close_px) as pre_close_px, \n" +
                " MAX(open_px) as open_px, \n" +
                " MAX(high_px) as high_px, \n" +
                " FIRST_VALUE(phase_code) as phase_code, \n" +
                " FIRST_VALUE(bid_price) as bid_price, \n" +
                " FIRST_VALUE(bid_qty) as bid_qty, \n" +
                " FIRST_VALUE(offer_price) as offer_price, \n" +
                " FIRST_VALUE(offer_qty) as offer_qty \n" +
                " from " + sourceTable +
                " group by TUMBLE(ts, INTERVAL '15' SECOND), hs_security_id, security_id");

        TableResult tableResult = tableEnv.executeSql("select * from " + timeTable);
        // print() submits the job and blocks; a separate env.execute() is not
        // needed here (and would fail, since no DataStream operators are defined).
        tableResult.print();
    }
}
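For intuition, TUMBLE(ts, INTERVAL '15' SECOND) assigns each row to a fixed, non-overlapping 15-second bucket whose start is the event time rounded down to a multiple of 15 seconds, and with the 10-second watermark a window fires once the watermark passes its end. A simplified sketch of that bookkeeping (class and method names are mine, and the firing condition is an approximation of Flink's trigger logic):

```java
public class TumbleWindowDemo {
    static final long WINDOW_MS = 15_000L;        // TUMBLE(ts, INTERVAL '15' SECOND)
    static final long WATERMARK_LAG_MS = 10_000L; // WATERMARK ... ts - INTERVAL '10' SECOND

    // Start of the tumbling window containing the event: round down to 15s.
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - (eventTimeMs % WINDOW_MS);
    }

    // A window [start, start + 15s) emits its result once the watermark
    // (max event time seen minus the 10s lag) reaches the window end.
    static boolean windowFires(long windowStartMs, long maxEventTimeMs) {
        long watermark = maxEventTimeMs - WATERMARK_LAG_MS;
        return watermark >= windowStartMs + WINDOW_MS;
    }

    public static void main(String[] args) {
        long ts = 1634601607_000L;               // an event 7s into its window
        long start = windowStart(ts);
        System.out.println(start);                       // 1634601600000
        System.out.println(windowFires(start, ts));              // false
        System.out.println(windowFires(start, start + 25_000L)); // true
    }
}
```

So each 15-second window's result is emitted roughly 25 seconds of event time after the window opens: 15 seconds of window length plus the 10-second watermark delay that tolerates out-of-order data.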

Source: https://blog.csdn.net/weixin_43975771/article/details/120852044
