ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

大话微服务」深入聊聊SpringCloud之客户端负载均衡机制

2019-06-11 17:55:56  阅读:217  来源: 互联网

标签:return SpringCloud 大话 request serviceId IOException public 客户端


1)服务指的就是运行的节点(IP+Port)信息。

2)节点在启动时会将自身的信息注册到注册中心。

3)ServiceId是对节点运行的工程的标识。

4)InstanceId是对运行的节点的标识。

通过一个例子来捋一捋:

假设有一个和用户管理相关的工程,叫做UserManager。为了高可用,把该工程重复运行了4份,分别用um-01/um-02/um-03/um-04表示。假设它们都运行在本地,端口分别是8081/8082/8083/8084。

它们的注册信息分别是:

ServiceId=UserManager,InstanceId=um-01,IP:Port=127.0.0.1:8081

ServiceId=UserManager,InstanceId=um-02,IP:Port=127.0.0.1:8082

ServiceId=UserManager,InstanceId=um-03,IP:Port=127.0.0.1:8083

ServiceId=UserManager,InstanceId=um-04,IP:Port=127.0.0.1:8084

很显然ServiceId都是一样的,因为本来就是一个工程嘛。如果不想使用工程名的话,可以使用应用名。例如使用spring.application.name表示。

很显然InstanceId都不一样,毕竟实例之间互相独立嘛。这个实例Id通常是在注册时由注册中心自动生成,无需指定。

IP和Port肯定也不一样了,而且也无需指定,程序会自动检测运行时的IP和Port。

简言之,一个服务可以有多个实例,实例之间完全相同又互相独立,假如某个实例挂掉了,没关系,其它实例照样可以提供服务。

×××,非也


“单身狗”是令人可悲的,有一个更可悲的就是同时遇上两个对象,一个是你爱的,一个是爱你的。怎么办?要不一个一三五,一个二四六,周日不限号。哈哈。

“一个”会有单点问题,“多个”虽然解决了这个问题,但是又引入了另一个问题,那就是“选择综合症”。可以看到,随着事物发展,问题自然出现。真的,一切都是那么的自然。

顺便侃一侃我的看法,纯属娱乐一下。选择综合症的主要意思是指,不是担心自己选择的那个不好,而是害怕自己没有选择的那个会更好。

举个例子,假如你一不小心考了个省状元,该去清华呢还是北大呢。去清华吧,害怕万一去北大会更好呢。去北大吧,又担心万一去清华更好呢。为了避免出现以上两种情况,经过一番深思熟虑,最终去了蓝翔。哈哈。

Spring Cloud 深知选择痛苦,众口难调,所以就非常聪明地避开这些问题。那就是做出一套合理的抽象,只有规范,没有细节,每个人根据自己的口味整去吧。

抽象,用过的都说好


Spring Cloud 的目标原本就是做一块“主板”,抽象出合理的“接口”和“布线”,方便其他组件的插拔。

一个ServiceId表示一个服务,但是它可能会有多份实例,我们的请求肯定是在其中的一个实例上被执行的。所以要先能选出一个实例来,才可能会有后续的事情。

于是就有了一个简单粗暴的接口,ServiceInstanceChooser,服务实例选择器:


public interface ServiceInstanceChooser {
 /**
 * Chooses a ServiceInstance from the LoadBalancer for the specified service.
 * @param serviceId The service ID to look up the LoadBalancer.
 * @return A ServiceInstance that matches the serviceId.
 */
 ServiceInstance choose(String serviceId);
}


这个接口只能为我们选出一个实例,但这是不够的,因为我们还要执行请求啊,所以就有了进一步的接口,LoadBalancerClient,负载均衡器客户端:


public interface LoadBalancerClient extends ServiceInstanceChooser {
 /**
 * Executes request using a ServiceInstance from the LoadBalancer for the specified
 * service.
 * @param serviceId The service ID to look up the LoadBalancer.
 * @param request Allows implementations to execute pre and post actions, such as
 * incrementing metrics.
 * @param <T> type of the response
 * @throws IOException in case of IO issues.
 * @return The result of the LoadBalancerRequest callback on the selected
 * ServiceInstance.
 */
 <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
 /**
 * Executes request using a ServiceInstance from the LoadBalancer for the specified
 * service.
 * @param serviceId The service ID to look up the LoadBalancer.
 * @param serviceInstance The service to execute the request to.
 * @param request Allows implementations to execute pre and post actions, such as
 * incrementing metrics.
 * @param <T> type of the response
 * @throws IOException in case of IO issues.
 * @return The result of the LoadBalancerRequest callback on the selected
 * ServiceInstance.
 */
 <T> T execute(String serviceId, ServiceInstance serviceInstance,
 LoadBalancerRequest<T> request) throws IOException;
 /**
 * Creates a proper URI with a real host and port for systems to utilize. Some systems
 * use a URI with the logical service name as the host, such as
 * http://myservice/path/to/service. This will replace the service name with the
 * host:port from the ServiceInstance.
 * @param instance service instance to reconstruct the URI
 * @param original A URI with the host as a logical service name.
 * @return A reconstructed URI.
 */
 URI reconstructURI(ServiceInstance instance, URI original);
}


先来看第三个方法,重新构建URI,为什么要重新构建呢?其实原因很简单,下面通过例子说明。

假如我要写一个“用户查询”的地址,可是有四个服务实例呢,到底该写哪个IP和端口呢?既然不知道,那干脆就只写一个标识符(或称占位符)吧,到时让程序动态选择一个得了。

于是地址就是这样的:http://UserManager/users/queryDetails/{userId},这里的UserManager其实就是ServiceId了,运行时程序会把它替换为四份实例中的一个的地址,于是就变为:http://IP:Port/users/queryDetails/{userId}。

这就是对URI的重新构建,好理解吧。这其实就是众人皆知的负载均衡的原理了。

再看前两个方法,虽然形式不同,但本质一样,都是在一个实例上执行一个请求,并返回响应。所以方法名都叫执行。具体怎么执行请求,那就要看实现类怎么实现了。

但是比执行一个请求更重要的是,如何表示一个请求?

a)如果是执行一个普通Java方法的话,需要有方法本身Method,目标对象Object,方法参数Args。

b)如果要执行的是一个HTTP请求的话,需要有请求方法Method(GET/POST等),请求地址URL,还有一些参数,如表单数据,查询字符串或头信息等。

那这里的请求应该如何表示呢?就是这个接口,LoadBalancerRequest<T>:


public interface LoadBalancerRequest<T> {
 T apply(ServiceInstance instance) throws Exception;
}


看完上面的源码后,发现只有一个apply方法,泛型参数T就表示响应。意思就是在选出的服务实例上“应用”一下,就得到了响应。这也太抽象了吧,完全看不出意图是什么。

那就再找找有没有和它相似接口,很容找到一个,LoadBalancerRequestTransformer,请求转换器:


public interface LoadBalancerRequestTransformer {
 /**
 * Order for the load balancer request tranformer.
 */
 int DEFAULT_ORDER = 0;
 HttpRequest transformRequest(HttpRequest request, ServiceInstance instance);
}


也只有一个转换请求方法,可以根据服务实例完成对HttpRequest的转换。我们发现这里的请求是基于HTTP的,也就是说微服务之间是使用HTTP协议进行调用的。那为啥是Http呢?

因为现在的微服务理论好多都是来自 Martin Fowler 的那篇微服务的文章,下面是其中一句话:

In short, the microservice architectural style is an approach to developing a single application as a suite of small services, each running in its own process and communicating with lightweight mechanisms, often an HTTP resource API.


里面写到:“使用轻量级的机制通信,通常是HTTP资源API”,即restful形式的。所以 Spring Cloud 里面通常使用RestTemplate调用其它微服务。

现在只要能创建出LoadBalancerRequest接口的实例,然后在选出的服务实例上apply一下,就可以获取到响应了。正好有一个工厂类来负责创建,它就是LoadBalancerRequestFactory:


