ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Dubbo服务消费者Consumer启动流程(五)

2021-12-28 22:36:38  阅读:169  来源: 互联网

标签:Dubbo map URL 流程 url invoker interfaceClass Consumer Constants


在看完上篇的服务提供者启动流程之后,再来看消费者的启动流程就简单很多了,其大体的设计流程是差不多的。服务消费者的启动主要调用ReferenceConfig#get(), get方法跟服务提供者的export方法类似,主要关注checkAndUpdateSubConfigs和init方法。

 

ReferenceConfig#checkAndUpdateSubConfigs

这个checkAndUpdateSubConfigs方法这边不详细深入,也不是特别复杂,读者可以自己看每个步骤细节,这边只是大体上讲解流程。

public void checkAndUpdateSubConfigs() {
        if (StringUtils.isEmpty(interfaceName)) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // 配置优先级 consumer > module > application
        completeCompoundConfigs();
        // 将外部配置中心的配置刷新到本地配置
        startConfigCenter();
        // get consumer's global configuration
        // 设置comsumer配置
        checkDefault();
        this.refresh();
        if (getGeneric() == null && getConsumer() != null) {
            setGeneric(getConsumer().getGeneric());
        }
        if (ProtocolUtils.isGeneric(getGeneric())) {
            interfaceClass = GenericService.class;
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // 检查<dubbo:method> 配置的方法是否包含在interfaceClass的方法中
            checkInterfaceAndMethods(interfaceClass, methods);
        }
        // 处理dubbo服务消费端resolve机制,也就是说消息消费者只连服务提供者,绕过注册中心。
        // 这个方法主要是获取直连服务的配置信息
        resolveFile();
        // 校验ReferenceBean的application是否为空,如果为空,new 一个application
        checkApplication();
        checkMetadataReport();
    }

接下来就是int方法了,初始化ref,即获取到的实际上是一个由proxyFactory来代理invoker对象,通过其Proxy  InvokerInvocationHandler的invoke方法,即调用invoker的invoker方法返回的对象。整个对象类型就是远程服务接口类型。

 

ReferenceConfig#init

    private void init() {
        if (initialized) {
            return;
        }
        initialized = true;
        //  校验stub、mock实现类与interface的兼容性
        checkStubAndLocal(interfaceClass);
        checkMock(interfaceClass);
        Map<String, String> map = new HashMap<String, String>();

        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        /**
         * 构建Map,封装服务消费者引用服务提供者URL的属性,这里主要填充side:consume(消费端)、dubbo:2.0.0(版本)、timestamp、pid:进程ID。
         */
        appendRuntimeParameters(map);
        // 如果不是泛化引用,增加methods:interface的所有方法名,多个用逗号隔开。
        if (!isGeneric()) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            // 使用Wrapper 目的是为了缓存interfaceClass的类元数据
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        // 用Map存储application配置、module配置、默认消费者参数(ConsumerConfig)、服务消费者dubbo:reference的属性。
        map.put(Constants.INTERFACE_KEY, interfaceName);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        Map<String, Object> attributes = null;
        /**
         * 获取服务键值 {group}/interface:版本,如果group为空,则为interface:版本,其值存为prifex,
         * 然后将dubbo:method的属性名称也填入map中,键前缀为dubbo.method.methodname.属性名。
         * dubbo:method的子标签dubbo:argument标签的属性也追加到attributes map中,键为 prifex + methodname.属性名。
         */
        if (CollectionUtils.isNotEmpty(methods)) {
            attributes = new HashMap<String, Object>();
            for (MethodConfig methodConfig : methods) {
                appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
            }
        }
         //填充register.ip属性,该属性是消息消费者连接注册中心的IP,并不是注册中心自身的IP。
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        ref = createProxy(map);
        // 获取服务键值 {group}/interface:版本
        String serviceKey = URL.buildKey(interfaceName, group, version);
        // 将消息消费者缓存在ApplicationModel中。
        ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
    }

