ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

SpringCloud之Ribbon

2021-09-11 11:32:33  阅读:141  来源: 互联网

标签:return SpringCloud server 重试 context null public Ribbon


1、通过RestTemplate开启负载均衡

新增Configuration配置类,创建RestTemplate实例并通过@LoadBalanced开启负载均衡。

@Configuration
public class RestTemplateConfig {
    @Bean
    @LoadBalanced
    protected RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

@LoadBalanced注解的源码如下:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

从其注释中看到唯一有用的信息就是LoadBalancerClient,下面我么看看LoadBalancerClient的作用:

/**
 * 负载均衡器客户端
 *
 * Represents a client-side load balancer.
 *
 * @author Spencer Gibb
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {

	/**
	 * 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
	 *
	 * Executes request using a ServiceInstance from the LoadBalancer for the specified service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @param request Allows implementations to execute pre and post actions, such as incrementing metrics.
	 * @param <T> type of the response
	 * @throws IOException in case of IO issues.
	 * @return The result of the LoadBalancerRequest callback on the selected ServiceInstance.
	 */
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
	 *
	 * Executes request using a ServiceInstance from the LoadBalancer for the specified service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @param serviceInstance The service to execute the request to.
	 * @param request Allows implementations to execute pre and post actions, such as incrementing metrics.
	 * @param <T> type of the response
	 * @throws IOException in case of IO issues.
	 * @return The result of the LoadBalancerRequest callback on the selected ServiceInstance.
	 */
	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * 重构URI
	 * 创建一个适当的URI,该URI具有供系统使用的真实主机和端口。
	 * 有些系统使用逻辑服务名称作为主机的URI,例如http://myservice/path/to/service。
	 * 重构之后会使用ServiceInstance(服务实例)中的host:port替换服务名。
	 *
	 * Creates a proper URI with a real host and port for systems to utilize.
	 * Some systems use a URI with the logical service name as the host, such as http://myservice/path/to/service.
	 * This will replace the service name with the host:port from the ServiceInstance.
	 * @param instance service instance to reconstruct the URI
	 * @param original A URI with the host as a logical service name.
	 * @return A reconstructed URI.
	 */
	URI reconstructURI(ServiceInstance instance, URI original);

}

LoadBalancerClient接口代表了负载均衡器客户端,并提供了execute、reconstructURI两个方法,其继承了ServiceInstanceChooser接口,该接口的代码如下:

/**
 * 服务实例选择器
 * Implemented by classes which use a load balancer to choose a server to send a request to.
 *
 * @author Ryan Baxter
 */
public interface ServiceInstanceChooser {

	/**
	 * 从LoadBalancer(负载均衡器)中为指定的服务选择一个ServiceInstance(服务实例)
	 * Chooses a ServiceInstance from the LoadBalancer for the specified service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @return A ServiceInstance that matches the serviceId.
	 */
	ServiceInstance choose(String serviceId);

}

ServiceInstanceChooser接口代表了服务器实例选择实例,并提供了一个choose方法来选择具体被调用的服务器实例。

通过这两个接口我们可以看到,LoadBalancerClient已经具有了选择服务器实例、执行、重构URI等功能;接下来我们就去看一下Ribbon的初始化过程。这里涉及到连个重要的配置类RibbonAutoConfiguration和LoadBalancerAutoConfiguration,下面我们针对这两个配置类的初始化做分析。

2、RibbonAutoConfiguration

Ribbon的初始化过程通过RibbonAutoConfiguration类引导完成。

