ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink处理--异步IO

2021-12-03 01:33:57  阅读:120  来源: 互联网

标签:Flink -- flink new streaming IO org apache import


async

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.entity.VehicleLocationModel;
import pers.aishuang.flink.streaming.utils.GaoDeMapUtils;
import pers.aishuang.flink.streaming.utils.GeoHashUtil;
import pers.aishuang.flink.streaming.utils.RedisUtil;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * 通过异步请求获取指定经纬度的位置信息,从高德API获取位置数据
 * 将指定vin某个时间的位置数据保存到redis中
 */
public class AsyncHttpQueryFunction extends RichAsyncFunction<ItcastDataPartObj, ItcastDataPartObj> {
    //创建日志打印器
    private static final Logger logger = LoggerFactory.getLogger(AsyncHttpQueryFunction.class);

    //实现读取异步请求的客户端 (可关闭的http异步请求客户端)
    private static CloseableHttpAsyncClient httpAsyncClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //设置HttpAsyncClient配置
        RequestConfig config = RequestConfig.custom()
                //-- 设置连接超时时间
                .setConnectTimeout(5000)
                //-- 设置socket超时时间
                .setSocketTimeout(3000)
                .build();
        //初始化异步Http的client
        httpAsyncClient = HttpAsyncClients
                .custom()
                //设置最大连接数量
                .setMaxConnTotal(5)
                .setDefaultRequestConfig(config)
                .build();
        //开启异步http的客户端
        httpAsyncClient.start();
    }

    //实现读取高德API获取位置数据并将位置数据保存到redis中并返回ItcastDataPartObj
    @Override
    public void asyncInvoke(ItcastDataPartObj input, ResultFuture<ItcastDataPartObj> resultFuture) throws Exception {
        //1. 获取当前车辆的经纬度
        Double lng = input.getLng();
        Double lat = input.getLat();
        //2. 通过GaoDeMapUtils工具类根据参数获取请求的url
        String urlByLonLat = GaoDeMapUtils.getUrlByLonLat(lng,lat);
        //3. 创建http get请求对象
        HttpGet httpGet = new HttpGet(urlByLonLat);
        //4. 使用刚创建的http异步客户端执行 http请求对象
        Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null);
        //5. 从执行完成的future中获取数据,返回ItcastDataPartObj对象
        CompletableFuture<ItcastDataPartObj> completableFuture = CompletableFuture.supplyAsync(new Supplier<ItcastDataPartObj>(){
                        //重写get方法
                        //成功时,Redis写入了数据,ItcastDataPartObj的相关字段数据也补齐了。
                        //失败时,什么也不做,原样返回
                        @Override
                        public ItcastDataPartObj get() {
                            try {
                            String country = null;
                            String province = null;
                            String city = null;
                            String district = null;
                            String address = null;
                            //再开个线程自己去拿
                            HttpResponse httpResponse = future.get();

                            //使用future获取到返回的值
                            if(httpResponse.getStatusLine().getStatusCode() == 200 ){
                                HttpEntity entity = httpResponse.getEntity();
                                InputStream contentStream = entity.getContent();
                                //①通过IO流工具类直接生成字符串
                                String content1 = IOUtils.toString(contentStream);

                                //②通过将InputStream转换成输入Reader (转换流:字节流->字符流)
                                InputStreamReader inputStreamReader = new InputStreamReader(contentStream);
                                //--再读取数据流到buffer缓冲区(字符流->高效字符流)
                                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                                final int bufferSize = 1024;
                                final char[] buffer = new char[bufferSize];
                                final StringBuilder out = new StringBuilder();
                                int len;
                                while ((len = bufferedReader.read(buffer)) != -1){
                                    out.append(new String(buffer,0,len));
                                }
                                inputStreamReader.close();
                                bufferedReader.close();
                                String content2 = out.toString();

                                //③Entity工具类
                                String content3 = EntityUtils.toString(entity);

                                //----------------------------
                                //将json字符串转换成对象然后读取出来国家,省、市、区、address
                                JSONObject jsonObject = JSONObject.parseObject(content3);
                                JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                                if(regeocode !=null && regeocode.size() > 0 ){
                                    address = regeocode.getString("formatted_address");
                                    JSONObject addressComponent = regeocode.getJSONObject("addressComponent");
                                    if(addressComponent != null && addressComponent.size() > 0) {
                                        country = addressComponent.getString("country");
                                        province = addressComponent.getString("province");
                                        city = addressComponent.getString("city");
                                        district = addressComponent.getString("district");
                                        //将其封装为VehicleLocationModel 并写入到redis
                                        VehicleLocationModel vehicleLocationModel = new VehicleLocationModel(
                                                country,
                                                province,
                                                city,
                                                district,
                                                address,
                                                lat,
                                                lng
                                        );
                                        //获取geohash值作为存储到redis的key
                                        String geoHash = GeoHashUtil.encode(lat,lng);
                                        RedisUtil.set(
                                                Bytes.toBytes(geoHash), //字节数组 (二进制数据)
                                                vehicleLocationModel.toJsonStringArr()//字节数组(二进制数据)
                                        );
                                        //将当前车辆的位置信息赋值
                                        input.setCountry(country);
                                        input.setProvince(province);
                                        input.setCity(city);
                                        input.setDistrict(district);
                                        input.setAddress(address);
                                    }else{
                                        logger.error("当前解析出来的地理信息为空,请检查");
                                    }
                                }else  {
                                    logger.error("当前解析出来的对象为空,请检查!");
                                }


                            }else {
                                logger.error("当前url请求返回reponse错误!");
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            }
                            return input;
                        }
                    });

        //6. 从future的thenAccept
        completableFuture.thenAccept(new Consumer<ItcastDataPartObj>() {
            //重写accept方法,使用集合中只放一个对象
            @Override
            public void accept(ItcastDataPartObj itcastDataPartObj) {
                resultFuture.complete(Collections.singleton(itcastDataPartObj));
            }
        });
    }

    //超时了怎么处理(如果当前请求超时,打印输出超时日志或告警信息)
    @Override
    public void timeout(ItcastDataPartObj input, ResultFuture<ItcastDataPartObj> resultFuture) throws Exception {
        //超时时间,打印输出异步请求的超时警告
        System.out.println("当前异步请求超时!");
    }

    //关闭当前的http异步请求客户端

    @Override
    public void close() throws Exception {
        if(httpAsyncClient.isRunning()) httpAsyncClient.close();
    }
}