init的作用还是根据配置构建消费者map中的查询参数URL,关键在粗体部分,ref = createProxy(map);本章重点也是在createProxy方法上详解。

 

ReferenceConfig#createProxy

createProxy 首先判断是否是injvm协议的本地引用,还是revoke机制的直连模式,还是普通消费者,从注册中心获取订阅服务,目的就是为了初始化引用服务的urls,其中url的subscribe设置为true。然后根据url来获取对应协议的invoker,最后根据invoker来代理远程服务。

判断是否是injvm服务?通过shouldJvmRefer方法,第一是根据配置,如果没有配置,发现是直连服务,那么久不是injvm。或者如果scope=local或injvm=true,那么isJvmRefer=true

如何判断是否是直连模式呢?根据前面的resolveFile,如果是直连,那么就会初始化这个点对点的url。

剩下的就是普通消费者模式了,依赖注册中心。

    private T createProxy(Map<String, String> map) {
        // 判断该消费者是否是引用本(JVM)内提供的服务。
        if (shouldJvmRefer(map)) {
            /**
             * 如果消费者引用本地JVM中的服务,则利用InjvmProtocol创建Invoker,dubbo中的invoker主要负责服务调用的功能
             * injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-consumer
             * &default.lazy=false&default.sticky=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello
             * &pid=36176&register.ip=192.168.0.102&release=&side=consumer&sticky=false&timestamp=1640070027986
             */
            URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {  // 处于直连模式
            /**
             * 对直连URL进行分割,多个直连URL用分号隔开,如果URL中不包含path属性,则为URL设置path属性为interfaceName。
             * 如果直连提供者的协议为registry,则对url增加refer属性,其值为消息消费者所有的属性。(表示从注册中心发现服务提供者)
             * 如果是其他协议提供者,则合并服务提供者与消息消费者的属性,并移除服务提供者默认属性。以default开头的属性。
             */
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration  普通消息消费者,从注册中心订阅服务
                /**
                 * :获取所有注册中心URL,其中参数false表示消费端,需要排除dubbo:registry subscribe=false的注册中心,其值为false表示不接受订阅。
                 * 根据注册中心URL,构建监控中心URL。
                 * 如果监控中心不为空,在注册中心URL后增加属性monitor。
                 * 在注册中心URL中,追加属性refer,其值为消费端的所有配置组成的URL。
                 */
                checkRegistry();
                List<URL> us = loadRegistries(false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                // 根据URL获取对应协议的Invoker。
                //如果只有一个服务提供者URL,则直接根据协议构建Invoker,具体有如下协议:
                // Protocal$Adaptive.refer  ->  RegisterProtocal.refer
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use RegistryAwareCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        // 如果dubbo:referecnce的check=true或默认为空,则需要判断服务提供者是否存在。
        if (shouldCheck() && !invoker.isAvailable()) {
            // make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false;
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }
        // create service proxy
        /**
         * 根据invoker获取代理类,其实现逻辑如下:
         * 从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口。
         * 增加默认接口EchoService接口。
         * 根据需要实现的接口,使用jdk或Javassist创建代理类。
         */
        return (T) proxyFactory.getProxy(invoker);
    }

 这里面的checkRegister跟服务者类似。  这里边就不展开了。urls都是register://开头,其中参数register对应具体的注册类型,比如向zookeeper或者redis等。

根据urls的size,我们就可以初始化invoker了

invoker = refprotocol.refer(interfaceClass, url);

如果多个url,则会产生多个invoker,最后通过cluser的join来返回一个invoker对象    invoker = cluster.join(new StaticDirectory(invokers));

最后通过Javassist来代理。具体细节将会在下一章消费者调用流程中介绍。

 

标签:Dubbo,map,URL,流程,url,invoker,interfaceClass,Consumer,Constants
来源: https://www.cnblogs.com/gaojy/p/15709137.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有