@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
// EurekaClientAutoConfiguration --> EurekaClient自动配置类
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
// LoadBalancerAutoConfiguration --> 负载均衡 自动配置类
// AsyncLoadBalancerAutoConfiguration --> 负载均衡 异步请求 自动配置类
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,	AsyncLoadBalancerAutoConfiguration.class })
// RibbonEagerLoadProperties --> Ribbon热加载属性
// ServerIntrospectorProperties --> 服务器内省属性
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {

	@Autowired(required = false)
	private List<RibbonClientSpecification> configurations = new ArrayList<>();

	// Ribbon热加载属性配置
	@Autowired
	private RibbonEagerLoadProperties ribbonEagerLoadProperties;

	// HasFeatures 特性配置
	@Bean
	public HasFeatures ribbonFeature() {
		return HasFeatures.namedFeature("Ribbon", Ribbon.class);
	}

	// 创建客户端、负载均衡器和客户端配置实例的工厂
	@Bean
	public SpringClientFactory springClientFactory() {
		SpringClientFactory factory = new SpringClientFactory();
		factory.setConfigurations(this.configurations);
		return factory;
	}

	// 负载均衡器客户端
	@Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}

    // 重试
	@Bean
	@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
	@ConditionalOnMissingBean
	public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
		return new RibbonLoadBalancedRetryFactory(clientFactory);
	}

    // 提供了Ribbon需要的组件类名
	@Bean
	@ConditionalOnMissingBean
	public PropertiesFactory propertiesFactory() {
		return new PropertiesFactory();
	}

	// 开启热加载
	@Bean
	@ConditionalOnProperty("ribbon.eager-load.enabled")
	public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
		return new RibbonApplicationContextInitializer(springClientFactory(),
				ribbonEagerLoadProperties.getClients());
	}

	@Configuration
	@ConditionalOnClass(HttpRequest.class)
	@ConditionalOnRibbonRestClient
	protected static class RibbonClientHttpRequestFactoryConfiguration {

		@Autowired
		private SpringClientFactory springClientFactory;

		@Bean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
			return restTemplate -> restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
		}

		@Bean
		public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
			return new RibbonClientHttpRequestFactory(this.springClientFactory);
		}

	}

	// TODO: support for autoconfiguring restemplate to use apache http client or okhttp
	@Target({ ElementType.TYPE, ElementType.METHOD })
	@Retention(RetentionPolicy.RUNTIME)
	@Documented
	@Conditional(OnRibbonRestClientCondition.class)
	@interface ConditionalOnRibbonRestClient {

	}

	private static class OnRibbonRestClientCondition extends AnyNestedCondition {

		OnRibbonRestClientCondition() {
			super(ConfigurationPhase.REGISTER_BEAN);
		}

		@Deprecated // remove in Edgware"
		@ConditionalOnProperty("ribbon.http.client.enabled")
		static class ZuulProperty {

		}

		@ConditionalOnProperty("ribbon.restclient.enabled")
		static class RibbonProperty {

		}

	}

	/**
	 * {@link AllNestedConditions} that checks that either multiple classes are present.
	 */
	static class RibbonClassesConditions extends AllNestedConditions {

		RibbonClassesConditions() {
			super(ConfigurationPhase.PARSE_CONFIGURATION);
		}

		@ConditionalOnClass(IClient.class)
		static class IClientPresent {

		}

		@ConditionalOnClass(RestTemplate.class)
		static class RestTemplatePresent {

		}

		@ConditionalOnClass(AsyncRestTemplate.class)
		static class AsyncRestTemplatePresent {

		}

		@ConditionalOnClass(Ribbon.class)
		static class RibbonPresent {

		}

	}

}

这里初始化的比较重要的组件有:

  • SpringClientFactory:创建客户端、负载均衡器和客户端配置实例的工厂,它为每个客户机名创建一个Spring ApplicationContext,并从中提取所需的bean。
  • LoadBalancerClient:负载均衡器客户端
  • LoadBalancedRetryFactory:SpringCloud重试功能定制工厂类

关于这几个组件具体使用的时间节点,会在后面通过一次负载均衡的Http请求来讲解。

3、LoadBalancerAutoConfiguration

LoadBalancerAutoConfiguration是Ribbon负载均衡自动配置类,其代码如下。

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
				for (RestTemplateCustomizer customizer : customizers) {
					customizer.customize(restTemplate);
				}
			}
		});
	}

	// 负载均衡请求工厂类
	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
	}

	// Auto configuration for retry mechanism.
	// 重试机制配置类
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryAutoConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public LoadBalancedRetryFactory loadBalancedRetryFactory() {
			return new LoadBalancedRetryFactory() {
			};
		}

	}

	// 重试拦截机制配置类
	// Auto configuration for retry intercepting mechanism.
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryInterceptorAutoConfiguration {

		@Bean
		@ConditionalOnMissingBean
		public RetryLoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRetryProperties properties,
				LoadBalancerRequestFactory requestFactory,
				LoadBalancedRetryFactory loadBalancedRetryFactory) {
			return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
					requestFactory, loadBalancedRetryFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}

	// 忽略
	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {

		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}

}

这里初始化的比较重要的组件有:

  • LoadBalancerRequestFactory:为LoadBalancerInterceptor和RetryLoadBalancerInterceptor创建LoadBalancerRequest,并将LoadBalancerRequestTransformer应用于拦截的HttpRequest
  • LoadBalancedRetryFactory:如果容器中没有LoadBalancedRetryFactory实例的话,这里会创建一个空的LoadBalancedRetryFactory实例
  • RestTemplateCustomizer:定制RestTemplate,加入RetryLoadBalancerInterceptor拦截器,通过RetryLoadBalancerInterceptor拦截Http请求,并调用choose方法选择服务器,实现负载均衡

关于这几个组件具体使用的时间节点,下面我们通过一次负载均衡的Http请求来讲解。