public class LoadBalancerRequestFactory {
 private LoadBalancerClient loadBalancer;
 private List<LoadBalancerRequestTransformer> transformers;
 public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
 List<LoadBalancerRequestTransformer> transformers) {
 this.loadBalancer = loadBalancer;
 this.transformers = transformers;
 }
 public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
 this.loadBalancer = loadBalancer;
 }
 public LoadBalancerRequest<ClientHttpResponse> createRequest(
 final HttpRequest request, final byte[] body,
 final ClientHttpRequestExecution execution) {
 return instance -> {
 HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
 this.loadBalancer);
 if (this.transformers != null) {
 for (LoadBalancerRequestTransformer transformer : this.transformers) {
 serviceRequest = transformer.transformRequest(serviceRequest,
 instance);
 }
 }
 return execution.execute(serviceRequest, body);
 };
 }
}


通过lambda表达式返回一个接口实例,在该实例上调用apply方法时会执行这个lambda表达式。

这个lambda表达式主要做三件事:

1)一是使用选出来的实例的IP和Port重新构建HttpRequest中的URI,

2)二是使用转换器对HttpRequest进行转换,

3)三是进行Http请求调用。

其中第三步其实使用的是spring-web工程里面的http客户端机制发起调用了,因为被ClientHttpRequestExecution这个类给暴露了,它的背后是抽象好的一套完善的http客户端机制。

spring-web里的http客户端模型


发送http请求,获取响应,这是很常见的功能,其实就是一个http的客户端,所以Spring框架就给实现了,位于spring-web工程里的org.springframework.http.client包下。

它的模型或者说抽象思路并不复杂,有一点需要注意,这是站在客户端的角度的,和我们经常遇到的服务器端正好相反。

Http是基于请求/响应的,所以得有个请求,ClientHttpRequest,客户端请求接口:


public interface ClientHttpRequest extends HttpRequest, HttpOutputMessage {
 /**
 * Execute this request, resulting in a {@link ClientHttpResponse} that can be read.
 * @return the response result of the execution
 * @throws IOException in case of I/O errors
 */
 ClientHttpResponse execute() throws IOException;
}


我们要把请求信息写入这个接口,以客户端的立场来看相当于“输出”,所以继承了HttpOutputMessage接口。它只有一个方法,叫执行,直接就返回响应。

所以响应就是,ClientHttpResponse,客户端响应接口:


public interface ClientHttpResponse extends HttpInputMessage, Closeable {
 /**
 * Return the HTTP status code of the response.
 * @return the HTTP status as an HttpStatus enum value
 * @throws IOException in case of I/O errors
 * @throws IllegalArgumentException in case of an unknown HTTP status code
 * @see HttpStatus#valueOf(int)
 */
 HttpStatus getStatusCode() throws IOException;
 /**
 * Return the HTTP status code (potentially non-standard and not
 * resolvable through the {@link HttpStatus} enum) as an integer.
 * @return the HTTP status as an integer
 * @throws IOException in case of I/O errors
 * @since 3.1.1
 * @see #getStatusCode()
 * @see HttpStatus#resolve(int)
 */
 int getRawStatusCode() throws IOException;
 /**
 * Return the HTTP status text of the response.
 * @return the HTTP status text
 * @throws IOException in case of I/O errors
 */
 String getStatusText() throws IOException;
 /**
 * Close this response, freeing any resources created.
 */
 @Override
 void close();
}


我们要把结果从这个接口中读出来,以客户端的立场来看相当于“输入”,所以继承了HttpInputMessage接口。

可以看到,只要能够获取“请求接口”的实例,就可以调用“执行”方法了。按照惯例,自然有相关的工厂类来创建实例了,ClientHttpRequestFactory,客户端请求工厂:


public interface ClientHttpRequestFactory {
 /**
 * Create a new {@link ClientHttpRequest} for the specified URI and HTTP method.
 * <p>The returned request can be written to, and then executed by calling
 * {@link ClientHttpRequest#execute()}.
 * @param uri the URI to create a request for
 * @param httpMethod the HTTP method to execute
 * @return the created request
 * @throws IOException in case of I/O errors
 */
 ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException;
}


它是一个工厂接口,那么它的实现类就非常重要了,只要拿到了实现类,就相当于拿到了“响应”了。

