ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink流处理-Task之OnlineStatisticsTask

2021-11-08 01:03:18  阅读:136  来源: 互联网

标签:pers flink Task OnlineStatisticsTask Flink aishuang streaming import 数据


OnlineStatisticsTask

package pers.aishuang.flink.streaming.task;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.async.AsyncHttpQueryFunction;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;
import pers.aishuang.flink.streaming.function.flatmap.VehicleInfoMapMysqlFunction;
import pers.aishuang.flink.streaming.function.map.LocactionInfoReidsFunction;
import pers.aishuang.flink.streaming.function.window.OnlineStatisticsWindowFunction;
import pers.aishuang.flink.streaming.sink.mysql.OnlineStatisticsMysqlSink;
import pers.aishuang.flink.streaming.source.mysql.VehicleInfoMysqlSource;
import pers.aishuang.flink.streaming.utils.JsonParsePartUtil;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**

  • 实现车辆的实时上报故障诊断业务分析
  • 1、读取车辆的数据,将jsob字符串转换成对象
  • 2、读取出来正确的数据
  • 3、将车辆的数据通过地理位置(经纬度)去redis中拉取(geoHash算法)
  • -- 如果拉取数据成功,直接封装成对象
  • -- 如果拉取省市区地理位置失败,异步数据流读取高德API请求地理位置并将数据保存到redis中
  • 4、将从redis和高德API拉宽的数据进行合并处理
  • 5、使用窗口操作,比如30s统计一些窗口内的故障告警对象返回
  • 6、读取mysql数据库中的车辆静态数据,车辆车型车系,销售时间等
  • 7、窗口数据和静态数据进行connect并flatMap,拉宽数据
  • 8、将数据写入到mysql中
  • 9、执行任务流环境

*/
public class OnlineStatisticsTask extends BaseTask{
private static final Logger logger = LoggerFactory.getLogger(OnlineStatisticsTask.class);

public static void main(String[] args) throws Exception{
    //1. 初始化Flink流处理的执行环境(事件时间、checkpoint、hadoop name)
    StreamExecutionEnvironment env = getEnv(OnlineStatisticsTask.class.getSimpleName());
    //2. 接入kafka数据源,消费kafka数据
    DataStreamSource<String> kafkaStream = getKafkaStream(
                env,
                "__consumer_online_alarm_analysis_",
                SimpleStringSchema.class);
    //3. 将消费到的json字符串转换成ItcastDataPartObj对象
    DataStream<ItcastDataPartObj> source = kafkaStream
            .map(JsonParsePartUtil::parseJsonToObject)
            //4. 过滤掉异常数据,根据errorDara属性判断(没有VIN号和终端时间 和json解析失败的数据都视为异常数据)
            .filter(obj -> StringUtils.isEmpty(obj.getErrorData()));
    //5. 读取redis中的位置数据<geohash,VehicleLocationModel> ,生成新的数据流
    SingleOutputStreamOperator<ItcastDataPartObj> itcastDataMapStream = source.map(new LocactionInfoReidsFunction());
    //6. 过滤出 redis拉宽成功的地理位置数据
    SingleOutputStreamOperator<ItcastDataPartObj> okWithLocationStream = itcastDataMapStream
            .filter(obj -> StringUtils.isNotEmpty(obj.getProvince()));
    //7. 过滤出 redis拉框失败的地理位置数据
    SingleOutputStreamOperator<ItcastDataPartObj> ngWithLocationStream = itcastDataMapStream
            .filter(obj -> StringUtils.isEmpty(obj.getProvince()));
    //8. 对redis拉框失败的地理位置数据使用异步IO访问高德地图地理位置查询地理位置信息,并将返回结果写入到reids中
    //-- 异步数据流 :处理之后的数据(成功补齐数据和失败的ItcastDataPartObj)
    //-- 存在问题,http请求失败的数据还在里面,仍然缺少坐标详细信息
    SingleOutputStreamOperator<ItcastDataPartObj> withLocationAsyncStream = AsyncDataStream
            //无序返回(可设置返回是否有序,先访问先返回,后访问后返回,设置有序会造成效率低,所以设置为无序)
            .unorderedWait(
                    ngWithLocationStream,
                    new AsyncHttpQueryFunction(),
                    3000, //设置超时时间,超过设定时间,认为任务请求失败,3000ms=》 3s
                    TimeUnit.MICROSECONDS //超时单位
            );

    //9. 将redis拉宽的地理位置数据与高德API拉宽的地理位置数据进行上下合并(合流)
    //flatmap(FlatMap) / map(Map) 用于单流
    // broadcast + connect + flatmap(CoFlatMap)/map(CoMap) 数据拉宽,主要用于两流的数据左右合并(不要求两流的数据类型一致)
    //              union 数据数据上下合并,要求数据类型一致。
    //FlatMap 和 Map是用于单流的,CoFlatMap和CoMap是用于两条流连接(co:connect)
    WindowedStream<ItcastDataPartObj, String, TimeWindow> windowStream = okWithLocationStream
            .union(withLocationAsyncStream)
            //10. 创建原始数据的30s的滚动窗口,根据vin进行分流操作
            .assignTimestampsAndWatermarks(
                    //水印乱序时间设为3s
                    new BoundedOutOfOrdernessTimestampExtractor<ItcastDataPartObj>(Time.seconds(3)) {
                        @Override
                        public long extractTimestamp(ItcastDataPartObj element) {
                            //指定JavaBean中某个字段数据作为事件时间,必须是long类型
                            return element.getTerminalTimeStamp();
                        }
                    }
            )
            //设置分组,指定JavaBean的vin字段作为分组字段
            .keyBy(obj -> obj.getVin())
            //设置窗口类型:为滚动事件时间窗口,并设置窗口大小
            .window(TumblingEventTimeWindows.of(Time.seconds(30)));
    //11. 对原始数据的窗口流数据进行实时故障分析(区分出来告警数据和非告警数据19个告警字段)
    SingleOutputStreamOperator<OnlineDataObj> onlineStatisticsStream = windowStream
            .apply(new OnlineStatisticsWindowFunction());
    //12. 加载业务中间表(7张表:车辆表、车辆类型表、车辆销售记录表、车辆用途表4张),并进行广播
    DataStream<HashMap<String, VehicleInfoModel>> vehicleInfoBroadcastStream = env
            .addSource(new VehicleInfoMysqlSource()).broadcast();
    //13. 将第11步和第12步的广播流结果进行关联,并应用拉宽操作。
    //上报车辆不在库记载的直接丢了
    SingleOutputStreamOperator<OnlineDataObj> result = onlineStatisticsStream
            .connect(vehicleInfoBroadcastStream)
            .flatMap(new VehicleInfoMapMysqlFunction());
    //14. 将拉框后的结果数据写入到mysql数据库中
    result.addSink(new OnlineStatisticsMysqlSink());

    //15. 启动作业(触发执行)
    env.execute(OnlineStatisticsTask.class.getSimpleName());
}

}

标签:pers,flink,Task,OnlineStatisticsTask,Flink,aishuang,streaming,import,数据
来源: https://www.cnblogs.com/zi-shuo/p/15522489.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有