ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法

2021-11-19 19:03:46  阅读:219  来源: 互联网

标签:String SpringCloud pulsarProperties 更新 topic msg Pulsar public


上代码, 包括Pulsar的参数类, Pulsar Client, Producer和Consumer

================Pulsar参数类=====================
@Data
@RefreshScope
@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
public class PulsarProperties {

/**
* 接入地址
*/
private String serviceurl;

/**
* 命名空间tdc
*/
private String tdcNamespace;

/**
* 角色tdc的token
*/
private String tdcToken;

/**
* 集群name
*/
private String cluster;

/**
* topicMap
*/
private Map<String, String> topicMap;

/**
* 订阅
*/
private Map<String, String> subMap;

/**
* 开关 on:Consumer可用 ||||| off:Consumer断路
*/
private String onOff;

}
==================PulsarClient=======================
@Slf4j
@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

@Autowired
PulsarProperties pulsarProperties;

@RefreshScope
@Bean
public PulsarClient getPulsarClient() {

try {
return PulsarClient.builder()
.authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
.serviceUrl(pulsarProperties.getServiceurl())
.build();
} catch (PulsarClientException e) {
log.error("初始化Pulsar Client失败", e);
}

throw new RuntimeException("初始化Pulsar Client失败");
}

}
===========Producer&Consumer&发送消息的工具类=================
@Slf4j
@Component
public class PulsarUtils {

@Autowired
PulsarProperties pulsarProperties;

@Autowired
PulsarClient client;

@Autowired
AuditCommentResultListener<String> auditCommentResultListener;

@Autowired
AuditReplyResultListener<String> auditReplyResultListener;

@Autowired
AuditResourceResultListener<String> auditResourceResultListener;

/**
* 创建一个生产者
*
* @param topic topic name
* @return Producer生产者
*/
public Producer<byte[]> createProducer(String topic) {

try {
return client.newProducer()
.topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
} catch (PulsarClientException e) {
log.error("初始化Pulsar Producer失败", e);
}

throw new RuntimeException("初始化Pulsar Producer失败");
}

/**
* 创建一个消费者
*
* @param topic topic name
* @param subscription sub name
* @param messageListener MessageListener的自定义实现类
* @return Consumer消费者
*/
public Consumer createConsumer(String topic, String subscription,
MessageListener messageListener) {
try {
return client.newConsumer()
.topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
.subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.messageListener(messageListener)
.subscribe()
;
} catch (PulsarClientException e) {
log.error("初始化Pulsar Consumer失败", e);
}

throw new RuntimeException("初始化Pulsar Consumer失败");
}

/**
* 异步send一条msg
*
* @param message 消息体
*/
public void sendMessage(String message, Producer<byte[]> producer) {
producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
log.info("消息发送成功, MessageID为{}", msgId);
});
}

/**
* 同步发送一条msg
*
* @param message 消息体
* @param producer 生产者实例
*/
public void sendOnce(String message, Producer<byte[]> producer) throws PulsarClientException {
MessageId send = producer.send(message.getBytes());
log.info("消息成功发送, MessageId {},message {}", send, message);
}

//-----------consumer-----------
@RefreshScope
@Bean(name = "audit-resource-result-topic")
public Consumer getAuditResourceResultTopicConsumer() {
return this.createConsumer(pulsarProperties.getTopicMap().get("audit-resource-result-topic"),
pulsarProperties.getSubMap().get("resource-sub-audit-resource-result"),
auditResourceResultListener);
}

//-----------producer-----------
@RefreshScope
@Bean(name = "resource-publish-topic")
public Producer<byte[]> getResourcePublishTopicProducer() {
return this.createProducer(pulsarProperties.getTopicMap().get("resource-publish-topic"));
}
}
=====================AbstractListener===============================
@Slf4j
@Component
public abstract class AbstractListener<String> implements MessageListener<String> {

@Autowired
PulsarProperties pulsarProperties;

@Override
public void received(Consumer<String> consumer, Message<String> message) {

}

/**
* 判断开关
*
* @return is equals off
*/
public boolean judgeIsOff() {
return pulsarProperties.getOnOff().equals("off");
}
}
=================Listener自定义实现类====================
@Slf4j
@Component
public class AuditCommentResultListener<String> extends AbstractListener<String> {

@Autowired
CommentService commentService;

@Override
public void received(Consumer consumer, Message msg) {
try {
java.lang.String data = new java.lang.String(msg.getData());
log.info("接受到消息, MessageId {} data {}", msg.getMessageId(), data);
// 添加开关
if (super.judgeIsOff()) {
consumer.negativeAcknowledge(msg);
log.error("当前开关为off 拒绝消费消息, MessageId {} data {}", msg.getMessageId(), data);
}
// 处理业务逻辑

consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
log.error("拒绝消费消息, MessageId {} data {}", msg.getMessageId(), new java.lang.String(msg.getData()), e);
}
}
}
=========================================================================


标签:String,SpringCloud,pulsarProperties,更新,topic,msg,Pulsar,public
来源: https://www.cnblogs.com/zhaoyuxuan66/p/15578718.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有