ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink流处理-Task之ElectricFenceTask

2021-11-08 01:02:42  阅读:180  来源: 互联网

标签:flink Task streaming Flink 电子 围栏 ElectricFenceTask import 数据


ElectricFenceTask

package pers.aishuang.flink.streaming.task;

import com.mysql.jdbc.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;
import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.function.window.ElectricFenceModelFunction;
import pers.aishuang.flink.streaming.function.flatmap.ElectricFenceRulesFunction;
import pers.aishuang.flink.streaming.function.window.ElectricFenceWindowFunction;
import pers.aishuang.flink.streaming.sink.mysql.ElectricFenceMysqlSink;
import pers.aishuang.flink.streaming.source.mysql.MySQLElectricFenceSource;
import pers.aishuang.flink.streaming.source.mysql.MysqlElectricFenceResultSource;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;


import java.util.HashMap;

/**
 * 电子围栏,用于判断实时车辆上报数据和电子围栏的规则静态数据进行关联,
 * 分析车辆在指定的电子栅栏的信息和进入栅栏和出栅栏的时间,并将分析结果入库
 * 分析步骤:
 *  1、电子围栏分析任务设置、原始数据json解析、过滤异常数据
 *  2、读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
 *  3、原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)
 *  4、创建90s滚动窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
 *  5、读取电子围栏分析结果表数据并广播
 *  6、滚动窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
 *  7、对电子围栏对象模型,添加uuid和inMysql(车辆是否已存在musql表中)
 *  8、电子围栏分析结果数据落地mysql
 * ===最终在数据库中的结果是一条被记录在册的车辆出入电子围栏信息
 */
public class ElectricFenceTask extends BaseTask{
    public static void main(String[] args) {
        //1、电子围栏分析任务设置、原始数据json解析、过滤异常数据
        StreamExecutionEnvironment env = getEnv(ElectricFenceTask.class.getSimpleName());
        DataStreamSource<String> kafkaStream = getKafkaStream(
                env,
                "__consumer_electricFence_",
                SimpleStringSchema.class);
        //json解析:原始json数据 --> ItcastDataObj对象
        //异常数据过滤:无vin,无终端时间,数据格式不是完整json格式,满足其一都是异常数据,而异常数据在解析时,errorData字段有数据。
        DataStream<ItcastDataObj> source = kafkaStream.map(JsonParseUtil::parseJsonToObject)
                .filter(obj -> StringUtils.isNullOrEmpty(obj.getErrorData()));

        //2、读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
        DataStream<HashMap<String, ElectricFenceResultTmp>> broadcastStream = env
                .addSource(new MySQLElectricFenceSource()).broadcast();
        //3、原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)
        //Javabean对象 <---> HashMap集合(每个元素组成是 String:Javabean对象)
        DataStream<ElectricFenceModel> electricFenceWidthStream = source.connect(broadcastStream)
                .flatMap(new ElectricFenceRulesFunction());

        //4、创建90s滚动窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
        SingleOutputStreamOperator<ElectricFenceModel> electricFenceWindowStream = electricFenceWidthStream
                //指定哪个成员是事件事件和添加水印(允许乱序时间3秒)
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<ElectricFenceModel>(Time.seconds(3)) {
                            @Override
                            public long extractTimestamp(ElectricFenceModel element) {
                                return element.getTerminalTimestamp();
                            }
                        }
                )
                //指定分组字段
                .keyBy(obj -> obj.getVin())
                //指定窗口类型
                .window(TumblingEventTimeWindows.of(Time.seconds(90)))
                //应用窗口函数(使用了MapState状态保存)
                //-- 经过这个窗口后 ,一个窗口输出数据,要么没有,要么只有一条进栅栏数据,要是只有一条出栅栏数据
                .apply(new ElectricFenceWindowFunction());

        //5、读取电子围栏分析结果表数据并广播 (每1s来一条结果数据)
        //HashMap里有多条数据,每个vin只有一条数据
        DataStream<HashMap<String, Integer>> electricFenceResultStream = env
                .addSource(new MysqlElectricFenceResultSource()).broadcast();
        //6、滚动窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
        electricFenceWindowStream.connect(electricFenceResultStream)
                //7、对电子围栏对象模型,添加uuid和inMysql(车辆是否已存在musql表中)
                .flatMap(new ElectricFenceModelFunction())
                //8、电子围栏分析结果数据落地mysql,也可以选择落地mong0
                .addSink(new ElectricFenceMysqlSink());
        //9. 触发执行
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

标签:flink,Task,streaming,Flink,电子,围栏,ElectricFenceTask,import,数据
来源: https://www.cnblogs.com/zi-shuo/p/15522493.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有