ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Kafka-java代码向kafka中输入和消费数据

2022-07-25 22:02:42  阅读:155  来源: 互联网

标签:java String val Kafka apache org kafka properties


Kafka-java

1. 在写代码前需要导入依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

2. 使用java代码从kafka中拿数据

package com.wt.flink.scurce
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object Demo5KafkaSource {
  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * 构建kafka source
     */

    val source: KafkaSource[String] = KafkaSource
      .builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092") //kafka集群broker列表
      .setTopics("test_topic2")                                 //指定topic
      .setGroupId("my_group")                                   //指定消费组,一条数据指能在一个组内只能被消费一次
      .setStartingOffsets(OffsetsInitializer.earliest())        //读取数据的位置,earliest:读取所有的数据,latest:读取最新的数据
      .setValueOnlyDeserializer(new SimpleStringSchema())       //反序列的类
      .build()

    //使用kafka source
    val kafkaDS: DataStream[String] = env.fromSource(source,WatermarkStrategy.noWatermarks(),"kafka Source")

    kafkaDS.print()

    env.execute()
  }
}

3. 用java代码向kafka中打入数据

package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.util.Properties

object Demo1KafkaProducer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. 创建生产者
     *
     */
    val properties = new Properties()

    //指定kafka broker的地址
    properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092")

    //设置key 和 value的序列化的类
    properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer")


    val producer = new KafkaProducer[String, String](properties)

    val record = new ProducerRecord[String, String]("test_topic2", "woaini,zhongguo")

    //发送数据到kafka中
    producer.send(record)
    producer.flush()

    //关闭连接
    producer.close()
  }
}

4. 向kafka中批量打入学生数据

package com.wt.flink.kafka
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import scala.io.Source

object Demo2StudentToKafka {
  def main(args: Array[String]): Unit = {
    /**
     * 创建生产者
     *
      */
    val properties = new Properties()

    //指定kafka broker 的地址
    //指定kafka broker地址
    properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092")

    //设置key 和value的序列化类
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    /**
     * 将学生表数据批量写到kafka中
     *
     */
    val studentList: List[String] = Source.fromFile("data/students.txt").getLines().toList

    //发送数据到kafka中
    for (student <- studentList) {
      val record = new ProducerRecord[String, String]("student", student)

      producer.send(record)
      producer.flush()
    }
      producer.close()
  }
}

5. 在kafka中批量拿数据

package com.wt.flink.kafka
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

import java.time.Duration
import java.util.Properties
import java.{lang, util}

object Demo3KafkaConsumer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. 创建消费者
     *
      */
    val properties = new Properties()

    properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092")

    //key 和value 反序列化的类
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /**
     * earliest
     * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
     * latest  默认
     * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
     * none
     * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
     *
     */

    properties.setProperty("auto.offset.reset","earliest")

    //消费者组
    properties.setProperty("group.id","suibian_mingzi")

    val consumer = new KafkaConsumer[String, String](properties)

    /**
     * 2. 订阅一个 topic, 可以一次定义多个topic
     *
     */
    val topics = new util.ArrayList[String]()
    topics.add("student")
    consumer.subscribe(topics)

    while (true) {
      println("正在消费")

      /**
       * 消费数据,这需要设置一个超时时间
       *
       */
      val consumerRecords: ConsumerRecords[String, String] = consumer
        .poll(Duration.ofSeconds(2))

      //解析数据
      val records: lang.Iterable[ConsumerRecord[String, String]] = consumerRecords.records("student")

      val iterRecord: util.Iterator[ConsumerRecord[String, String]] = records.iterator()

      while (iterRecord.hasNext) {
        //获取一行数据
        val record: ConsumerRecord[String, String] = iterRecord.next()

        val topic: String = record.topic() //topic
        val offset: Long = record.offset() //数据偏移量
        val key: String = record.key() //数据的key,默认情况下没有指定的的话为null
        val value: String = record.value() //保存数据
        val ts: Long = record.timestamp() //时间戳,默认存入的时间

        println(s"$topic\t$offset\t$key\t$value\t$ts")

      }
    }
    //关闭连接
    consumer.close()
  }
}

标签:java,String,val,Kafka,apache,org,kafka,properties
来源: https://www.cnblogs.com/atao-BigData/p/16518991.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有