4、负载均衡的Http请求调用过程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-w8Mrm0kB-1631330467682)(media/15604189016881/15638776502118.jpg)]

访问UserController的一个get方法,其调用时序图如上。其中关于RestTemplate的调用过程我们不多赘述,其最终会调用到RetryLoadBalancerInterceptor的intercept方法。接下来我们从这里开始分析:

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    // 获取URL和serviceName
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 获取负载均衡重试策略并创建重试模板
    final LoadBalancedRetryPolicy retryPolicy = this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer);
    // 创建重试策略模板,并为模板设置重试策略;
    // 未开启重试机制使用NeverRetryPolicy策略;
    // 若开启重试机制使用InterceptorRetryPolicy策略
    RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
    // 执行请求
    return template.execute(context -> {
        ServiceInstance serviceInstance = null;
        // 获取serviceInstance
        if (context instanceof LoadBalancedRetryContext) {
            LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
            serviceInstance = lbContext.getServiceInstance();
        }
        // 若未能获取到serviceInstance,则通过loadBalancer选择一个服务器
        if (serviceInstance == null) {
            serviceInstance = this.loadBalancer.choose(serviceName);
        }
        // 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
        ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer
                .execute(serviceName, serviceInstance,this.requestFactory.createRequest(request, body, execution));
        int statusCode = response.getRawStatusCode();
        if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
            byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
            response.close();
            throw new ClientHttpResponseStatusCodeException(serviceName, response,bodyCopy);
        }
        return response;
    }, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
        // This is a special case, where both parameters to
        // LoadBalancedRecoveryCallback are
        // the same. In most cases they would be different.
        @Override
        protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
            return response;
        }
    });
}

这里执行的较为重要的步骤有:

  • 获取负载均衡重试策略并创建重试模板
  • 创建重试策略模板,并为模板设置重试策略
  • 执行请求
4.1 获取负载均衡重试策略并创建重试模板
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String service,ServiceInstanceChooser serviceInstanceChooser) {
	RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(service);
	return new RibbonLoadBalancedRetryPolicy(service, lbContext,
			serviceInstanceChooser, clientFactory.getClientConfig(service));
}

// 构造函数
public RibbonLoadBalancedRetryPolicy(String serviceId,RibbonLoadBalancerContext context,
		ServiceInstanceChooser loadBalanceChooser,IClientConfig clientConfig) {
	this.serviceId = serviceId;
	this.lbContext = context;
	this.loadBalanceChooser = loadBalanceChooser;
	// 获取并设置重试的http状态码
	String retryableStatusCodesProp = clientConfig.getPropertyAsString(RETRYABLE_STATUS_CODES, "");
	String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
	for (String code : retryableStatusCodesArray) {
		if (!StringUtils.isEmpty(code)) {
			try {
				retryableStatusCodes.add(Integer.valueOf(code.trim()));
			}
			catch (NumberFormatException e) {
				log.warn("We cant add the status code because the code [ " + code
						+ " ] could not be converted to an integer. ", e);
			}
		}
	}
}
4.2 创建重试策略模板,并为模板设置重试策略
private RetryTemplate createRetryTemplate(String serviceName,HttpRequest request,LoadBalancedRetryPolicy retryPolicy){
	RetryTemplate template = new RetryTemplate();
	// 创建并设置BackOffPolicy
	BackOffPolicy backOffPolicy = this.lbRetryFactory.createBackOffPolicy(serviceName);
	template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
	template.setThrowLastExceptionOnExhausted(true);
	// 设置RetryListener
	RetryListener[] retryListeners = this.lbRetryFactory.createRetryListeners(serviceName);
	if (retryListeners != null && retryListeners.length != 0) {
		template.setListeners(retryListeners);
	}
	// 重点:
    // 如果配置了spring.cloud.loadbalancer.retry=false;或者retryPolicy==null,则使用NeverRetryPolicy
    // 否则使用InterceptorRetryPolicy
	template.setRetryPolicy(!this.lbProperties.isEnabled() || retryPolicy == null
			? new NeverRetryPolicy():new InterceptorRetryPolicy(request,retryPolicy,this.loadBalancer,serviceName));
	return template;
}

该方法中只要关心NeverRetryPolicy和InterceptorRetryPolicy即可,其调用方式我们在后面继续分析。

4.3 执行请求

前面提到了NeverRetryPolicy和InterceptorRetryPolicy,那么下面我们针对两种不同的策略,来看其具体的调用过程:首先在配置文件中禁用重试(此属性默认开启)

#允许重试
spring.cloud.loadbalancer.retry.enabled=false

调用RetryTemplate的execute方法,在execute方法中调用doExecute,在doExecute方法中有:

while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
    // 省略部分代码
}

这里canRetry会调用当前使用的重试策略中的方法,因为我们未开启请求重试,则使用的是NeverRetryPolicy,这里要注意:

public boolean canRetry(RetryContext context) {
	return !((NeverRetryContext) context).isFinished();
}

该方法会在在第一个异常之后返回false。其默认总是有一个尝试,然后防止重试。所以这里即使配置了禁用重试,这里默认也会返回true。那么如果我们开启了重试策略,则使用InterceptorRetryPolicy,我们来看一下其canRetry方法:

@Override
public boolean canRetry(RetryContext context) {
	LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
	if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
		// We haven't even tried to make the request yet so return true so we do
        // 为LoadBalancedRetryContext设置ServiceInstance
		lbContext.setServiceInstance(this.serviceInstanceChooser.choose(this.serviceName));
		return true;
	}
	return this.policy.canRetryNextServer(lbContext);
}

有一点需要注意,这里已经为LoadBalancedRetryContext设置了ServiceInstance,关于其choose方法,我们会在后面讲解。

回到RetryLoadBalancerInterceptor的execute方法,我们看一下开启重试、关闭重试这两种策略是如何获取调用服务器的。

// 获取serviceInstance,如果为禁用重试策略,则这里是true
if (context instanceof LoadBalancedRetryContext) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
    serviceInstance = lbContext.getServiceInstance();
}
// 若未能获取到serviceInstance,则通过loadBalancer选择一个服务器
if (serviceInstance == null) {
    serviceInstance = this.loadBalancer.choose(serviceName);
}

对于开启重试策略,因为已经在canRetry中设置了serviceInstance,所以直接获取即可;对于关闭重试,则需要通过choose方法来获取serviceInstance,关于choose过程,会在后面的负载均衡策略方式中分析。那有了这些准备工作,接下来就可以正式对serviceInstance发起请求了。其具体的调用依然是使用ClientHttpRequest来进行,不多赘述,感兴趣的同学可以自己查看其代码。

5、Ribbon负载均衡策略

前文分析到了一个choose方法,该方法即负载均衡的入口。

// RibbonLoadBalancerClient类的choose、getServer方法
@Override
public ServiceInstance choose(String serviceId) {
	return choose(serviceId, null);
}

public ServiceInstance choose(String serviceId, Object hint) {
    // 获取被调用的服务器
	Server server = getServer(getLoadBalancer(serviceId), hint);
	// 服务器为空返回null;否则返回RibbonServer
	if (server == null) {
		return null;
	}
	return new RibbonServer(serviceId, server, isSecure(server, serviceId),
			serverIntrospector(serviceId).getMetadata(server));
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
	if (loadBalancer == null) {
		return null;
	}
	// Use 'default' on a null hint, or just pass it on?
	return loadBalancer.chooseServer(hint != null ? hint : "default");
}


