标签:namespace results content 源码 logger Apollo message 客户端
前言
在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码
Apollo简介
fork 源码地址 apollo源码
参考apollo架构中心设计
主要分为 Config Service、Admin Service、Portal、Client 四部分
上文介绍到ReleaseMessage
对象的发布,portal发布配置第一件事新增 Release
对象,第二件事发布ReleaseMessage
紧接着第三件事便是本文要讲的 ConfigPublishEvent
事件
新建ConfigPublishEvent事件
@EventListener
public void onConfigPublish(ConfigPublishEvent event) {
executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
}
private class ConfigPublishNotifyTask implements Runnable {
private ConfigPublishEvent.ConfigPublishInfo publishInfo;
ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
this.publishInfo = publishInfo;
}
@Override
public void run() {
ReleaseHistoryBO releaseHistory = getReleaseHistory();
if (releaseHistory == null) {
Tracer.logError("Load release history failed", null);
return;
}
sendPublishEmail(releaseHistory);
sendPublishMsg(releaseHistory);
}
跟踪代码也没发现和configservice的交互,事件监听者无非是创建一个线程池,执行线程任务,任务为发送release对象邮件,和调用远端hermes的一个接口。
配置发布,通知客户端
阅读官方文档 : 服务端设计文档中详细介绍了configservice是如何拉取ReleaseMessage的
实现方式如下:
1.Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender2.Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
3.Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
4.NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端
如图 :
那么我们跟着官方文档,来认识一下 ReleaseMessageScanner
这个类,该类实现了 InitializingBean
接口,会在容器启动,bean初始化后调用 afterPropertiesSet
方法
@Override
public void afterPropertiesSet() throws Exception {
//默认扫描间隔为1s
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
//查询最大的一条Release记录
maxIdScanned = loadLargestMessageId();
//延迟1s后执行定时任务,受任务影响,需要等任务完成之后才开始计时
executorService.scheduleWithFixedDelay((Runnable) () -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
//扫描消息
scanMessages();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Scan and send message failed", ex);
} finally {
transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
}
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
//有新的message发布,通知configservice
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
//取最后一条数据的id,赋值最大id
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
//是否还有数据
return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
//通知所有消息监听器,触发handleMessage方法
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}
在 fireMessageScanned
会去通知所有的监听者,我们看看 ReleaseMessageListener
这个类图
监听器的注册过程参见 ConfigServiceAutoConfiguration
, handleMessage
方法是得到发布的配置并处理,根据官方文档指示 : Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器
,客户端具体定位到 NotificationControllerV2#handleMessage
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
//内容即为 : aapId + cluster + namespace
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
//如果channel不是apollo-release,则不处理
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
//retrieveNamespaceFromReleaseMessage实现了Function的lambda表达式,apply是通过该表达式返回namespace
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
//key : appId+cluster+namespace, value: messageId
configNotification.addMessage(content, message.getId());
//do async notification if too many clients
//客户端长轮询连接数 > 100
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
//100一个批次,就睡眠100ms
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
//睡眠100ms
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
//通知客户端,消息为:namespace,messageId
results.get(i).setResult(configNotification);
}
});
return;
}
logger.debug("Notify {} clients for key {}", results.size(), content);
//同步通知
for (DeferredResultWrapper result : results) {
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
- 拉取到消息,通知到所有的ReleaseMessageListener实现类
ReleaseMessageScanner#fireMessageScanned
,会调用handleMessage
方法。 - 从
content:appId+cluster+namespace
中取出namespace
。 - 组装
ApolloConfigNotification
对象messageId、namespaceName、key : appId+cluster+namespace, value: messageId 的map
通知客户端。 - 客户端使用
DeferredResult
长轮询技术。
标签:namespace,results,content,源码,logger,Apollo,message,客户端 来源: https://blog.csdn.net/m0_37268363/article/details/110493806
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。