ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java – Spring Web-Flux中的Backpressure机制

2019-10-03 22:11:15  阅读:668  来源: 互联网

标签:spring-webflux java reactive-programming backpressure


我是Spring Web-Flux的首发.我写了一个控制器如下:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

我知道其中一个反应性好处是Backpressure,它可以平衡请求或响应率.我想知道如何在Spring Web-Flux中使用背压机制.

解决方法:

WebFlux中的Backpressure

为了理解Backpressure在当前WebFlux框架实现中的工作原理,我们必须回顾一下默认使用的传输层.我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器通信通常也是一样)是通过TCP连接完成的. WebFlux还使用该传输进行客户端和服务器之间的通信.
然后,为了获得背压控制项的含义,我们必须从Reactive Streams规范的角度来回顾一下背压的含义.

The basic semantics define how the transmission of stream elements is regulated through back-pressure.

因此,从该声明中,我们可以得出结论,在Reactive Streams中,背压是一种通过传输(通知)接收者可以消耗多少元素来调节需求的机制;在这里,我们有一个棘手的问题. TCP具有字节抽象而不是逻辑元素抽象.我们通常所说的背压控制是控制向/从网络发送/接收的逻辑元件的数量.即使TCP有自己的流控制(参见含义here和动画there),这个流控制仍然是字节而不是逻辑元素.

在WebFlux模块的当前实现中,背压由传输流控制来调节,但它不会暴露接收方的实际需求.为了最终看到交互流程,请参见下图:

enter image description here

为简单起见,上图显示了两个微服务之间的通信,其中左侧发送数据流,右侧消耗该流.以下编号列表提供了该图表的简要说明:

>这是WebFlux框架,它正确地将逻辑元素转换为字节并返回并将其传输/接收到TCP(网络).
>这是对作业完成后请求下一个元素的元素进行长时间处理的开始.
>在这里,虽然业务逻辑没有要求,但WebFlux将来自网络的字节排队,而没有他们的确认(业务逻辑没有要求).
>由于TCP流控制的性质,服务A仍然可以向网络发送数据.

正如我们从上图中可以看到的那样,接收者公开的需求与发送者的需求不同(这里需要逻辑元素).这意味着两者的需求是孤立的,仅适用于WebFlux< - >业务逻辑(服务)交互并且暴露较少的服务A< - >的背压.服务B互动.

所有这些意味着背压控制在WebFlux中并不像我们预期的那样公平.

但我仍然想知道如何控制背压

如果我们仍然希望对WebFlux中的背压进行不公平的控制,我们可以在Project Reactor运算符(如limitRate())的支持下执行此操作.以下示例显示了我们如何使用该运算符:

@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {

    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}

正如我们在示例中看到的,limitRate()运算符允许定义一次预取的元素数.这意味着即使最终订阅者请求Long.MAX_VALUE元素,limitRate运算符也会将该请求拆分为块,并且不允许一次消耗更多.我们可以使用元素发送过程:

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {

    return tweetService.retreiveAll()
                       .limitRate(10);
}

上面的示例显示,即使WebFlux一次请求超过10个元素,limitRate()也会限制对预取大小的需求,并防止一次消耗超过指定数量的元素.

另一种选择是实现自己的Subscriber或从Project Reactor扩展BaseSubscriber.例如,以下是我们如何做到这一点的简单例子:

class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }

    @Override
    protected void hookOnNext(T value) {
        // do business logic there 

        consumed++;

        if (consumed == limit) {
            consumed = 0;

            request(limit);
        }
    }
}

使用RSocket协议的公平背压

为了通过网络边界实现逻辑元素背压,我们需要一个适当的协议.幸运的是,有一个名为RScoket protocol.RSocket是一个应用程序级协议,允许通过网络边界传输实际需求.
该协议有一个RSocket-Java实现,允许设置RSocket服务器.在服务器到服务器通信的情况下,相同的RSocket-Java库也提供客户端实现.要了解有关如何使用RSocket-Java的更多信息,请参阅以下示例here.
对于浏览器 – 服务器通信,有一个RSocket-JS实现,允许通过WebSocket连接浏览器和服务器之间的流通信.

在RSocket之上的已知框架

现在有一些框架,建立在RSocket协议之上.

变形杆菌

其中一个框架是Proteus项目,它提供了构建在RSocket之上的完整的微服务.此外,Proteus与Spring框架很好地集成,因此现在我们可以实现公平的背压控制(参见示例there)

进一步阅读

> https://www.netifi.com/proteus
> https://medium.com/netifi
> http://scalecube.io/

标签:spring-webflux,java,reactive-programming,backpressure
来源: https://codeday.me/bug/20191003/1851141.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有