实例

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.async.AsyncHttpQueryFunction;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;
import pers.aishuang.flink.streaming.function.flatmap.VehicleInfoMapMysqlFunction;
import pers.aishuang.flink.streaming.function.map.LocactionInfoReidsFunction;
import pers.aishuang.flink.streaming.function.window.OnlineStatisticsWindowFunction;
import pers.aishuang.flink.streaming.sink.mysql.OnlineStatisticsMysqlSink;
import pers.aishuang.flink.streaming.source.mysql.VehicleInfoMysqlSource;
import pers.aishuang.flink.streaming.utils.JsonParsePartUtil;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**
 * 实现车辆的实时上报故障诊断业务分析
 * 1、读取车辆的数据,将jsob字符串转换成对象
 * 2、读取出来正确的数据
 * 3、将车辆的数据通过地理位置(经纬度)去redis中拉取(geoHash算法)
 * -- 如果拉取数据成功,直接封装成对象
 * -- 如果拉取省市区地理位置失败,异步数据流读取高德API请求地理位置并将数据保存到redis中
 * 4、将从redis和高德API拉宽的数据进行合并处理
 * 5、使用窗口操作,比如30s统计一些窗口内的故障告警对象返回
 * 6、读取mysql数据库中的车辆静态数据,车辆车型车系,销售时间等
 * 7、窗口数据和静态数据进行connect并flatMap,拉宽数据
 * 8、将数据写入到mysql中
 * 9、执行任务流环境
 *
 */
public class OnlineStatisticsTask extends BaseTask{
    private static final Logger logger = LoggerFactory.getLogger(OnlineStatisticsTask.class);

