ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

librdkafka-example-modeC

2022-03-21 15:33:04  阅读:216  来源: 互联网

标签:topic partition librdkafka kafka rd modeC offset example rk


/*结构体说明  
rd_kafka_toppar_s:topic & partition combination

*/
else if (mode == 'C') { /* * Consumer */ //初始化设置 rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, 0); /* Create Kafka handle */
//判断是否可以使用当前consumer
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr); exit(1); } //当设置-s:watermark使用 if (get_wmarks) { int64_t lo, hi; /* Only query for hi&lo partition watermarks */ if ((err = rd_kafka_query_watermark_offsets( rk, topic, partition, &lo, &hi, 5000))) { fprintf(stderr, "%% query_watermark_offsets() " "failed: %s\n", rd_kafka_err2str(err)); exit(1); } printf( "%s [%d]: low - high offsets: " "%" PRId64 " - %" PRId64 "\n", topic, partition, lo, hi); rd_kafka_destroy(rk); exit(0); } /* Create topic */ rkt = rd_kafka_topic_new(rk, topic, topic_conf); topic_conf = NULL; /* Now owned by topic */ /* Start consuming */
//判断是否可以消费
/*rd_kafka_consume_start
判断条件:
1.topic是否长度超限 是否为空
2.加线程锁
3.判断offset(最新,最小,上次消费的地方) 判断offset是否合法 */
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) { err = rd_kafka_last_error(); fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) fprintf(stderr, "%% Broker based offset storage " "requires a group.id, " "add: -X group.id=yourGroup\n"); exit(1); } /*

*/ while (run) { rd_kafka_message_t *rkmessage; /* Poll for errors, etc. */ rd_kafka_poll(rk, 0); /* Consume single message. * See rdkafka_performance.c for high speed * consuming of messages. */ rkmessage = rd_kafka_consume(rkt, partition, 1000); if (!rkmessage) /* timeout */ continue; msg_consume(rkmessage, NULL); /* Return message to rdkafka */ rd_kafka_message_destroy(rkmessage); if (seek_offset) { err = rd_kafka_seek(rkt, partition, seek_offset, 2000); if (err) printf("Seek failed: %s\n", rd_kafka_err2str(err)); else printf("Seeked to %" PRId64 "\n", seek_offset); seek_offset = 0; } } /* Stop consuming */ rd_kafka_consume_stop(rkt, partition); while (rd_kafka_outq_len(rk) > 0) rd_kafka_poll(rk, 10); /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy handle */ rd_kafka_destroy(rk); }



标签:topic,partition,librdkafka,kafka,rd,modeC,offset,example,rk
来源: https://www.cnblogs.com/supermanwx/p/16034906.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有