ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java-如何将两个Kafka流结合在一起,并在具有Avro值的主题中产生结果

2019-10-25 07:03:14  阅读:216  来源: 互联网

标签:apache-kafka-streams apache-kafka ksql java


我有两个Kafka Streams,它们具有String键和我使用KSQL创建的Avro格式的值.

这是第一个:

DESCRIBE EXTENDED STREAM_1; 
Type                 : STREAM
Key field            : IDUSER
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : STREAM_1 (partitions: 4, replication: 1)

 Field                      | Type
--------------------------------------------------------
 ROWTIME                    | BIGINT           (system)
 ROWKEY                     | VARCHAR(STRING)  (system)
 FIRSTNAME                  | VARCHAR(STRING)
 LASTNAME                   | VARCHAR(STRING)
 IDUSER                     | VARCHAR(STRING)

第二个:

DESCRIBE EXTENDED STREAM_2;
Type                 : STREAM
Key field            : IDUSER
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : STREAM_2 (partitions: 4, replication: 1)

 Field                      | Type
--------------------------------------------------------
 ROWTIME                    | BIGINT           (system)
 ROWKEY                     | VARCHAR(STRING)  (system)
 USERNAME                   | VARCHAR(STRING)
 IDUSER                     | VARCHAR(STRING)
 DEVICE                     | VARCHAR(STRING)

所需的输出应包括IDUSER,LASTNAME,DEVICE和USERNAME.

我想使用Streams API离开(在IDUSER上)加入这些流,并将输出写入kafka主题.

为此,我尝试了以下方法:

public static void main(String[] args) {

    final Properties streamsConfiguration = new Properties();

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-strteams");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final Serde<String> stringSerde = Serdes.String();
    final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();


    boolean isKeySerde = false;
    genericAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), isKeySerde);

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, GenericRecord> left = builder.stream("STREAM_1");
    KStream<String, GenericRecord> right = builder.stram("STREAM_2");

    // Java 8+ example, using lambda expressions
    KStream<String, GenericRecord> joined = left.leftJoin(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
        Joined.with(
          stringSerde, /* key */
          genericAvroSerde,   /* left value */
          genericAvroSerde)  /* right value */
      );
    joined.to(stringSerde, genericAvroSerde, "streams-output-testing");

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

然而,

KStream<String, GenericRecord> joined = ...

在我的IDE上抛出错误:

incompatible types: inference variable VR has incompatible bounds

当我尝试将String Serde用作键和值时,它可以工作,但数据不能从kafka-console-consumer读取.我要做的是以AVRO格式生成数据,以便能够使用kafka-avro-console-consumer读取它们.

解决方法:

我的第一个猜测是您要从join操作返回一个String,而您的代码期望将GenericRecord作为结果:

KStream<String, GenericRecord> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, ...)

请注意,联接的类型为KStream< String,GenericRecord&gt ;,即值的类型为GenericRecord,但是联接输出是通过类型为String的“ left =“ leftValue”,right =“ rightValue计算的.

标签:apache-kafka-streams,apache-kafka,ksql,java
来源: https://codeday.me/bug/20191025/1926724.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有