ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink 读取 CSV 文件,并将 DataStream 转 Table 对象

2020-09-18 02:33:14  阅读:1144  来源: 互联网

标签:DataStream import flink streaming api org apache Table


package com.myflink

import java.lang.reflect.Field
import java.util

import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.io.{PojoCsvInputFormat, RowCsvInputFormat}
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Tumble
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object Main {

  def main(args: Array[String]): Unit = {
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment;
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    val senSor = SenSor("a", 0L, 0.2);
    val lst = new util.ArrayList[PojoField]();
    val arrFields: Array[Field] = senSor.getClass.getDeclaredFields;
    for (field <- arrFields) {
      lst.add(new PojoField(field, TypeInformation.of(field.getType)));
    }

    val path = new Path("D:\\allspace\\flink0906\\src\\main\\resources\\input\\test.csv");
    val ds: DataStream[SenSor] = env
      .createInput(new PojoCsvInputFormat(path, new PojoTypeInfo(classOf[SenSor], lst)))
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SenSor](Time.seconds(10)) {
        override def extractTimestamp(element: SenSor): Long = element.timestamp * 1000
      });

    val tableEnv = StreamTableEnvironment.create(env);
    val table = tableEnv.fromDataStream(ds, 'id, 'timestamp.rowtime, 'uempearture);
    table.window(Tumble over 10.seconds on 'timestamp as 'tw)
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'uempearture.avg, 'tw.end)
      .toRetractStream[Row].print()


    env.execute();
  }

}

case class SenSor(var id: String, var timestamp: Long, var uempearture: Double) {};

  这里面坑:

SenSor 对象的属性,在构造 PojoTypeInfo 时按照名字重排序,这直接造成对 csv 文件解析出错。

csv文件内容:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4

flink版本为 1.10.1




标签:DataStream,import,flink,streaming,api,org,apache,Table
来源: https://www.cnblogs.com/wudeyun/p/13688745.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有