
Flume error: "The server disconnected before a response was received"

2022-07-05 15:05:44 · Views: 236 · Source: Internet



1. Error log

4:43:28.444 PM    ERROR    KafkaSink    
Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
4:43:28.445 PM    ERROR    SinkRunner    
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:267)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
    ... 3 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
4:43:31.657 PM    INFO    ReliableTaildirEventReader    
Last read was never committed - resetting position

2. Cause analysis

  The metadata had been exported from Atlas and was being collected with Flume, so the exported files needed to be checked for size. A careful inspection showed that the largest JSON file was about 200 MB; the root cause is that this oversized JSON file was too large to be published as a single Kafka record.
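To spot oversized files in an export before collecting them, a quick scan like the following can help (run from the directory containing the exported JSON; the 100 MB threshold is an arbitrary cutoff for illustration):

```shell
# List JSON files larger than 100 MB under the current directory,
# with human-readable sizes, so oversized exports stand out.
find . -type f -name '*.json' -size +100M -exec ls -lh {} \;
```

Any file this prints is a candidate for splitting before it reaches the Kafka sink.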

3. Solution

  1. Write a custom Flume interceptor that handles large files separately, to see whether the data in the JSON file can be split into smaller pieces.

  2. At the same time, tune the following parameters:

# number of lines to process per batch
a1.sources.r1.batchSize = 10000
# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.meiyijia.pd.flume.interceptor.BigDataInterceptor$Builder
a1.sources.r1.interceptors.i1.param = parameter
# timeout (ms) before closing a file that has no new content
a1.sources.r1.idleTimeout = 1000

# maximum size of a single request (bytes)
a1.sinks.k1.kafka.producer.max.request.size = 1053741824
# total producer buffer memory (bytes)
a1.sinks.k1.kafka.producer.buffer.memory = 15053741824
a1.sinks.k1.kafka.producer.max.block.ms = 30000
a1.sinks.k1.kafka.producer.request.timeout.ms = 10000
a1.sinks.k1.kafka.flumeBatchSize = 10000
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.batch.size = 10000
# number of retries on failure
a1.sinks.k1.kafka.producer.retries = 3

# channel
a1.channels.c1.type = memory
# maximum number of events buffered in the channel
a1.channels.c1.capacity = 2000000
# maximum number of events per transaction handed to the sink
a1.channels.c1.transactionCapacity = 12000
# seconds to wait for space when the channel is full
a1.channels.c1.keep-alive = 3
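Step 1 above references a custom interceptor (`com.meiyijia.pd.flume.interceptor.BigDataInterceptor`), whose implementation the post does not show. As a rough sketch of what its splitting logic might look like, assuming the oversized file holds one top-level JSON array: the array can be broken into its top-level elements so each becomes its own, much smaller event. `JsonArraySplitter` is a hypothetical name; a real interceptor would also implement `org.apache.flume.interceptor.Interceptor` and wrap each piece in a Flume `Event`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a large top-level JSON array into its
// elements without a JSON library, by tracking brace/bracket depth.
public class JsonArraySplitter {

    // Splits "[{...},{...},...]" on commas at depth 0,
    // ignoring separators that appear inside quoted strings.
    public static List<String> splitTopLevel(String jsonArray) {
        List<String> parts = new ArrayList<>();
        String body = jsonArray.trim();
        body = body.substring(1, body.length() - 1); // strip outer [ ]
        int depth = 0, start = 0;
        boolean inString = false;
        for (int i = 0; i < body.length(); i++) {
            char c = body.charAt(i);
            if (inString) {
                if (c == '\\') i++;               // skip escaped character
                else if (c == '"') inString = false;
            } else if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
            } else if (c == '}' || c == ']') {
                depth--;
            } else if (c == ',' && depth == 0) {  // element boundary
                parts.add(body.substring(start, i).trim());
                start = i + 1;
            }
        }
        if (start < body.length()) {
            parts.add(body.substring(start).trim()); // last element
        }
        return parts;
    }

    public static void main(String[] args) {
        String big = "[{\"a\":1},{\"b\":[2,3]},{\"c\":\"x,y\"}]";
        for (String part : splitTopLevel(big)) {
            System.out.println(part);
        }
    }
}
```

Each returned element can then be emitted as a separate event, keeping individual Kafka records well under the producer's `max.request.size`.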

4. References

https://www.csdn.net/tags/Ntjacg3sODcwMTUtYmxvZwO0O0OO0O0O.html

https://flume.liyifeng.org/#

Source: https://www.cnblogs.com/qq1035807396/p/16446453.html
