ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Apollo源码-配置发布通知客户端

2020-12-02 18:31:00  阅读:273  来源: 互联网

标签:namespace results content 源码 logger Apollo message 客户端


前言

在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码

Apollo简介

fork 源码地址 apollo源码
参考apollo架构中心设计
主要分为 Config ServiceAdmin ServicePortalClient 四部分
上文介绍到ReleaseMessage 对象的发布,portal发布配置第一件事新增 Release 对象,第二件事发布ReleaseMessage 紧接着第三件事便是本文要讲的 ConfigPublishEvent 事件

新建ConfigPublishEvent事件

@EventListener
public void onConfigPublish(ConfigPublishEvent event) {
executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
}


private class ConfigPublishNotifyTask implements Runnable {

private ConfigPublishEvent.ConfigPublishInfo publishInfo;

ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
this.publishInfo = publishInfo;
}

@Override
public void run() {
ReleaseHistoryBO releaseHistory = getReleaseHistory();
if (releaseHistory == null) {
 Tracer.logError("Load release history failed", null);
 return;
}

sendPublishEmail(releaseHistory);

sendPublishMsg(releaseHistory);
}

跟踪代码也没发现和configservice的交互,事件监听者无非是创建一个线程池,执行线程任务,任务为发送release对象邮件,和调用远端hermes的一个接口。

配置发布,通知客户端

阅读官方文档 : 服务端设计文档中详细介绍了configservice是如何拉取ReleaseMessage的

实现方式如下:
1.Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender

2.Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
3.Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
4.NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端

如图 :
在这里插入图片描述
那么我们跟着官方文档,来认识一下 ReleaseMessageScanner 这个类,该类实现了 InitializingBean 接口,会在容器启动,bean初始化后调用 afterPropertiesSet 方法

@Override
public void afterPropertiesSet() throws Exception {
//默认扫描间隔为1s
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
//查询最大的一条Release记录
maxIdScanned = loadLargestMessageId();
//延迟1s后执行定时任务,受任务影响,需要等任务完成之后才开始计时
executorService.scheduleWithFixedDelay((Runnable) () -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
 //扫描消息
 scanMessages();
 transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
 transaction.setStatus(ex);
 logger.error("Scan and send message failed", ex);
} finally {
 transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
//有新的message发布,通知configservice
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
//取最后一条数据的id,赋值最大id
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
//是否还有数据
return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
 try {
   //通知所有消息监听器,触发handleMessage方法  
   listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
 } catch (Throwable ex) {
   Tracer.logError(ex);
   logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
 }
}
}
}

fireMessageScanned 会去通知所有的监听者,我们看看 ReleaseMessageListener 这个类图
在这里插入图片描述
监听器的注册过程参见 ConfigServiceAutoConfiguration , handleMessage 方法是得到发布的配置并处理,根据官方文档指示 : Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器 ,客户端具体定位到 NotificationControllerV2#handleMessage

@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
//内容即为 : aapId + cluster + namespace
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
//如果channel不是apollo-release,则不处理
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
//retrieveNamespaceFromReleaseMessage实现了Function的lambda表达式,apply是通过该表达式返回namespace
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}

if (!deferredResults.containsKey(content)) {
return;
}

//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
//key : appId+cluster+namespace, value: messageId
configNotification.addMessage(content, message.getId());

//do async notification if too many clients
//客户端长轮询连接数 > 100
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
 logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
     bizConfig.releaseMessageNotificationBatch());
 for (int i = 0; i < results.size(); i++) {
   //100一个批次,就睡眠100ms
   if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
     try {
       //睡眠100ms
       TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
     } catch (InterruptedException e) {
       //ignore
     }
   }
   logger.debug("Async notify {}", results.get(i));
   //通知客户端,消息为:namespace,messageId
   results.get(i).setResult(configNotification);
 }
});
return;
}

logger.debug("Notify {} clients for key {}", results.size(), content);
//同步通知
for (DeferredResultWrapper result : results) {
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
  • 拉取到消息,通知到所有的ReleaseMessageListener实现类 ReleaseMessageScanner#fireMessageScanned ,会调用 handleMessage 方法。
  • content:appId+cluster+namespace 中取出 namespace
  • 组装 ApolloConfigNotification 对象 messageId、namespaceName、key : appId+cluster+namespace, value: messageId 的map 通知客户端。
  • 客户端使用 DeferredResult 长轮询技术。

标签:namespace,results,content,源码,logger,Apollo,message,客户端
来源: https://blog.csdn.net/m0_37268363/article/details/110493806

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有