为了使整个处理过程更加灵活,又引入了老掉牙的套路,就是拦截器了,ClientHttpRequestInterceptor,客户端请求拦截器:


public interface ClientHttpRequestInterceptor {
 /**
 * Intercept the given request, and return a response. The given
 * {@link ClientHttpRequestExecution} allows the interceptor to pass on the
 * request and response to the next entity in the chain.
 * <p>A typical implementation of this method would follow the following pattern:
 * <ol>
 * <li>Examine the {@linkplain HttpRequest request} and body</li>
 * <li>Optionally {@linkplain org.springframework.http.client.support.HttpRequestWrapper
 * wrap} the request to filter HTTP attributes.</li>
 * <li>Optionally modify the body of the request.</li>
 * <li><strong>Either</strong>
 * <ul>
 * <li>execute the request using
 * {@link ClientHttpRequestExecution#execute(org.springframework.http.HttpRequest, byte[])},</li>
 * <strong>or</strong>
 * <li>do not execute the request to block the execution altogether.</li>
 * </ul>
 * <li>Optionally wrap the response to filter HTTP attributes.</li>
 * </ol>
 * @param request the request, containing method, URI, and headers
 * @param body the body of the request
 * @param execution the request execution
 * @return the response
 * @throws IOException in case of I/O errors
 */
 ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
 throws IOException;
}


拦截器和过滤器是一样的,多个过滤器构成一个过滤器链,在链的尽头会调用Servlet。多个拦截器也构成一个拦截器链,在链的尽头会真正的发起http调用。

所以需要另外一个接口,ClientHttpRequestExecution,就是为了配合实现过滤器链的:


public interface ClientHttpRequestExecution {
 /**
 * Execute the request with the given request attributes and body,
 * and return the response.
 * @param request the request, containing method, URI, and headers
 * @param body the body of the request to execute
 * @return the response
 * @throws IOException in case of I/O errors
 */
 ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException;
}


所以在工厂创建一个请求接口实例时,会把拦截器直接包含进去,InterceptingClientHttpRequestFactory,这个类是对一个真正工厂的“装饰”,目的就是为了把拦截器链添加进去:


public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
 private final List<ClientHttpRequestInterceptor> interceptors;
 /**
 * Create a new instance of the {@code InterceptingClientHttpRequestFactory} with the given parameters.
 * @param requestFactory the request factory to wrap
 * @param interceptors the interceptors that are to be applied (can be {@code null})
 */
 public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
 @Nullable List<ClientHttpRequestInterceptor> interceptors) {
 super(requestFactory);
 this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
 }
 @Override
 protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
 return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
 }
}


它的父类里包含了真正的工厂实例,那才是用来真正创建请求实例的。

还需要一个类来处理链式调用的逻辑,InterceptingRequestExecution,就是如果后面还有拦截器,就执行后面的拦截器,否则就发起http调用:


 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
 private final Iterator<ClientHttpRequestInterceptor> iterator;
 public InterceptingRequestExecution() {
 this.iterator = interceptors.iterator();
 }
 @Override
 public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
 if (this.iterator.hasNext()) {
 ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
 return nextInterceptor.intercept(request, body, this);
 }
 else {
 HttpMethod method = request.getMethod();
 Assert.state(method != null, "No standard HTTP method");
 ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
 request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
 if (body.length > 0) {
 if (delegate instanceof StreamingHttpOutputMessage) {
 StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
 streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
 }
 else {
 StreamUtils.copy(body, delegate.getBody());
 }
 }
 return delegate.execute();
 }
 }
 }


可以看它的执行方法,前面都是对拦截器的逐个迭代调用,最后是使用工厂创建了一个请求接口的实例,然后调用该实例的执行方法,返回响应。

这就是http客户端的整体情况,这只是抽象,不包括具体实现的,所以不算复杂。那么Spring也提供了四种实现:

基于 Apache HttpComponents 的实现,基于 Netty4 的实现,基于OkHttp3 的实现,还有基于JDK的 HttpURLConnection 的实现。

