标签:
现状:我的应用如下:跑一个 Java 应用,消费多个 topic 。创建了好几个 consumer 进行消费,每次 consumer.poll 得到的 records 都通过线程池 ThreadPoolExecutor 处理数据。每个 consumer 对应一个 ThreadPoolExecutor
新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量。
发现问题:刚开始我以为只需要配置 max.poll.record 就可以控制每个批次的消费速率,但经过测试发现,由于每次消费到的 records 都让线程池去处理了,因此 consumer.poll 一次数据在一个批次内就识别到很快就处理完,然后 consumer 就会在一个批次内尽可能地去 poll 多几次。这样就没法实现每个批次控制了。
请教大家:针对以上情况有优化的可能性吗?需要尽可能精确控制指定 topic 一个批次内只需要消费固定的数据量,我目前发现 sparkStreaming 倒是很好地控制,但是 Java 目前没找到合适的方案来实现控制。
目前是通过 guava 组件的 RateLimter 限流器来实现特定限流。 通过设置 RateLimter.create(1),即 1 秒处理 1 次。 通过这个机制来限流控制特定的 consumer 在进行 poll
if(rateLimter.tryActive()) {
consumer.poll();
for (record:records) {
// 丢到线程池异步业务处理
}
}
2 分钟写了个案例,如果要更精细用 nanoTime
```java
int MAX_CNT_OF_SEC = 100;
int count = 0;
long lastPoll = -1l;
long ONE_SEC = Duration.ofSeconds(1L).toMillis();
while (true) {
long now = System.currentTimeMillis();
long diff = now - lastPoll;
if (diff < ONE_SEC || count >= MAX_CNT_OF_SEC) {
Thread.sleep(ONE_SEC - diff);
}
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
count += records.count();
// TODO
}
```
有没有可能根本不需要限流。
kafka 本来让业务主动去拉取,就是让你在拉取的时候控制速率。
fetch-max-wait: 10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
fetch-min-size: 10 # poll 一次消息拉取的最小数据量,单位:字节
max-poll-records: 100 # poll 一次消息拉取的最大数量
可以完全通过这三个参数控制你的消费速率,直接同步消费就是最好的选择。你却本末倒置,做异步消费然后再限流。本来配置修改一下就可以的事情,你却写一堆代码,把简单的事情搞复杂。
标签: 来源:
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。