ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【kafka】生产者和消费者代码

2022-05-24 13:32:55  阅读:181  来源: 互联网

标签:Console string producer 生产者 代码 server new kafka consumer


 

Producer

static void Main(string[] args)
        {
            Console.WriteLine("请输入消息内容");
            using (var producer = new KafkaProducer())
            {
                while (true)
                {
                    string message = Console.ReadLine();
                    try
                    {
                        //topic名称是test
                        var result = producer.ProduceAsync("test",
                        new Confluent.Kafka.Message<string, string>() { Key = Guid.NewGuid().ToString(), Value = message })
                            .GetAwaiter().GetResult();
                        Console.WriteLine($"offset:{result.Offset.Value},partition:{result.Partition.Value}");
                    }
                    catch (ProduceException<string, string> e)
                    {
                        Console.WriteLine($"失败的消息: {e.Message} [{e.Error.Code}]");
                        continue;
                    }

                }
            }
        }
        class KafkaProducer : IDisposable
        {
            private ProducerConfig _config = new ProducerConfig();
            private IProducer<string, string> _producer;
            public KafkaProducer(string server = null)
            {
                if (string.IsNullOrEmpty(server))
                {
                    server = "127.0.0.0.1:9001,127.0.0.0.1:9002,127.0.0.0.1:9003";

                }
                _config.BootstrapServers = server;
                _producer = new ProducerBuilder<string, string>(_config).Build();

            }

            public async Task<DeliveryResult<string, string>> ProduceAsync(string topic, Message<string, string> message)
            {
                return await _producer.ProduceAsync(topic, message);

            }

            public void Dispose()
            {
                _producer?.Dispose();
            }
        }

 Consumer

static void Main(string[] args)
        {
            Console.WriteLine("默认只关注test主题的消息)");
            using (var consumer = new KafkaConsumer())
            {
                while (true)
                {
                    consumer.Consume(a =>
                    {
                        if (a == null)
                        {
                            Console.WriteLine("暂无消息");
                        }
                        else
                        {
                            Console.WriteLine($"Key:{a.Message.Key},Value:{a.Message.Value}");
                        }
                    });
                }
            }
        }

        class KafkaConsumer : IDisposable
        {
            private IConsumer<string, string> _consumer;
            public KafkaConsumer(string server = null)
            {
                if (string.IsNullOrEmpty(server))
                {
                   server = "127.0.0.0.1:9001,127.0.0.0.1:9002,127.0.0.0.1:9003";
                }
                var config = new ConsumerConfig
                {
                    GroupId = "TestGroupone",
                    BootstrapServers = server,
                    AutoOffsetReset = AutoOffsetReset.Earliest
                };
                _consumer = new ConsumerBuilder<string, string>(config).Build();
                //topic名称默认是test
                _consumer.Subscribe("test");

            }

            public void Consume(Action<ConsumeResult<string, string>> action = null)
            {
                var consumerResult = _consumer.Consume(TimeSpan.FromSeconds(2));
                action?.Invoke(consumerResult);
            }

            public void Dispose()
            {
                _consumer?.Dispose();
            }
        }

 

标签:Console,string,producer,生产者,代码,server,new,kafka,consumer
来源: https://www.cnblogs.com/Ginease/p/16305225.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有