其实RestTemplate就是一个rest风格的http客户端,内部机制和上面完全一样,只是添加了很多好用的rest风格的便捷方法而已。

让RestTemplate支持微服务调用


从上面得知,RestTemplate就是一个rest风格的http客户端,使用它在Spring Cloud内部进行微服务调用是再合适不过了。

但是会有一个问题,微服务调用时只能指定ServiceId,IP和Port是动态发现的。RestTemplate是不具备这个功能的。但是要加上这个功能也是很简单的。

只需在中间某个环节拦截住执行流程,把ServiceId替换为动态发现的IP和Port,不就搞定了嘛。这种能力自然非拦截器莫属了。

所以Spring Cloud就专门实现了一个拦截器,LoadBalancerInterceptor,负载均衡器拦截器:


public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
 private LoadBalancerClient loadBalancer;
 private LoadBalancerRequestFactory requestFactory;
 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
 LoadBalancerRequestFactory requestFactory) {
 this.loadBalancer = loadBalancer;
 this.requestFactory = requestFactory;
 }
 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
 // for backwards compatibility
 this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
 }
 @Override
 public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
 final ClientHttpRequestExecution execution) throws IOException {
 final URI originalUri = request.getURI();
 String serviceName = originalUri.getHost();
 Assert.state(serviceName != null,
 "Request URI does not contain a valid hostname: " + originalUri);
 return this.loadBalancer.execute(serviceName,
 this.requestFactory.createRequest(request, body, execution));
 }
}


其实就是在一个负载均衡器上执行一个负载均衡请求,这个负载均衡请求实例,虽然是工厂创建出来的,其实就是一个lambda表达式,就起一个包装作用。

为了将这个拦截器应用到RestTemplate上,特意定义了这个接口,RestTemplateCustomizer,RestTemplate自定制器:


public interface RestTemplateCustomizer {
 void customize(RestTemplate restTemplate);
}


再来看这个具体的应用过程,是一个@Configuration类,LoadBalancerInterceptorConfig,拦截器配置:


 @Configuration
 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
 static class LoadBalancerInterceptorConfig {
 @Bean
 public LoadBalancerInterceptor ribbonInterceptor(
 LoadBalancerClient loadBalancerClient,
 LoadBalancerRequestFactory requestFactory) {
 return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
 }
 @Bean
 @ConditionalOnMissingBean
 public RestTemplateCustomizer restTemplateCustomizer(
 final LoadBalancerInterceptor loadBalancerInterceptor) {
 return restTemplate -> {
 List<ClientHttpRequestInterceptor> list = new ArrayList<>(
 restTemplate.getInterceptors());
 list.add(loadBalancerInterceptor);
 restTemplate.setInterceptors(list);
 };
 }
 }


首先把RestTemplate里的所有拦截器都取出来,然后把负载均衡器拦截器添加到最后,最后再设置回RestTemplate中。

意思就是,在即将发起请求的上一步中,重新构建URI,把ServiceId替换为动态发现的真正的IP和Port,很好理解吧。

至此,RestTemplate就具备了客户端负载均衡的能力了。但是RestTemplate一开始并不是为Spring Cloud而生的,所以需要区分一下,哪些RestTemplate具备微服务调用的能力,哪些不具备。

为此,Spring Cloud专门定义了一个注解,@LoadBalanced,来为RestTemplate开启这种能力:


@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}


只要在注册RestTemplate时标上这个注解,那么这个RestTemplate就可以用来进行微服务调用了,

Netflix的ribbon组件


Spring Cloud的客户端负载均衡机制只是一套抽象。ribbon具有客户端负载均衡能力,但是和Spring Cloud完全没有关系。

要想把ribbon纳入Spring Cloud体系,就必须写代码实现这套抽象,把真正要做的事情委托给ribbon去做。可以理解为在Spring Cloud的机制和ribbon之间架起一座桥梁。

为了便于从整体上把握,使用RestTemplate把所有的知识点串一下,会变的比较清晰一些,也更容易理解。

当把@LoadBalanced注解标记在RestTemplate上时,表明需要开启微服务调用的能力。因为RestTemplate本质是基于spring-web里的http客户端实现的,所以只能按照这个客户端的机制去扩展。