// BaseLoadBalancer类的chooseServer方法
// 根据指定的key获取一个可用的服务器
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    // 无可用负载均衡策略,返回null
    if (rule == null) {
        return null;
    } else {
        try {
            // 根据负载均衡策略选择服务器
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

从上面的方法调用过程中可以看到,rule.choose(key)是最终选择服务器的方法。rule是IRule接口的实现类。IRule接口为LoadBalancer定义“规则”的接口。规则可以看作是负载平衡的策略。众所周知的负载平衡策略包括轮询、基于响应时间的策略等。最常见的负载均衡策略有轮询、随机、权重等,接下来分析一下这三种最常见的策略。

5.1 ZoneAvoidanceRule 过滤轮询策略

该类继承了PredicateBasedRule,其属于轮询策略,也是SpringCloud的默认负载均衡策略,该方法会先筛选出所有可用的服务器列表,然后使用轮询算法得到被调用的服务器。

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    // 获取被调用的服务器
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

从代码中看,该方法的核心是chooseRoundRobinAfterFiltering,其代码如下:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    // 获取所有可用的服务器列表
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    // 使用轮询策略从可用服务器列表中选出被调用服务器
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

获取被调用服务器的过程分为三步:

  • 获取所有的服务器列表,包括可用和不可用
  • 过滤并得到可用的候选服务器列表,即从中筛选出可用的服务器列表
  • 使用轮询策略从候选服务器列表中选择一个被调用的服务器
5.1.1 获取候选服务器列表
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

该方法的核心是apply方法,最终会调用ZoneAvoidancePredicate类下的apply方法:

public boolean apply(@Nullable PredicateKey input) {
    // 如niws.loadbalancer.zoneAvoidanceRule.enabled=false,
    // 则不开启server zone,无需过滤
    if (!ENABLED.get()) {
        return true;
    }
    // 如server zone名称为null,无需过滤
    String serverZone = input.getServer().getZone();
    if (serverZone == null) {
        // there is no zone information from the server, we do not want to filter out this server
        return true;
    }
    // 如无可用的LoadBalancerStats,无需过滤
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    // 如可用的server zone数量<=1,无需过滤
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    // 如负载均衡中未包含当前获取到的分片zone,无需过滤
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) {
        // The server zone is unknown to the load balancer, do not filter it out
        return true;
    }
    // 获取可用的server zone
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,
            triggeringLoad.get(),
            triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    // 再次判断获取到的可用server zone中是否包含要过滤的server zone
    if (availableZones != null) {
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
}
5.1.2 使用轮询策略选出被调用服务器
private final AtomicInteger nextIndex = new AtomicInteger();
private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

轮询的算法还是很简单的,不多赘述。

5.2 开启自定义负载均衡策略

如果想要开启自定义负载均衡策略,可以有两种方式:
1、使用Ribbon中已存在的负载均衡策略,如轮询

/**
 * RoundRobinRule 轮询负载均衡策略
 */
@Configuration
public class RoundRobinRuleConfig {
    @Bean
    public IRule roundRobinRule() {
        return new RoundRobinRule();
    }
}

2、实现IRule接口并将其作为配置类

/**
 * 自定义负载均衡策略
 */
@Configuration
public class MyRule implements IRule {
    @Override
    public Server choose(Object key) {
        // do something
        return null;
    }

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        // do something
    }

    @Override
    public ILoadBalancer getLoadBalancer() {
        // do something
        return null;
    }

    @Bean
    public IRule myRule() {
        return new MyRule();
    }
}
5.3 RoundRobinRule 轮询策略
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        // 可用服务器和所有服务器信息
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        // 无可用服务器
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        // 获取下一次请求调用的服务器
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        // 判断获取到的服务器状态,若不可用,继续循环;如可用,则返回
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    // 若尝试获取服务器次数大于等于10,则直接返回;并记录警告日志
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: " + lb);
    }
    return server;
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}
5.4 RandomRule 随机策略
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        if (Thread.interrupted()) {
            return null;
        }

        // 可用服务
        List<Server> upList = lb.getReachableServers();
        // 所有服务
        List<Server> allList = lb.getAllServers();
        // 所有服务数
        int serverCount = allList.size();
        // 无服务
        if (serverCount == 0) {
            /*
             * No servers. End regardless of pass, because subsequent passes only get more restrictive.
             */
            return null;
        }

        // 随机获取一个server
        int index = chooseRandomInt(serverCount);
        server = upList.get(index);

        // 判断server可用性,返回server或继续重试
        if (server == null) {
            /*
             * The only time this should happen is if the server list were somehow trimmed.
             * This is a transient condition. Retry after yielding.
             */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Shouldn't actually happen.. but must be transient or a bug.
        server = null;
        Thread.yield();
    }

    return server;

}

// 获取一个[0~serverCount)随机数,使用ThreadLocalRandom减少竞争,提高并发效率
protected int chooseRandomInt(int serverCount) {
    return ThreadLocalRandom.current().nextInt(serverCount);
}
5.4 WeightedResponseTimeRule 权重策略

使用平均响应时间为每个服务器分配动态“权重”的规则,然后以“加权循环”方式使用该规则。相对于轮询、随机,该策略稍微复杂一些。

5.4.1 维护权重

该方法首先会开启定时任务来维护权重,即服务器的平均响应时间。

void initialize(ILoadBalancer lb) {
    if (serverWeightTimer != null) {
        serverWeightTimer.cancel();
    }
    serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);
    serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);
    // do a initial run
    ServerWeight sw = new ServerWeight();
    sw.maintainWeights();
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        public void run() {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
            serverWeightTimer.cancel();
        }
    }));
}

class DynamicServerWeightTask extends TimerTask {
    public void run() {
        ServerWeight serverWeight = new ServerWeight();
        try {
            serverWeight.maintainWeights();
        } catch (Exception e) {
            logger.error("Error running DynamicServerWeightTask for {}", name, e);
        }
    }
}

class ServerWeight {

    // 权重维护
    public void maintainWeights() {
        ILoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return;
        }

