ICode9

精准搜索请尝试: 精确搜索
  • java-Kafka JDBC连接器中的自定义分区分配2019-12-11 02:04:24

    我有一个用例,其中我需要根据消息中的某些关键参数编写自定义逻辑来分配分区.我对此进行了一些研究,发现kafka转换支持重写Transformation接口中的某些方法,但是我无法在git hub或其他地方执行一些示例代码.有人可以共享示例代码或git hub链接在kafka JDBC源连接器中进行自定义分区

  • 如何为IoT中心的每个新生产者添加新的Kafka主题?2019-12-11 00:06:55

    我正在研究Azure云解决方案.我正在使用连接到Kafka的IoT中心来处理来自各种IoT设备的数据. 我面临的是,来自多个设备的所有数据都存储在同一主题中.但是,我想将连接到IoT中心的每个设备的数据处理到Kafka中的特定主题(每个设备都有自己的Kafka主题) Toketi“ Azure IoT中心的Kafka

  • javascript-卡夫卡节点js客户端压缩问题与快照2019-11-21 15:38:10

    我正在使用kafka-node(https://github.com/SOHU-Co/kafka-node)使用者来检索数据.我认为我得到的数据已用SNAPPY压缩.获取数据后如何解压缩数据.我尝试使用node-snappy(https://github.com/kesla/node-snappy)解压缩数据,但没有用. 库中是否有任何选项可将压缩率设置为无? 任何人都

  • java-如何设置TOPOLOGY_MAX_SPOUT_PENDING参数2019-11-19 18:03:37

    在拓扑中,我从Kafka队列中读取触发消息.收到触发消息后,我需要向螺栓发送大约4096条消息.在螺栓中,经过一些处理后,它将发布到另一个Kafka队列(另一个拓扑将在以后使用此队列). 我正在尝试设置TOPOLOGY_MAX_SPOUT_PENDING参数以限制要发送的邮件数量.但我看到它没有任何作用.是否因

  • java-如何在Spark Streaming中映射kafka主题名称和相应记录2019-11-18 16:00:05

    我正在播放来自如下的kafka主题; JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,

  • java-Spring Kafka在一个使用者中使用多种消息类型2019-11-18 08:10:39

    我有多个生产者,可以将多种类型的事件发送到一个kafka主题. 我有一个必须使用所有类型消息的使用者.对每种消息使用不同的逻辑. @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") public void handleEvent(Message<EventOne> event) {

  • java-Kafka KStream-衡量消费者滞后2019-11-18 07:13:17

    由于我的基于KStream的应用程序未遵循传统的Kafka消费者路线,我应如何跟踪消费者的滞后时间?通常我会使用ConsumerOffsetChecker(或类似的值),但是它需要使用者组名称. 我应该怎么用呢? (我想对此进行跟踪,以便我可以判断是否/何时启动新消费者)解决方法:Kafka Streams内部使用KafkaC

  • python-Spark流.从Kafka并行读取导致重复数据2019-11-12 02:57:27

    我使用以下代码创建了6个输入DStream,这些DStream使用直接方法从Kafka的6个分区主题中读取,我发现即使为流指定相同的组ID,我也会重复获取数据6次.如果仅创建3个DStream,我将数据重复3次,依此类推. numStreams = 6 kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6par

  • 在Java中通过Spark DataFrame进行迭代而无需收集2019-11-11 16:10:39

    我正在使用Spark 1.6.1 我有一个DataFrame,我需要对其进行遍历并将每一行写入Kafka.截至目前,我正在执行以下操作: Producer<String><String> message; for(Row x: my_df.collect()){ kafka_message = new Producer<String><String>(topic, String.valueOf(x)) my_kafka_pr

  • java-异步消息传递和微服务2019-11-09 09:00:14

    我正在计划开发基于微服务的体系结构应用程序,在阅读Ronnie Mitra的《微服务体系结构》一书时,我决定使用kafka进行内部通信.马特·麦克拉蒂;迈克·阿蒙森;伊拉克里·纳达雷什维利说: letting microservices directly interact with message brokers (such as RabbitMQ, etc.) is

  • c#-Kafka Confluent库中轮询和消耗之间的区别2019-11-09 04:10:03

    Confluent Kafka库的github示例page列出了两种方法,即轮询和消耗.两者有什么区别. 我确实在Confluent Kafka库here中查看了Consumer的实现,觉得它们在功能上是相同的,仅在返回值方面有所不同. Poll()调用consumer()来查看是否有准备接收的消息,如果是,则调用OnMessage事件.而使用,

  • java-Spark(Kafka)流内存问题2019-11-09 03:03:52

    我正在测试处理来自Kafka的消息的第一个Spark Streaming流水线.但是,经过几次测试运行后,我收到以下错误消息 没有足够的内存,Java运行时环境无法继续. 我的测试数据确实很小,因此应该不会发生.在研究了该过程之后,我意识到以前提交的Spark作业可能没有被完全删除? 我通常会提交以

  • java-使用Spark Streaming从Kafka读取数据时lz4异常2019-11-08 18:01:00

    我试图使用火花流式API从kafka读取json数据,当我这样做时,它将引发java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream.init异常.堆栈跟踪为- java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V at org.apache.sp

  • java-应用程序无法启动Spring Boot2019-11-08 16:01:17

    我有一个使用Spring Boot的框架,其中包含控制器RestController类, @RequestMapping("/details") @RestController public class DataController { private KafkaStreams kafkaStreams; public DataController(KafkaStreams kafkaStreams) { this.kafkaStreams =

  • java-Spring Boot Embedded Kafka无法连接2019-11-08 06:04:05

    我正在尝试为我的Kafka使用者编写集成测试.我已经遵循了official reference documentation,但是当我开始测试时,我只会看到这个重复的广告无限: -2019-04-03 15:47:34.002 WARN 13120 — [ main] org.apache.kafka.clients.NetworkClient : [Consumer clientId=con

  • java-在消费者端通过kafka对UUID进行Avro自定义解码2019-10-28 02:01:41

    我已经编写了一个类,用于将UUID类型的对象自定义编码为字节,以跨kafka和avro进行传输. 要使用此类,我在目标对象的uuid变量上方放置了一个@AvroEncode(using = UUIDAsBytesEncoding.class). (这是由Apache Avro反射库实现的) 我很难弄清楚如何让消费者自动使用自定义解码器. (或者

  • java-flink-测量背压2019-10-27 18:02:18

    我使用Flink进行了一些测试,以与其他流媒体平台进行比较.测试的数据源是一个kafka主题,其通信量各不相同,Im试图找出flink是否跟上潮流. 有没有办法知道kafka消费者承受了多少“背压” flink? IE是否跟上步伐?解决方法:Apache Kafka项目提供了一些工具,可以从Zookeeper中获取主题和消

  • java-压缩日志在Kafka中保存多长时间?2019-10-26 16:01:45

    我是Apache Kafka的新手.我一直在阅读有关“压缩”清理策略的信息.我对此特别感兴趣,因为我想在用于同步不同数据存储以实现最终一致性的主题上使用此策略. 我看到有一个delete.retention.ms选项可供我使用.但这仅适用于“删除”墓碑/有效载荷.我了解到,此选项限制了我在无法查看of

  • java-Kafka-如何同时使用filter和filternot?2019-10-26 13:03:06

    我有一个Kafka流,它从一个主题获取数据,并且需要将该信息过滤到两个不同的主题. KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic"); stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.Stri

  • java-在Kafka连接器中设置分区策略2019-10-25 23:01:32

    我正在使用自定义的Kafka连接器(使用Kafka Connect的Java API用Java编写)从外部源提取数据并存储在主题中.我需要设置自定义分区策略.我了解可以通过设置partitioner.class property在Kafka Producer中设置自定义partitioner.但是,此属性对于Kafka连接器似乎没有任何作用.如何配置K

  • 卡夫卡消费者启动延迟汇合的dotnet2019-10-25 13:09:11

    启动融合的dotnet使用者时,在调用订阅和随后的轮询之后,似乎需要很长时间才能从服务器接收到“分配的分区”事件,并因此收到消息(大约10-15秒). 起初我以为会有自动创建主题的开销,但是无论消费者的主题/消费者组是否已经存在,时间都是相同的. 我从此配置开始使用我的使用者,其余代

  • Java-Kafka崩溃时,Kafka使用者挂起轮询2019-10-25 07:04:19

    我一直在研究Zookeeper和Kafka的基本设置,以学习如何使用它,但是我在与消费者打交道时遇到了麻烦.当Kafka不可用时,对poll()方法的调用将挂起,直到它重新联机. 卡夫卡版本:0.10.1.0 我的代码如下所示: KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consum

  • java-如何将两个Kafka流结合在一起,并在具有Avro值的主题中产生结果2019-10-25 07:03:14

    我有两个Kafka Streams,它们具有String键和我使用KSQL创建的Avro格式的值. 这是第一个: DESCRIBE EXTENDED STREAM_1; Type : STREAM Key field : IDUSER Timestamp field : Not set - using <ROWTIME> Key format : STRING Value form

  • java-如何将Apache Kafka与Amazon S3连接?2019-10-25 01:01:59

    我想使用Kafka Connect将数据从Kafka存储到存储桶s3中.我已经在运行一个Kafka的主题,并且创建了一个s3存储桶.我的主题包含有关Protobuffer的数据,我尝试使用https://github.com/qubole/streamx并获得了下一个错误: [2018-10-04 13:35:46,512] INFO Revoking previously assigned

  • 我可以从Python调用Bluemix消息中心服务吗?2019-10-12 19:55:53

    kafka-python客户端支持Kafka 0.9,但显然不包括新的身份验证和加密功能,因此我猜测它只能与开放式服务器一起使用(与以前的版本一样).无论如何,甚至Java客户端都需要一个特殊的消息中心登录模块来连接(或者从示例中可以看出),这表明除非有类似的模块可用于Python,否则任何东西都将

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有