标签:java spring-boot-2 spring-webflux
我使用的是spring-boot版本2.0.0.M6.
我需要从spring-boot应用程序发出异步HTTP调用说APP1到另一个应用程序(播放框架)说APP2.
因此,如果我需要从APP1到APP2进行20次不同的异步调用,APP2会收到20个请求,其中很少是重复请求,这意味着这些重复请求替换了几个不同的请求.
预期:
api/v1/call/1
api/v1/call/2
api/v1/call/3
api/v1/call/4
实际:
api/v1/call/1
api/v1/call/2
api/v1/call/4
api/v1/call/4
我正在使用Spring反应式WebClient.
下面是build.gradle中的spring boot版本
buildscript {
ext {
springBootVersion = '2.0.0.M6'
//springBootVersion = '2.0.0.BUILD-SNAPSHOT'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
maven {url "https://plugins.gradle.org/m2/"}
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
classpath("se.transmode.gradle:gradle-docker:1.2")
}
}
我的WebClient init片段
private WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector((HttpClientOptions.Builder builder) -> builder.disablePool()))
.build();
我的POST方法
public <T> Mono<JsonNode> postClient(String url, T postData) {
return Mono.subscriberContext().flatMap(ctx -> {
String cookieString = ctx.getOrDefault(Constants.SubscriberContextConstnats.COOKIES, StringUtils.EMPTY);
URI uri = URI.create(url);
return webClient.post().uri(uri).body(BodyInserters.fromObject(postData)).header(HttpHeaders.COOKIE, cookieString)
.exchange().flatMap(clientResponse ->
{
return clientResponse.bodyToMono(JsonNode.class);
})
.onErrorMap(err -> new TurtleException(err.getMessage(), err))
.doOnSuccess(jsonData -> {
});
});
}
调用此postClient方法的代码
private void getResultByKey(PremiumRequestHandler request, String key, BrokerConfig brokerConfig) {
/* Live calls for the insurers */
LOG.info("[PREMIUM SERVICE] LIVE CALLLLL MADE FOR: " + key + " AND REQUEST ID: " + request.getRequestId());
String uri = brokerConfig.getHostUrl() + verticalResolver.determineResultUrl(request.getVertical()) + key;
LOG.info("[PREMIUM SERVICE] LIVE CALL WITH URI : " + uri + " FOR REQUEST ID: " + request.getRequestId());
Mono<PremiumResponse> premiumResponse = reactiveWebClient.postClient(uri, request.getPremiumRequest())
.map(json -> PlatformUtils.mapToClass(json, PremiumResponse.class));
premiumResponse.subscribe(resp -> {
resp.getPremiumResults().forEach(result -> {
LOG.info("Key " + result.getKey());
repository.getResultRepoRawType(request.getVertical())
.save(result).subscribe();
saveResult.subscriberContext(ctx -> {
MultiBrokerMongoDBFactory.setDatabaseNameForCurrentThread(brokerConfig.getBroker());
return ctx;
}).subscribe();
});
}, error -> {
LOG.info("[PREMIUM SERVICE] ERROR RECIEVED FOR " + key + " AND REQUEST ID" + request.getRequestId() + " > " + error.getMessage());
});
}
如果将日志放在客户端代码的终点,则无法在该点看到多个请求.
可能是WebClient中的一个错误,其中URI在多线程环境中被交换.
尝试改变WebClient,仍然会交换URI
请帮忙.
Git Repo添加了github.com/praveenk007/ps-demo
解决方法:
加上我的一些观察:
webClient.get()或webClient.post()总是在每次调用URI的调用上返回新的DefaultRequestBodyUriSpec,我认为它看起来不像URI被交换.
class DefaultWebClient implements WebClient {
..
@Override
public RequestHeadersUriSpec<?> get() {
return methodInternal(HttpMethod.GET);
}
@Override
public RequestBodyUriSpec post() {
return methodInternal(HttpMethod.POST);
}
..
@Override
public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return exchangeFunction.exchange(request).switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
}
private ClientRequest.Builder initRequestBuilder() {
URI uri = (this.uri != null ? this.uri : uriBuilderFactory.expand(""));
return ClientRequest.create(this.httpMethod, uri)
.headers(headers -> headers.addAll(initHeaders()))
.cookies(cookies -> cookies.addAll(initCookies()))
.attributes(attributes -> attributes.putAll(this.attributes));
}
..
}
和methodInternal方法如下所示
@SuppressWarnings("unchecked")
private RequestBodyUriSpec methodInternal(HttpMethod httpMethod) {
return new DefaultRequestBodyUriSpec(httpMethod);
}
此外,在进行实际请求时,还会创建新的ClientRequest.
类来源
标签:java,spring-boot-2,spring-webflux 来源: https://codeday.me/bug/20190710/1424754.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。