        if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
            return;
        }

        try {
            // 开始调整权重
            logger.info("Weight adjusting job started ...");

            // 获取负载均衡数据信息统计
            AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
            LoadBalancerStats stats = nlb.getLoadBalancerStats();
            if (stats == null) {
                // no statistics, nothing to do
                return;
            }

            // 计算所有服务器平均响应时间之和
            double totalResponseTime = 0;
            // find maximal 95% response time
            for (Server server : nlb.getAllServers()) {
                // this will automatically load the stats if not in cache
                ServerStats ss = stats.getSingleServerStat(server);
                totalResponseTime += ss.getResponseTimeAvg();
            }

            // weight for each server is (sum of responseTime of all servers - responseTime)
            // so that the longer the response time, the less the weight and the less likely to be chosen
            // 每个服务器的权重为(所有服务器的响应时间之和 - 平均响应时间)
            // 因此,响应时间越长,权重越小,被选择的可能性就越小
            // 服务器权重之和
            Double weightSoFar = 0.0;

            // create new list and hot swap the reference
            List<Double> finalWeights = new ArrayList<Double>();
            for (Server server : nlb.getAllServers()) {
                ServerStats ss = stats.getSingleServerStat(server);
                // 单独服务器权重 = 所有服务器的响应时间之和 - 平均响应时间
                double weight = totalResponseTime - ss.getResponseTimeAvg();
                // 累加服务器权重之和
                weightSoFar += weight;
                // 添加累加服务器权重之和到累计权重列表之中
                finalWeights.add(weightSoFar);
            }
            setWeights(finalWeights);
        } catch (Exception e) {
            logger.error("Error calculating server weights", e);
        } finally {
            serverWeightAssignmentInProgress.set(false);
        }

    }
}
5.4.2 选择服务器

然后通过choose方法根据权重来挑选服务器,即平均响应时间越短越有可能被选中

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        // get hold of the current reference in case it is changed from the other thread
        // 获取当前引用,以防它被其他线程更改
        List<Double> currentWeights = accumulatedWeights;
        if (Thread.interrupted()) {
            return null;
        }

        // 所有服务器信息
        List<Server> allList = lb.getAllServers();
        int serverCount = allList.size();
        if (serverCount == 0) {
            return null;
        }

        int serverIndex = 0;
        // last one in the list is the sum of all weights
        // 最大权重;若权重累积集合为空,则取0;否则取权重累积集合中最后一个元素
        double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);

        // No server has been hit yet and total weight is not initialized,fallback to use round robin
        // 若若权重累积尚未被初始化,或者统计的服务数量与总服务器数量不等,则使用轮询负载均衡策略
        if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
            server = super.choose(getLoadBalancer(), key);
            if(server == null) {
                return server;
            }
        } else {
            // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
            // 随机生成一个在0(包含)到 最大权重(不包含)之间的随机数
            double randomWeight = random.nextDouble() * maxTotalWeight;
            // pick the server index based on the randomIndex
            // 根据生成的随机数在累计权重列表中获取下标,并根据下标获取对应的服务器
            int n = 0;
            for (Double d : currentWeights) {
                if (d >= randomWeight) {
                    serverIndex = n;
                    break;
                } else {
                    n++;
                }
            }
            server = allList.get(serverIndex);
        }

        // 判断server可用性,返回server或继续重试
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Next.
        server = null;
    }
    return server;
}

6、Ribbon 重试机制

前面已分析了Ribbon通过RestTemplate开启负载均衡,并分析了完整的一次Http调用过程,那如果在调用过程中发生错误,我们想重试调用的话,可以使用Ribbon的重试机制。前文分析到在RetryLoadBalancerInterceptor类的intercept方法中有如下代码:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    // 获取URL和serviceName
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 获取负载均衡重试策略并创建重试模板
    final LoadBalancedRetryPolicy retryPolicy = this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer);
    // 创建重试策略模板,并为模板设置重试策略;
    // 未开启重试机制使用NeverRetryPolicy策略;
    // 若开启重试机制使用InterceptorRetryPolicy策略
    RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
    // 执行请求
    return template.execute(context -> {
        ServiceInstance serviceInstance = null;
        // 获取serviceInstance,如果为禁用重试策略,则这里是true
        if (context instanceof LoadBalancedRetryContext) {
            LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
            serviceInstance = lbContext.getServiceInstance();
        }
        // 若未能获取到serviceInstance,则通过loadBalancer选择一个服务器
        if (serviceInstance == null) {
            serviceInstance = this.loadBalancer.choose(serviceName);
        }
        // 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
        ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer
                .execute(serviceName, serviceInstance,this.requestFactory.createRequest(request, body, execution));
        int statusCode = response.getRawStatusCode();
        // 重试策略不为null 且 重试策略指定的重试http状态码包含当前请求返回的状态码
        if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
            byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
            response.close();
            throw new ClientHttpResponseStatusCodeException(serviceName, response,bodyCopy);
        }
        return response;
    }, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
        // This is a special case, where both parameters to LoadBalancedRecoveryCallback are the same.
        // In most cases they would be different.
        @Override
        protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
            return response;
        }
    });
}

