
Flink Sink (Kafka, Redis, ES, JDBC)

2022-07-01 15:05:52



Flink has no direct counterpart to Spark's foreach method for iterating over records and acting on each one; all output to external systems goes through a Sink. A sink is attached at the end of the job like this: stream.addSink(new MySink(xxxx)). Flink ships with sinks for a number of frameworks out of the box; for everything else, a user-defined sink is required.
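
The listings below all rely on a SensorReading case class and a comma-separated sensor.txt input file that this post never shows. A minimal sketch of what they are assumed to look like (field names inferred from the sink code):

package com.zhen.flink.api

// Assumed shape of the case class used throughout the examples
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// sensor.txt is then expected to contain lines such as:
// sensor_1,1547718199,35.8
// sensor_6,1547718201,15.4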

5.0 File

package com.zhen.flink.api.sink

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._


/**
  * @Author FengZhen
  * @Date 6/8/22 10:43 PM
  * @Description File sink: writeAsCsv and StreamingFileSink
  */
object FileSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    dataStream.print()
    val outFilePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor_out.txt"
    // writeAsCsv is the quick option (deprecated in recent Flink releases in favor of StreamingFileSink)
    dataStream.writeAsCsv(outFilePath)

    val outFilePath1 = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor_out_1.txt"
    // StreamingFileSink treats the path as a base directory and writes rolling part files under it
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path(outFilePath1),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )

    env.execute("file sink.")
  }

}
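
A row-format StreamingFileSink decides when to start a new part file via a rolling policy. A minimal sketch of customizing it, assuming Flink 1.13's DefaultRollingPolicy builder with its millisecond-based setters (the values are examples, not recommendations):

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Sketch: roll a new part file every 15 minutes, after 5 minutes of
// inactivity, or once a part reaches 128 MB
val rollingSink = StreamingFileSink
  .forRowFormat(new Path(outFilePath1), new SimpleStringEncoder[SensorReading]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(128 * 1024 * 1024)
      .build()
  )
  .build()

dataStream.addSink(rollingSink)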

5.1 Kafka

package com.zhen.flink.api.sink

import java.util.Properties

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
  * @Author FengZhen
  * @Date 6/11/22 3:20 PM
  * @Description Kafka source and Kafka sink
  */
object KafkaSink {

  def main(args: Array[String]): Unit = {


    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val streamKafka = env.addSource( new FlinkKafkaConsumer[String](
      "topic_sensor",
      new SimpleStringSchema(),
      properties
    ))

    // 1. Parse each line into a SensorReading, then back to a string for the Kafka sink
    val dataStream: DataStream[String] = streamKafka
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
        }
      )

    // Write back to Kafka (broker-list constructor; see the Properties-based sketch after this listing)
    dataStream.addSink(
      new FlinkKafkaProducer[String]("localhost:9092", "topic_flink_kafka_sink", new SimpleStringSchema())
    )

    // Produce test input:
    // ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_sensor

    // Verify the output:
    // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_flink_kafka_sink

    env.execute("kafka sink.")


  }

}
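
The broker-list string constructor used above is deprecated in newer connector versions. A sketch of the equivalent Properties-based construction (same topic and schema, using the constructor that takes a topic, a SerializationSchema, and producer Properties):

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")

// Same sink, configured through an explicit Properties object
dataStream.addSink(
  new FlinkKafkaProducer[String](
    "topic_flink_kafka_sink",
    new SimpleStringSchema(),
    producerProps
  )
)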
 

5.2 Redis

package com.zhen.flink.api.sink

import com.zhen.flink.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
  * @Author FengZhen
  * @Date 6/12/22 8:23 PM
  * @Description Redis sink via the Bahir connector
  */
// Named RedisSinkTest to avoid clashing with the imported RedisSink connector class
object RedisSinkTest {


  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    // Build the Jedis pool configuration (a FlinkJedisConfigBase)
    val conf = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .setDatabase(1)
        .build()

    dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink.")

  }

  // Define a RedisMapper: which Redis command to run and how to extract key and value
  class MyRedisMapper extends RedisMapper[SensorReading]{

    // Command used to write the data: HSET <hash name> <field> <value>, into the sensor_temp hash
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
    }


    // Use the sensor id as the hash field
    override def getKeyFromData(t: SensorReading): String =
      t.id

    // Use the temperature as the value
    override def getValueFromData(t: SensorReading): String =
      t.temperature.toString
  }

}
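
With HSET, all readings land in one Redis hash, which you can inspect in redis-cli with HGETALL sensor_temp. If separate string keys are preferable, a sketch of an alternative mapper using the connector's SET command (assuming RedisCommand.SET is available in this connector version):

// Sketch: one Redis string key per sensor instead of one shared hash
class MyRedisStringMapper extends RedisMapper[SensorReading] {

  // SET sensor_<id> <temperature>
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: SensorReading): String = "sensor_" + t.id

  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}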
 

5.3 Elasticsearch

package com.zhen.flink.api.sink

import java.util

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
  * @Author FengZhen
  * @Date 6/17/22 3:39 PM
  * @Description Elasticsearch (6.x) sink
  */
object ElasticsearchSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )


    // Define the Elasticsearch HTTP hosts
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Custom ElasticsearchSinkFunction that turns each record into an index request
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {

        // Wrap the fields in a map to serve as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", element.id)
        dataSource.put("temperature", element.temperature.toString)
        dataSource.put("ts", element.timestamp.toString)

        // Build the index request that will be sent to ES
        val indexRequest = Requests.indexRequest()
          .index("sensor")
          .`type`("reading_data")
          .source(dataSource)

        // Hand the request to the indexer for (bulk) sending
        indexer.add(indexRequest)

      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
        .build()
    )
    env.execute("elasticsearch sink.")
  }
}
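
The Elasticsearch sink buffers actions and writes them in bulk, so with default settings documents may only appear once a batch fills. For quick testing, the builder can be told to flush after every element; a sketch reusing the httpHosts and myEsSinkFunc from the listing above:

val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)

// Flush after every record; fine for testing, keep larger batches in production
esSinkBuilder.setBulkFlushMaxActions(1)

dataStream.addSink(esSinkBuilder.build())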
 

5.4 JDBC (custom sink)

package com.zhen.flink.api.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.zhen.flink.api.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._


/**
  * @Author FengZhen
  * @Date 7/1/22 2:21 PM
  * @Description Custom JDBC sink implemented as a RichSinkFunction
  */
object JdbcSink {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    dataStream.addSink(new MyJdbcSinkFunc())

    env.execute("jdbc sink")

  }

  class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{

    // Connection and prepared statements, created once per parallel instance in open()
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _


    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "1234qwer")
      insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?,?)")
      updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
    }

    override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {

      // Try the update first; if the row already exists it is updated in place
      updateStmt.setDouble(1, value.temperature)
      updateStmt.setString(2, value.id)
      updateStmt.execute()

      // If the update matched no rows, insert a new one
      if(updateStmt.getUpdateCount == 0){
        insertStmt.setString(1, value.id)
        insertStmt.setDouble(2, value.temperature)
        insertStmt.execute()
      }
    }

    override def close(): Unit = {
      insertStmt.close()
      updateStmt.close()
      conn.close()
    }
  }

}
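
Since Flink 1.11 there is also an official JDBC connector, so a hand-written RichSinkFunction is mainly needed for custom upsert logic like the above. A sketch of the built-in sink for plain inserts, assuming the flink-connector-jdbc dependency is on the classpath (see the pom note below); the JdbcSink class is renamed on import to avoid clashing with the object above:

import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcStatementBuilder, JdbcSink => FlinkJdbcSink}

// Sketch only: plain inserts via the official connector
dataStream.addSink(
  FlinkJdbcSink.sink[SensorReading](
    "insert into sensor_temp (id, temp) values (?, ?)",
    new JdbcStatementBuilder[SensorReading] {
      // Bind one SensorReading to the prepared statement
      override def accept(ps: PreparedStatement, r: SensorReading): Unit = {
        ps.setString(1, r.id)
        ps.setDouble(2, r.temperature)
      }
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://localhost:3306/test")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("root")
      .withPassword("1234qwer")
      .build()
  )
)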

 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhen.flink</groupId>
    <artifactId>flink_learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>flink_learn Maven</name>


    <properties>
        <scala_version>2.12</scala_version>
        <flink_version>1.13.1</flink_version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
        <!-- Note the _2.11 suffix below: version 1.0 of the Bahir Redis connector appears
             not to have been published for Scala 2.12 -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
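
        <!-- Hypothetical addition: only needed for the official JDBC sink sketched in
             section 5.4; the hand-written RichSinkFunction works without it -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>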

    </dependencies>

    <build>
        <plugins> <!-- This plugin compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution> <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

 

 

     

Source: https://www.cnblogs.com/EnzoDin/p/16434645.html
