ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Sphu的调用链

2022-08-14 17:33:44  阅读:166  来源: 互联网

标签:count node 调用 Sphu resourceWrapper context entry null


SphU调用流程

流程图

代码跟踪

  1. 开始调用
    在总结资源生成的时候,我们已经列举了不同的资源生成,会调用不同的方法,下面我们使用较为常用的方法SphU.Entry来作为入口分析
    /**
     * 记录统计数据并对给定资源执行规则检查    
     * Record statistics and perform rule checking for the given resource.
     *
     * @param name the unique name of the protected resource
     * @return the {@link Entry} of this invocation (used for mark the invocation complete and get context data)
     * @throws BlockException if the block criteria is met (e.g. metric exceeded the threshold of any rules)
     */
    public static Entry entry(String name) throws BlockException {
        // 重载了各种方法 通过参数来确定不同的资源
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }
------------------------------------------
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
            // 这里会使用string资源作为我们初始化资源的入参,这里是使用的ResourceWrapper的一个string子类 
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
  1. 调用途中
    沿着调用栈逐渐深入,可以发现无论是从哪个入口,最终都是在 com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object...) 中进行处理
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // 构建context
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            // 超过阈值后 才会执行到这里,是一个空context
            // 所以只初始化, 不做其他规则校验
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            // 兜底
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        // 全局的开关,如果没有打开,不去做规则校验
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        // 开始获取processSlot  
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * 为空说明 链路超过了常量 6000 条, 就不处理了
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        // 新建一个当前执行的entry, 每次对资源的操作都会生成,
        // 然后将上一次生成的entry 作为parent
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        // 最终返回的entry 操作实体
        return e;
    }
  1. 获取对应ProcessSlot
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        // 从chainMap 根据当前资源获取slot, 这里也说明了 一个resource 只能对应一个porcessSlot
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                // double check
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    // 超过了上限, 就不弄这个资源的限制了
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    //  新弄一个chain
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
  1. 责任链的形式一直去执行chain 里面的entry方法, 我们会在 NodeSelectorSlot 之中更新当前的currNode,并绑定defaultNode
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
        有趣的是,我们使用上下文名称而不是资源名称作为映射键。记住,无论在哪个上下文中,相同的资源({@link ResourceWrapperequals(Object)})将全局共享相同的{@link ProcessorSlotChain}。因此,如果代码进入{@link条目(Context, ResourceWrapper, DefaultNode, int, Object…)},资源名必须相同,但上下文名可以不同。如果我们使用{@link com.alibaba.csp.sentinel。SphUentry(String resource)}在不同的上下文中输入相同的资源,使用上下文名称作为映射键可以区分相同的资源。在这种情况下,将为每个不同的上下文(不同的上下文名称)创建多个具有相同资源名称的{@link DefaultNode}。考虑另一个问题。一个资源可能有多个{@link DefaultNode},那么获得同一资源的统计数据的最快方法是什么?答案是所有具有相同资源名的{@link DefaultNode}共享一个{@link ClusterNode}。详情请参见{@link ClusterBuilderSlot}。
          */
         // 在这里根据上下文去生DefaultNode 
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    // 一个context 生成的 一个defaultNode 而且是根据context的name 来确认唯一性的                  
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    // 调用树,添加子节点
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        // 设置为当前Node
        context.setCurNode(node);
        // 执行其他的slotchain
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  1. 在flowSlot 根据规则拦截,然后在StatisticSlot 统计当前次数数据
//  StatisticSlot 中的数据统计,, 
// 1 当前entry的次数和成功次数+1
// 2. 整个服务(culsterNode) 的次数和成功次数+1
// 3. 成功统计之后的回调处理.
// 如失败,则当前请求次数+1,成功次数不做增减
try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // 阻止访问了的话 不做访问次数+1 
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // 未知异常的跑抛出
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);
            throw e;
        }

标签:count,node,调用,Sphu,resourceWrapper,context,entry,null
来源: https://www.cnblogs.com/wzqshb/p/16585817.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有