如果重试策略不为null,并且重试策略指定的重试http状态码包含当前请求返回的状态码,那么Ribbon会重试该Http请求。其中获取重试策略通过this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer)来完成,我们来看其具体的获取过程。

/**
 * 创建负载均衡的重试策略模板
 */
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String service,ServiceInstanceChooser serviceInstanceChooser) {
	RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(service);
	return new RibbonLoadBalancedRetryPolicy(service, lbContext,
			serviceInstanceChooser, clientFactory.getClientConfig(service));
}

public RibbonLoadBalancedRetryPolicy(String serviceId,
                                     RibbonLoadBalancerContext context,
                                     ServiceInstanceChooser loadBalanceChooser,
                                     IClientConfig clientConfig) {
    this.serviceId = serviceId;
    this.lbContext = context;
    this.loadBalanceChooser = loadBalanceChooser;
    // 获取并设置重试的http状态码
    // 通过clientName.ribbon.retryableStatusCodes设置
    String retryableStatusCodesProp = clientConfig.getPropertyAsString(RETRYABLE_STATUS_CODES, "");
    String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
    for (String code : retryableStatusCodesArray) {
        if (!StringUtils.isEmpty(code)) {
            try {
                retryableStatusCodes.add(Integer.valueOf(code.trim()));
            } catch (NumberFormatException e) {
                log.warn("We cant add the status code because the code [ " + code
                        + " ] could not be converted to an integer. ", e);
            }
        }
    }
}

RibbonLoadBalancedRetryPolicy策略实例化的过程很简单,除了将必要的变量信息赋值以外,还解析了重试http状态码。http状态码通过clientName.ribbon.retryableStatusCodes指定,而且从源码中我们已经可以看到多个状态码之间用英文逗号分隔,而且必须是整形数字。那么到这里重试策略已经获取完成了。接下来再看看Ribbon的常用重试配置,其具体的作用见注释:

# 允许重试,该属性默认为true
spring.cloud.loadbalancer.retry.enabled=true
# 同一个Server最大的重试次数,该属性值默认为0
spring-cloud-provider.ribbon.MaxAutoRetries=5
# 最大重试Server的个数,该属性默认值为1
spring-cloud-provider.ribbon.MaxAutoRetriesNextServer=5
# 是否开启任何异常都重试(默认在get请求下会重试,其他情况不会重试,除非设置为true)
spring-cloud-provider.ribbon.OkToRetryOnAllOperations=false
# 指定重试的http状态码
spring-cloud-provider.ribbon.retryableStatusCodes=404,401,500
6.1、重试策略触发及流程

RetryLoadBalancerInterceptor的intercept方法,最终会通过RetryTemplate进行调用,其调用方法如下:

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // Allow the retry policy to initialise itself...
    // 允许重试策略自我初始化
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need it...
    // 注册重试上下文
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

        // Give clients a chance to enhance the context...
        // 给客户一个增强上下文的机会
        // 若RetryListener的open方法返回false,整个重试将被终止,并抛出异常
        boolean running = doOpenInterceptors(retryCallback, context);

        if (!running) {
            // 在第一次尝试前拦截器异常终止重试
            throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
        }

        // Get or Start the backoff context...
        BackOffContext backOffContext = null;
        Object resource = context.getAttribute("backOffContext");

        if (resource instanceof BackOffContext) {
            backOffContext = (BackOffContext) resource;
        }

        if (backOffContext == null) {
            backOffContext = backOffPolicy.start(context);
            if (backOffContext != null) {
                context.setAttribute("backOffContext", backOffContext);
            }
        }

        /*
         * We allow the whole loop to be skipped if the policy or context already forbid the first try.
         * This is used in the case of external retry to allow a
         * recovery in handleRetryExhausted without the callback processing (which
         * would throw an exception).
         *
         * canRetry -->  决定是否继续进行正在进行的重试尝试。
         *               此方法在执行RetryCallback之前调用,但在执行backoff和open拦截器之后调用。
         *
         * context.isExhaustedOnly() --> RetryContext已经设置了不再尝试或重试当前的RetryCallback
         *
         */
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
                this.logger.info("...Retry: count=" + context.getRetryCount());
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Retry: count=" + context.getRetryCount());
                }
                // Reset the last exception,
                // so if we are successful the close interceptors will not think we failed...
                // 重置(清空)最后的一次异常
                lastException = null;
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {
                lastException = e;
                try {
                    // 注册异常信息,并记录当前重试次数
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable",ex);
                }
                finally {
                    doOnErrorInterceptors(retryCallback, context, e);
                }

                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        backOffPolicy.backOff(backOffContext);
                    }
                    catch (BackOffInterruptedException ex) {
                        lastException = e;
                        // back off was prevented by another thread - fail the retry
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Abort retry because interrupted: count="	+ context.getRetryCount());
                        }
                        throw ex;
                    }
                }

                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                }

                if (shouldRethrow(retryPolicy, context, state)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Rethrow in retry for policy: count="+ context.getRetryCount());
                    }
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }
            }
            /*
             * A stateful attempt that can retry may rethrow the exception before now,
             * but if we get this far in a stateful retry there's a reason for it,
             * like a circuit breaker or a rollback classifier.
             *
             * 可以重试的有状态尝试可能会在现在之前重新抛出异常,
             * 但是如果我们在有状态重试中走到这一步,那么这是有原因的,
             * 比如断路器或回滚分类器。
             */
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }

        if (state == null && this.logger.isDebugEnabled()) {
            this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
        }
        exhausted = true;
        return handleRetryExhausted(recoveryCallback, context, state);
    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        close(retryPolicy, context, state, lastException == null || exhausted);
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
}