所以此时会自动为RestTemplate再添加一个额外的拦截器,LoadBalancerInterceptor,这个拦截器就把我们拉入到Spring Cloud的范畴里了,因为它需要依赖两个类,LoadBalancerClient和LoadBalancerRequestFactory。

LoadBalancerClient是一个接口,需要有人实现它,稍后再说。LoadBalancerRequestFactory本身就是一个类,它使用Spring Cloud客户端负载均衡的理论把一个HttpRequest进行包装和预处理,最终委托给http客户端去发起真正的http调用。

所以我们刚刚从http客户端机制里被拉入到Spring Cloud客户端负载均衡理论里,又立马被拉回到http客户端机制里了。其实就是借助于RestTemplate把Spring Cloud的客户端负载均衡理论和http客户端机制相结合起来。

那么在Spring Cloud客户端负载均衡理论里需要实现一个接口,就是刚刚提到的LoadBalancerClient。在spring-web的http客户端机制里需要实现两个接口(其实好几个呢),ClientHttpRequest和ClientHttpRequestFactory。

因此在基于ribbon的实现(spring-cloud-netflix-ribbon)里,一定能找到这三个接口的实现类。

LoadBalancerClient接口的实现类是RibbonLoadBalancerClient,它的所有功能都是依赖于ribbon自身的能力来提供的。其实相当于一个适配器了。


public class RibbonLoadBalancerClient implements LoadBalancerClient {
 private SpringClientFactory clientFactory;
 public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
 this.clientFactory = clientFactory;
 }
 @Override
 public URI reconstructURI(ServiceInstance instance, URI original) {
 Assert.notNull(instance, "instance can not be null");
 String serviceId = instance.getServiceId();
 RibbonLoadBalancerContext context = this.clientFactory
 .getLoadBalancerContext(serviceId);
 URI uri;
 Server server;
 if (instance instanceof RibbonServer) {
 RibbonServer ribbonServer = (RibbonServer) instance;
 server = ribbonServer.getServer();
 uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
 }
 else {
 server = new Server(instance.getScheme(), instance.getHost(),
 instance.getPort());
 IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
 ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
 uri = updateToSecureConnectionIfNeeded(original, clientConfig,
 serverIntrospector, server);
 }
 return context.reconstructURIWithServer(server, uri);
 }
 @Override
 public ServiceInstance choose(String serviceId) {
 return choose(serviceId, null);
 }
 /**
 * New: Select a server using a 'key'.
 * @param serviceId of the service to choose an instance for
 * @param hint to specify the service instance
 * @return the selected {@link ServiceInstance}
 */
 public ServiceInstance choose(String serviceId, Object hint) {
 Server server = getServer(getLoadBalancer(serviceId), hint);
 if (server == null) {
 return null;
 }
 return new RibbonServer(serviceId, server, isSecure(server, serviceId),
 serverIntrospector(serviceId).getMetadata(server));
 }
 @Override
 public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
 throws IOException {
 return execute(serviceId, request, null);
 }
 /**
 * New: Execute a request by selecting server using a 'key'. The hint will have to be
 * the last parameter to not mess with the `execute(serviceId, ServiceInstance,
 * request)` method. This somewhat breaks the fluent coding style when using a lambda
 * to define the LoadBalancerRequest.
 * @param <T> returned request execution result type
 * @param serviceId id of the service to execute the request to
 * @param request to be executed
 * @param hint used to choose appropriate {@link Server} instance
 * @return request execution result
 * @throws IOException executing the request may result in an {@link IOException}
 */
 public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
 throws IOException {
 ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
 Server server = getServer(loadBalancer, hint);
 if (server == null) {
 throw new IllegalStateException("No instances available for " + serviceId);
 }
 RibbonServer ribbonServer = new RibbonServer(serviceId, server,
 isSecure(server, serviceId),
 serverIntrospector(serviceId).getMetadata(server));
 return execute(serviceId, ribbonServer, request);
 }
 @Override
 public <T> T execute(String serviceId, ServiceInstance serviceInstance,
 LoadBalancerRequest<T> request) throws IOException {
 Server server = null;
 if (serviceInstance instanceof RibbonServer) {
 server = ((RibbonServer) serviceInstance).getServer();
 }
 if (server == null) {
 throw new IllegalStateException("No instances available for " + serviceId);
 }
 RibbonLoadBalancerContext context = this.clientFactory
 .getLoadBalancerContext(serviceId);
 RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
 try {
 T returnVal = request.apply(serviceInstance);
 statsRecorder.recordStats(returnVal);
 return returnVal;
 }
 // catch IOException and rethrow so RestTemplate behaves correctly
 catch (IOException ex) {
 statsRecorder.recordStats(ex);
 throw ex;
 }
 catch (Exception ex) {
 statsRecorder.recordStats(ex);
 ReflectionUtils.rethrowRuntimeException(ex);
 }
 return null;
 }
}