    public static void main(String[] args) throws Exception{
        //1. 初始化Flink流处理的执行环境(事件时间、checkpoint、hadoop name)
        StreamExecutionEnvironment env = getEnv(OnlineStatisticsTask.class.getSimpleName());
        //2. 接入kafka数据源,消费kafka数据
        DataStreamSource<String> kafkaStream = getKafkaStream(
                    env,
                    "__consumer_online_alarm_analysis_",
                    SimpleStringSchema.class);
        //3. 将消费到的json字符串转换成ItcastDataPartObj对象
        DataStream<ItcastDataPartObj> source = kafkaStream
                .map(JsonParsePartUtil::parseJsonToObject)
                //4. 过滤掉异常数据,根据errorDara属性判断(没有VIN号和终端时间 和json解析失败的数据都视为异常数据)
                .filter(obj -> StringUtils.isEmpty(obj.getErrorData()));
        //5. 读取redis中的位置数据<geohash,VehicleLocationModel> ,生成新的数据流
        SingleOutputStreamOperator<ItcastDataPartObj> itcastDataMapStream = source.map(new LocactionInfoReidsFunction());
        //6. 过滤出 redis拉宽成功的地理位置数据
        SingleOutputStreamOperator<ItcastDataPartObj> okWithLocationStream = itcastDataMapStream
                .filter(obj -> StringUtils.isNotEmpty(obj.getProvince()));
        //7. 过滤出 redis拉框失败的地理位置数据
        SingleOutputStreamOperator<ItcastDataPartObj> ngWithLocationStream = itcastDataMapStream
                .filter(obj -> StringUtils.isEmpty(obj.getProvince()));
        //8. 对redis拉框失败的地理位置数据使用异步IO访问高德地图地理位置查询地理位置信息,并将返回结果写入到reids中
        //-- 异步数据流 :处理之后的数据(成功补齐数据和失败的ItcastDataPartObj)
        //-- 存在问题,http请求失败的数据还在里面,仍然缺少坐标详细信息
        SingleOutputStreamOperator<ItcastDataPartObj> withLocationAsyncStream = AsyncDataStream
                //无序返回(可设置返回是否有序,先访问先返回,后访问后返回,设置有序会造成效率低,所以设置为无序)
                .unorderedWait(
                        ngWithLocationStream,
                        new AsyncHttpQueryFunction(),
                        3000, //设置超时时间,超过设定时间,认为任务请求失败,3000ms=》 3s
                        TimeUnit.MICROSECONDS //超时单位
                );

        //9. 将redis拉宽的地理位置数据与高德API拉宽的地理位置数据进行上下合并(合流)
        //flatmap(FlatMap) / map(Map) 用于单流
        // broadcast + connect + flatmap(CoFlatMap)/map(CoMap) 数据拉宽,主要用于两流的数据左右合并(不要求两流的数据类型一致)
        //              union 数据数据上下合并,要求数据类型一致。
        //FlatMap 和 Map是用于单流的,CoFlatMap和CoMap是用于两条流连接(co:connect)
        WindowedStream<ItcastDataPartObj, String, TimeWindow> windowStream = okWithLocationStream
                .union(withLocationAsyncStream)
                //10. 创建原始数据的30s的滚动窗口,根据vin进行分流操作
                .assignTimestampsAndWatermarks(
                        //水印乱序时间设为3s
                        new BoundedOutOfOrdernessTimestampExtractor<ItcastDataPartObj>(Time.seconds(3)) {
                            @Override
                            public long extractTimestamp(ItcastDataPartObj element) {
                                //指定JavaBean中某个字段数据作为事件时间,必须是long类型
                                return element.getTerminalTimeStamp();
                            }
                        }
                )
                //设置分组,指定JavaBean的vin字段作为分组字段
                .keyBy(obj -> obj.getVin())
                //设置窗口类型:为滚动事件时间窗口,并设置窗口大小
                .window(TumblingEventTimeWindows.of(Time.seconds(30)));
        //11. 对原始数据的窗口流数据进行实时故障分析(区分出来告警数据和非告警数据19个告警字段)
        SingleOutputStreamOperator<OnlineDataObj> onlineStatisticsStream = windowStream
                .apply(new OnlineStatisticsWindowFunction());
        //12. 加载业务中间表(7张表:车辆表、车辆类型表、车辆销售记录表、车辆用途表4张),并进行广播
        DataStream<HashMap<String, VehicleInfoModel>> vehicleInfoBroadcastStream = env
                .addSource(new VehicleInfoMysqlSource()).broadcast();
        //13. 将第11步和第12步的广播流结果进行关联,并应用拉宽操作。
        //上报车辆不在库记载的直接丢了
        SingleOutputStreamOperator<OnlineDataObj> result = onlineStatisticsStream
                .connect(vehicleInfoBroadcastStream)
                .flatMap(new VehicleInfoMapMysqlFunction());
        //14. 将拉框后的结果数据写入到mysql数据库中
        result.addSink(new OnlineStatisticsMysqlSink());

        //15. 启动作业(触发执行)
        env.execute(OnlineStatisticsTask.class.getSimpleName());
    }
}

标签:Flink,--,flink,new,streaming,IO,org,apache,import
来源: https://www.cnblogs.com/zi-shuo/p/15636250.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有