该段代码较长,其中重点有两部分:
第一:while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) 判断是否可继续重试,这里包含了两个条件。
第一个条件,canRetry 是否可重试,与具体的重试策略有关,我们来看下RibbonLoadBalancedRetryPolicy策略是如何判断可重试的:

public boolean canRetry(LoadBalancedRetryContext context) {
    HttpMethod method = context.getRequest().getMethod();
    return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
}

如果当前请求类型是GET或者设置了spring-cloud-provider.ribbon.OkToRetryOnAllOperations=true,那么该请求满足canRetry条件。
第二个条件,RetryContext上下文已经设置了不再尝试或重试当前的RetryCallback,例如重试次数已经达到我们设置的重试次数之后,Ribbon会通过setExhaustedOnly设置该属性为true,不再进行请求重试。

第二:registerThrowable(retryPolicy, state, context, e) 注册异常信息,在RetryLoadBalancerInterceptor类的intercept方法中已经介绍过,当被调用server返回的状态码中包含我们通过spring-cloud-provider.ribbon.retryableStatusCodes配置的状态码时会抛出ClientHttpResponseStatusCodeException,该异常会被捕获到并通过registerThrowable方法注册异常信息。

@Override
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
    // if this is a circuit tripping exception then notify the load balancer
    // 如果该异常是断路异常,则通知负载均衡器
    if (lbContext.getRetryHandler().isCircuitTrippingException(throwable)) {
        updateServerInstanceStats(context);
    }
    // Check if we need to ask the load balancer for a new server.
    // Do this before we increment the counters because the first call to this method
    // is not a retry it is just an initial failure.
    // 检查是否需要向负载平衡器请求新服务器。
    // 在增加计数器之前执行此操作,因为对该方法的第一次调用不是重试,而是初始失败。
    if (!canRetrySameServer(context) && canRetryNextServer(context)) {
        context.setServiceInstance(loadBalanceChooser.choose(serviceId));
    }
    // This method is called regardless of whether we are retrying or making the first request.
    // Since we do not count the initial request in the retry count we don't reset the counter
    // until we actually equal the same server count limit. This will allow us to make the initial
    // request plus the right number of retries.
    // 无论我们是重试还是发出第一个请求,都会调用此方法。
    // 由于在重试计数中不计算初始请求,所以在实际等于相同的服务器计数限制之前不会重置计数器。
    // 这将允许我们发出初始请求和正确的重试次数。
    /**
     * 实例变量 sameServerCount 和 nextServerCount 默认都是0
     *
     * 这里假如我们开启了重试机制,那么MaxAutoRetries和MaxAutoRetriesNextServer(默认值为1)配置将决定重试次数
     *
     * 若重试包含首次调用的话,那么重试次数 =(MaxAutoRetries+1)*(MaxAutoRetriesNextServer+1)
     */
    if (sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
        // reset same server since we are moving to a new server
        sameServerCount = 0;
        nextServerCount++;
        // 若不满足在其他服务器实例重试的条件,则将终止重试标识设置为true
        if (!canRetryNextServer(context)) {
            context.setExhaustedOnly();
        }
    } else {
        sameServerCount++;
    }
}

该段代码最重要的就是最后的ifelse判断,在分析这段代码之前,先来看一下canRetryNextServer方法:

@Override
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
    return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
}

canRetryNextServer判断条件很简单,满足canRetry条件,前文已有介绍,nextServerCount计数器小于等于spring-cloud-provider.ribbon.MaxAutoRetriesNextServer。了解了canRetryNextServer之后,再来看ifelse判断,

标签:return,SpringCloud,server,重试,context,null,public,Ribbon
来源: https://blog.csdn.net/lyc_liyanchao/article/details/120235127

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有