ClientHttpRequest接口的实现类是RibbonHttpRequest,它也是使用ribbon的RestClient(com.netflix.niws.client.http.RestClient)客户端发起的请求。


public class RibbonHttpRequest extends AbstractClientHttpRequest {
 private HttpRequest.Builder builder;
 private URI uri;
 private HttpRequest.Verb verb;
 private RestClient client;
 private IClientConfig config;
 private ByteArrayOutputStream outputStream = null;
 public RibbonHttpRequest(URI uri, HttpRequest.Verb verb, RestClient client,
 IClientConfig config) {
 this.uri = uri;
 this.verb = verb;
 this.client = client;
 this.config = config;
 this.builder = HttpRequest.newBuilder().uri(uri).verb(verb);
 }
 @Override
 public HttpMethod getMethod() {
 return HttpMethod.valueOf(verb.name());
 }
 @Override
 public String getMethodValue() {
 return getMethod().name();
 }
 @Override
 public URI getURI() {
 return uri;
 }
 @Override
 protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
 if (outputStream == null) {
 outputStream = new ByteArrayOutputStream();
 }
 return outputStream;
 }
 @Override
 protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
 try {
 addHeaders(headers);
 if (outputStream != null) {
 outputStream.close();
 builder.entity(outputStream.toByteArray());
 }
 HttpRequest request = builder.build();
 HttpResponse response = client.executeWithLoadBalancer(request, config);
 return new RibbonHttpResponse(response);
 }
 catch (Exception e) {
 throw new IOException(e);
 }
 }
 private void addHeaders(HttpHeaders headers) {
 for (String name : headers.keySet()) {
 // apache http RequestContent pukes if there is a body and
 // the dynamic headers are already present
 if (isDynamic(name) && outputStream != null) {
 continue;
 }
 // Don't add content-length if the output stream is null. The RibbonClient
 // does this for us.
 if (name.equals("Content-Length") && outputStream == null) {
 continue;
 }
 List<String> values = headers.get(name);
 for (String value : values) {
 builder.header(name, value);
 }
 }
 }
 private boolean isDynamic(String name) {
 return "Content-Length".equalsIgnoreCase(name)
 || "Transfer-Encoding".equalsIgnoreCase(name);
 }
}


ClientHttpRequestFactory接口的实现类是RibbonClientHttpRequestFactory,它来负责创建出上面这个请求的实例。


public class RibbonClientHttpRequestFactory implements ClientHttpRequestFactory {
 private final SpringClientFactory clientFactory;
 public RibbonClientHttpRequestFactory(SpringClientFactory clientFactory) {
 this.clientFactory = clientFactory;
 }
 @Override
 @SuppressWarnings("deprecation")
 public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod)
 throws IOException {
 String serviceId = originalUri.getHost();
 if (serviceId == null) {
 throw new IOException(
 "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
 }
 IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
 RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
 HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());
 return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
 }
}


最后,需要说明的是,即使是ribbon发起的http请求调用,底层使用的还是HttpComponents组件库或OkHttp组件库。

其实这里面充满了封装/适配,代码写的绕来绕去,有些乱啊。


标签:return,SpringCloud,大话,request,serviceId,IOException,public,客户端
来源: https://blog.51cto.com/14227759/2407487

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有