ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Reactor响应式快速上手-2 (学习笔记 2021.11.5)

2021-11-10 20:59:43  阅读:588  来源: 互联网

标签:2021.11 Reactor 一个 Mono 元素 Flux 线程 上手 序列


Reactor响应式快速上手-2 (学习笔记 2021.11.5)

前言: (中文文档)

Reactor Core 运行于 Java 8 及以上版本。

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFutureStream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

引入依赖

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
        
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
// 步骤: 
1 依赖 Core 库
2 没有 version 标签
3 reactor-test 提供了对 reactive streams 的单测

Reactor 核心特性

Reactor 项目的主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

Reactor 引入了实现 Publisher 的响应式类 FluxMono,以及丰富的操作方式。 一个 Flux 对象代表一个包含 0…N 个元素的响应式序列 (类似集合),而一个 Mono 对象代表一个包含 零/一个(0…1)元素的结果 (单个或者没有)。

这种区别为这俩类型带来了语义上的信息——表明了异步处理逻辑所面对的元素基数。比如, 一个 HTTP 请求产生一个响应,所以对其进行 count 操作是没有多大意义的。表示这样一个 结果的话,应该用 Mono<HttpResponse> 而不是 Flux<HttpResponse>,因为要置于其上的 操作通常只用于处理 0/1 个元素。

有些操作可以改变基数,从而需要切换类型。比如,count 操作用于 Flux,但是操作 返回的结果是 Mono<Long>

Flux, 包含 0-N 个元素的异步序列

IKcM2F.md.png

Flux<T> 是一个能够发出 0 到 N 个元素的标准的 Publisher<T>,它会被一个“错误(error)” 或“完成(completion)”信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNextonCompleteonError方法。

由于多种不同的信号可能性,Flux 可以作为一种通用的响应式类型。注意,所有的信号事件, 包括代表终止的信号事件都是可选的:如果没有 onNext 事件但是有一个 onComplete 事件, 那么发出的就是 空的 有限序列,但是去掉 onComplete 那么得到的就是一个 无限的 空序列。 当然,无限序列也可以不是空序列,比如,Flux.interval(Duration) 生成的是一个 Flux<Long>, 这就是一个无限地周期性发出规律 tick 的时钟序列。

Mono, 异步的 0-1 结果

IKcLGT.md.png

Mono<T> 是一种特殊的 Publisher<T>, 它最多发出一个元素,然后终止于一个 onComplete 信号或一个 onError 信号。

它只适用其中一部分可用于 Flux 的操作。比如,(两个 Mono 的)结合类操作可以忽略其中之一 而发出另一个 Mono,也可以将两个都发出,对于后一种情况会切换为一个 Flux

例如,Mono#concatWith(Publisher) 返回一个 Flux,而 Mono#then(Mono) 返回另一个 Mono

注意,Mono 可以用于表示“空”的只有完成概念的异步处理(比如 Runnable)。这种用 Mono<Void> 来创建。

简单的创建和订阅 Flux 或 Mono 的方法

最简单的上手 FluxMono 的方式就是使用相应类提供的多种工厂方法之一。

比如,如果要创建一个 String 的序列,你可以直接列举它们,或者将它们放到一个集合里然后用来创建 Flux,如下:

创建Flux

void contextLoads() {
        // 通过可变参数构建出一个 0-N个的Flux
        Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
        List<String> iterable = Arrays.asList("foo", "bar", "foobar");
        // 通过数组构建出一个 0-N个的Flux
        Flux<String> seq2 = Flux.fromIterable(iterable);
    	// 创建一个从5到7的Flux<Integer> 类似集合[5,6,7]
        Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
    }
//	第一个参数是 range 的开始,第二个参数是要生成的元素个数。

创建Mono

void contextLoads() {
        // 创建一个空的Mono
        Mono<String> noData = Mono.empty();
        // 创建一个 0-1个的带数据Mono
        Mono<String> data = Mono.just("foo");     
    }
//	注意,即使没有值,工厂方法仍然采用通用的返回类型。

订阅(subscribe)

在订阅(subscribe)的时候,FluxMono 使用 Java 8 lambda 表达式。 .subscribe() 方法有多种不同的方法签名,你可以传入各种不同的 lambda 形式的参数来定义回调。如下所示:

void contextLoads() {
    	// 创建一个 0-N个的Flux集
        Flux<Integer> ints = Flux.range(1, 3);
        ints.subscribe(i -> System.out.println(i)); // 订阅并消费
    
        // 创建一个 0-1个的带数据Mono
        Mono<String> data1 = Mono.just("1");
        Mono<Integer> map = data1.map((e) -> Integer.valueOf(e));
        map.subscribe(); // subscribe(); 订阅并触发序列 (触发之前的操作)

        Mono<Integer> data2 = Mono.just(1);
        Mono<String> flatMap = data2.flatMap((e) -> Mono.justOrEmpty("张韶涵"));
        flatMap.subscribe((e)->{
            System.out.println(e);  // subscribe(Consumer<? super T> consumer); 对每一个生成的元素进行消费。
        });

        // subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer) 对正常元素进行消费,也对错误进行响应。
        Mono<String> data3 = Mono.just("abc");
        Mono<Integer> integerMono = data3.flatMap((e) -> Mono.justOrEmpty(Integer.valueOf(e)));
        integerMono.subscribe((e)->{
        }, (throwable)-> System.out.println("3-异常拉"+throwable.getMessage()));

        // subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,
        // Runnable completeConsumer);  对正常元素和错误均有响应,还定义了序列正常完成后的回调
        Mono<String> data4 = Mono.just("4");
        Mono<Integer> integerMono2 = data4.flatMap((e) -> Mono.justOrEmpty(Integer.valueOf(e)));
        integerMono2.subscribe(
                (e)->{System.out.println(e);},
                (throwable)-> System.out.println("4-异常拉"+throwable.getMessage()),
                ()-> System.out.println("4-Mono 流执行正常"));

        // subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,
        // Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
        // 对正常元素、错误和完成信号均有响应, 同时也定义了对该 subscribe 方法返回的 Subscription 执行的回调。
        Mono<String> data5 = Mono.just("5");
        Mono<Integer> integerMono5 = data5.flatMap((e) -> Mono.justOrEmpty(Integer.valueOf(e)));
        integerMono5.subscribe(
                (e)->{System.out.println(e);},
                (throwable)-> System.out.println("5-异常拉"+throwable.getMessage()),
                ()-> System.out.println("5-Mono 流执行正常"),
                (e)->{e.cancel();} // e.request(1) 只订阅一个, e.cancel()取消订阅
        );
        // 以上方法会返回一个 Subscription 的引用,如果不再需要更多元素你可以通过它来取消订阅。
        // 取消订阅时, 源头会停止生成新的数据,并清理相关资源。取消和清理的操作在 Reactor 中是在 接口 Disposable 中定义的。
        System.out.println("结束演示");
    }

错误和完成信号都是终止信号,并且二者只会出现其中之一。为了能够最终全部正常完成,你必须处理错误信号。

用于处理完成信号的 lambda 是一对空的括号,因为它实际上匹配的是 Runnalbe 接口中的 run 方法, 不接受参数。

最后一个 subscribe(CoreSubscriber<? super T> var1) 方法签名包含一个自定义的 CoreSubscriber

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

// 调整ints.subscribe()的某些行为
public class SampleSubscriber<T> extends BaseSubscriber<T> {

    	// 更换Consumer<? super Subscription> subscriptionConsumer行为
        public void hookOnSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                request(1);
        }
		
    	// 更换Consumer<? super T> consume
        public void hookOnNext(T value) {
                System.out.println(value);
                request(1);
        }
}
// ---------------------------------------------
void contextLoads() {

        SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
        Flux<Integer> ints = Flux.range(1, 4);
        ints.subscribe(i -> System.out.println("$"+i),
                error -> System.err.println("Error " + error),
                () -> {System.out.println("Done");},
                s -> ss.request(10));

        ints.subscribe(ss);
        System.out.println("结束演示");
    }
// -------------
Subscribed
1
2
3
4

上面这个例子中,我们把一个自定义的 Subscriber 作为 subscribe 方法的最后一个参数。 下边的例子是这个自定义的 Subscriber,这是一个对 Subscriber 的最简单实现:

SampleSubscriber 类继承自 BaseSubscriber,在 Reactor 中, 推荐用户扩展它来实现自定义的 Subscriber。这个类提供了一些 hook 方法,我们可以通过重写它们来调整 subscriber 的行为。 默认情况下,它会触发一个无限个数的请求,但是当你想自定义请求元素的个数的时候,扩展 BaseSubscriber 就很方便了。

扩展的时候通常至少要覆盖 hookOnSubscribe(Subscription subscription)hookOnNext(T value) 这两个方法。这个例子中, hookOnSubscribe 方法打印一段话到标准输出,然后进行第一次请求。 然后 hookOnNext 同样进行了打印,同时逐个处理剩余请求。

建议你同时重写 hookOnErrorhookOnCancel,以及 hookOnComplete 方法。 你最好也重写 hookFinally 方法。SampleSubscribe 确实是一个最简单的实现了 请求有限个数元素的 Subscriber

响应式流规范定义了另一个 subscribe(Subscriber<? super T> subscriber); 方法的签名,它只接收一个自定义的 Subscriber, 没有其他的参数

如果你已经有一个 Subscriber,那么这个方法签名还是挺有用的。况且,你可能还会用到它 来做一些订阅相关(subscription-related)的回调。比如,你想要自定义“背压(backpressure)” 并且自己来触发请求。

在这种情况下,使用 BaseSubscriber 抽象类就很方便,因为它提供了很好的配置“背压” 的方法。

使用 BaseSubscriber 来配置“背压”

void contextLoads() {
        Flux<String> source = Flux.just("1","a","c");
        source.map(String::toUpperCase)
                .subscribe(new BaseSubscriber<String>() { // BaseSubscriber 是一个抽象类,所以我们创建一个匿名内部类。
                    // BaseSubscriber 定义了多种用于处理不同信号的 hook。
                    // 它还定义了一些捕获 Subscription 对象的现成方法,这些方法可以用在 hook 中。
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        // request(n) 就是这样一个方法。它能够在任何 hook 中,
                        // 通过 subscription 向上游传递 背压请求。这里我们在开始这个流的时候请求1个元素值。
                        request(1);
                    }

                    @Override
                    protected void hookOnNext(String value) {
                        System.out.println(value);
                        // 随着接收到新的值,我们继续以每次请求一个元素的节奏从源头请求值。
                        request(1);
                    }

                    // 其他 hooks 有 hookOnComplete, hookOnError, hookOnCancel, and hookFinally
                    // (它会在流终止的时候被调用,传入一个 SignalType 作为参数)。
                });
        System.out.println("结束演示");
    }

当你修改请求操作的时候,你必须注意让 subscriber 向上提出足够的需求, 否则上游的 Flux 可能会被“卡住”。所以 BaseSubscriber 在进行扩展的时候要覆盖 hookOnSubscribeonNext,这样你至少会调用 request 一次。

BaseSubscriber 还提供了 requestUnbounded() 方法来切换到“无限”模式(等同于 request(Long.MAX_VALUE)

可编程式地创建一个序列(类似循环)

介绍如何通过定义相对应的事件(onNextonErroronComplete) 创建一个 FluxMono。所有这些方法都通过 API 来触发我们叫做 sink(池) 的事件。 sink 的类型不多,我们快速过一下。

Generate方法创建Flux序列

最简单的创建 Flux 的方式就是使用 generate 方法。

这是一种 同步地逐个地 产生值的方法,意味着 sink 是一个 SynchronousSink 而且其 next() 方法在每次回调的时候最多只能被调用一次。你也可以调用 error(Throwable) 或者 complete(),不过是可选的。

最有用的一种方式就是同时能够记录一个状态值(state),从而在使用 sink 发出下一个元素的时候能够 基于这个状态值去产生元素。此时生成器(generator)方法就是一个 BiFunction<S, SynchronousSink<T>, S>, 其中 <S> 是状态对象的类型。你需要提供一个 Supplier<S> 来初始化状态值,而生成器需要 在每一“回合”生成元素后返回新的状态值(供下一回合使用)。

void contextLoads() {
        // 例如我们使用一个 int 作为状态值。
        Integer state = 88;
        Flux<String> flux = Flux.generate(
                () -> {
                    System.out.println("初始化状态值"+state+"了, 只调用一次");
                    return state; // 1- 初始化状态值(state)为88
                },
                (state, sink) -> {
                    // 2- 我们基于状态值 state 来生成下一个值(state + 12)
                    state = 12+state;
                    sink.next(state+"Flux循环结束返回值, 不断的+12, 最终结果= " + state);
                    System.out.println("每次增加后的state="+state);
                    if (state > 130) {
                        // 4- 我们也可以用状态值来决定什么时候终止序列 或者sink.error();
                        System.out.println("结束序列(循环)");
                        sink.complete();
                    }
                    // 3- 返回一个状态值 state,用于下一次调用
                    return state;
                });
        System.out.println(flux.blockLast()); // 获取最终序列返回的next中结果
    }
// ---------------结果--------------------
初始化状态值88了, 只调用一次
每次增加后的state=100
每次增加后的state=112
每次增加后的state=124
每次增加后的state=136
结束序列(循环)
136Flux循环结束返回值, 不断的+12, 最终结果= 136

我们也可以使用可变(mutable)类型(译者注:如上例,原生类型及其包装类,以及String等属于不可变类型) 的 <S>。上边的例子也可以用 LongAdder 作为状态值,在每次生成后改变它的值。

void contextLoads() {
        // 例如我们使用对象作为状态值。
        LongAdder longAdder = new LongAdder();
        Flux<String> flux = Flux.generate(
                () -> {
                    longAdder.add(88);
                    System.out.println("初始化状态值"+longAdder.sum()+"了, 只调用一次");
                    return longAdder;
                },
                (state, sink) -> {
                    state.add(12);
                    sink.next(state+"Flux循环结束返回值, 不断的+12, 最终结果= " + state.sum());
                    System.out.println("每次增加后的state="+state);
                    if (state.sum() > 130) {
                        System.out.println("结束序列(循环)");
                        sink.complete();
                    }
                    return state;
                });
        flux.subscribe(); // 开始订阅消费
        System.out.println(longAdder); // 获取最终序列对象结果
    }
// ---------------结果--------------------
初始化状态值88了, 只调用一次
每次增加后的state=100
每次增加后的state=112
每次增加后的state=124
每次增加后的state=136
结束序列(循环)
136

如果状态对象需要清理资源(或者是序列(循环)结束回调一次),可以使用 generate(Supplier<S>, BiFunction, Consumer<S>) 这个签名方法来清理状态对象(译者注:Comsumer 在序列终止才被调用)。

void contextLoads() {
        // 例如我们使用一个 int 作为状态值。
        Integer state = 88;
        Flux<String> flux = Flux.generate(
                () -> {
                    System.out.println("初始化状态值"+state+"了, 只调用一次");
                    return state;
                },
                (state, sink) -> {
                    state = 12+state;
                    sink.next(state+"Flux循环结束返回值, 不断的+12, 最终结果= " + state);
                    System.out.println("每次增加后的state="+state);
                    if (state > 100) {
                        System.out.println("结束序列(循环)");
                        sink.complete();
                    }
                    return state;
                },
                (e)->{
                    // 看到最后一个状态值(112)会被这个 Consumer lambda 输出。
                    System.out.println("序列(循环)终止调用了"+e);
                });
        System.out.println(flux.blockLast());
    }
// ---------------结果--------------------
初始化状态值88了, 只调用一次
每次增加后的state=100
每次增加后的state=112
结束序列(循环)
序列(循环)终止调用了112
112Flux循环结束返回值, 不断的+12, 最终结果= 112

如果 state 使用了数据库连接或者其他需要最终进行清理的资源,这个 Consumer lambda 可以用来在最后关闭连接或完成相关的其他清理任务。

Create方法创建Flux序列

作为一个更高级的创建 Flux 的方式, create 方法的生成方式既可以是同步, 也可以是异步的,并且还可以每次发出多个元素。

该方法用到了FluxSink ,后者同样提供 nexterrorcomplete 等方法。 与 generate 不同的是,create 不需要状态值,另一方面,它可以在回调中触发 多个事件(即使是在未来的某个时间)。

create 有个好处就是可以将现有的 API 转为响应式,比如监听器的异步方法。

假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。如下:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

// 你可以使用 create 方法将其转化为响应式类型 Flux<T>:
Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(  // 4- 所有这些都是在 myEventProcessor 执行时异步执行的。
      new MyEventListener<String>() { // 1 -桥接 MyEventListener。

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s);  // 2 -每一个 chunk 的数据转化为 Flux 中的一个元素。
          }
        }

        public void processComplete() {
            sink.complete();  // processComplete 事件转换为 onComplete。
        }
    });
});

此外,既然 create 可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy 来定义背压行为。

  • IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException
  • ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
  • LATEST:让下游只得到上游最新的元素。
  • BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。

Mono 也有一个用于 create 的生成器(generator)—— MonoSink,它不能生成多个元素, 因此会抛弃第一个元素之后的所有元素。

推送(push)模式

create 的一个变体是 push,适合生成事件流。与 create类似,push 也可以是异步地, 并且能够使用以上各种溢出策略(overflow strategies)管理背压。每次只有一个生成线程可以调用 nextcompleteerror

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { // 1- 桥接 SingleThreadEventListener API。

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s);  // 2 - 在监听器所在线程中,事件通过调用 next 被推送到 sink。
          }
        }

        public void processComplete() {
            sink.complete();  // 3- complete 事件也在同一个线程中。
        }

        public void processError(Throwable e) {
            sink.error(e);  // 4 - error 事件也在同一个线程中。
        }
    });
});
推送/拉取(push/pull)混合模式

不像 pushcreate 可以用于 pushpull 模式,因此适合桥接监听器的 的 API,因为事件消息会随时异步地到来。回调方法 onRequest 可以被注册到 FluxSink 以便跟踪请求。这个回调可以被用于从源头请求更多数据,或者通过在下游请求到来 的时候传递数据给 sink 以实现背压管理。这是一种推送/拉取混合的模式, 因为下游可以从上游拉取已经就绪的数据,上游也可以在数据就绪的时候将其推送到下游。

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s);  // 3 后续异步到达的 message 也会被发送给 sink。
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.request(n);  // 1- 当有请求的时候取出一个 message。
        for(String s : message) {
           sink.next(s);  // 2 -  如果有就绪的 message,就发送到 sink。
        }
    });
清理(Cleaning up)

onDisposeonCancel 这两个回调用于在被取消和终止后进行清理工作。 onDispose 可用于在 Flux 完成,有错误出现或被取消的时候执行清理。 onCancel 只用于针对“取消”信号执行相关操作,会先于 onDispose 执行。

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel())  // 	1= onCancel 在取消时被调用。
        .onDispose(() -> channel.close())  //   2= onDispose 在有完成、错误和取消时被调用。
    });

Handle方法

handle 方法有些不同,它在 MonoFlux 中都有。然而,它是一个实例方法 (instance method),意思就是它要链接在一个现有的源后使用(与其他操作符一样)。

它与 generate 比较类似,因为它也使用 SynchronousSink,并且只允许元素逐个发出。 然而,handle 可被用于基于现有数据源中的元素生成任意值,有可能还会跳过一些元素。 这样,可以把它当做 mapfilter 的组合。handle 方法签名handle(BiConsumer<T, SynchronousSink<R>>)

举个例子,响应式流规范允许 null 这样的值出现在序列中。假如你想执行一个类似 map 的操作,你想利用一个现有的具有映射功能的方法,但是它会返回 null,这时候怎么办呢?

例如,下边的方法可以用于 Integer 序列,映射为字母或 null 。

public String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
                return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
}

// 可以使用 handle 来去掉其中的 null。

void contextLoads() throws Exception {
        Flux<Integer> just = Flux.just(1, 2, 3, 4);

        Mono<List<String>> listMono = just.handle((Integer integer, SynchronousSink<String> synchronousSink) -> {
            String string = this.toString(integer); // 1- 映射到字母。
            if (Objects.nonNull(string)){ // 2- 如果返回的是 null 
                // 值是null会报错,  这里可以转换成为任意对象返回给流的下一步类似map方法
                synchronousSink.next(string); // 3- 就不会调用 sink.next, 不会封装进序列流 从而过滤掉
            }
             // 与map方法不同的是, 这里可以进行操作终结流, 或者抛出错误
            if (string.equals("2")){
                synchronousSink.complete();
                //synchronousSink.error(new RuntimeException("异常终结Flux序列流"));
            }
        }).collectList();

        List<String> block = listMono.block();
        System.out.println(block);
    }
// -------------------
[1, 2]

调度器(Schedulers)

Reactor, 就像 RxJava,也可以被认为是 并发无关(concurrency agnostic) 的。意思就是, 它并不强制要求任何并发模型。更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库。

在 Reactor 中, 执行模式以及执行过程取决于所使用的 SchedulerScheduler 是一个拥有广泛实现类的抽象接口。 Schedulers 类提供的静态方法用于达成如下的执行环境:

  • 当前线程(Schedulers.immediate()
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()
  • 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源,见 如何包装一个同步阻塞的调用?
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同。

此外,你还可以使用 Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler。(虽然不太建议,不过你也可以使用 Executor 来创建)。你也可以使用 newXXX 方法来创建不同的调度器。比如 Schedulers.newElastic(yourScheduleName) 创建一个新的名为 yourScheduleName 的弹性调度器。

操作符基于非阻塞算法实现,从而可以利用到某些调度器的工作窃取(work stealing) 特性的好处。

一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器)例如, 通过工厂方法 Flux.interval(Duration.ofMillis(300)) 生成的每 300ms 打点一次的 Flux<Long>, 默认情况下使用的是 Schedulers.parallel(),下边的代码演示了如何将其装换为 Schedulers.single()

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOnsubscribeOn。 它们都接受一个 Scheduler 作为参数,从而可以改变调度器。但是 publishOn 在链中出现的位置 是有讲究的,而 subscribeOn 则无所谓。要理解它们的不同,你首先要理解 nothing happens until you subscribe()

在 Reactor 中,当你在操作链上添加操作符的时候,你可以根据需要在 FluxMono 的实现中包装其他的 FluxMono。一旦你订阅(subscribe)了它,一个 Subscriber 的链 就被创建了,一直向上到第一个 publisher 。这些对开发者是不可见的,开发者所能看到的是最外一层的 Flux (或 Mono)和 Subscription,但是具体的任务是在中间这些跟操作符相关的 subscriber 上处理的。

基于此,我们仔细研究一下 publishOnsubscribeOn 这两个操作符:

  • publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)。
  • subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。

只有操作链中最早的 subscribeOn 调用才算数。

线程模型

FluxMono 不会创建线程。一些操作符,比如 publishOn,会创建线程。同时,作为一种任务共享形式, 这些操作符可能会从其他任务池(work pool)——如果其他任务池是空闲的话——那里“偷”线程。因此, 无论是 FluxMono 还是 Subscriber 都应该精于线程处理。它们依赖这些操作符来管理线程和任务池。

publishOn 强制下一个操作符(很可能包括下一个的下一个…)来运行在一个不同的线程上。 类似的,subscribeOn 强制上一个操作符(很可能包括上一个的上一个…)来运行在一个不同的线程上。 记住,在你订阅(subscribe)前,你只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦你订阅了,整个流程就开始工作了。

下边的例子演示了支持任务共享的多线程模型:

void contextLoads() throws Exception {
        Flux.range(1, 5) // 1- 创建一个有 5 个元素的 Flux。
                .publishOn(Schedulers.parallel()) // 2= 创建等同于 CPU 个数的线程(最小为4)。
                .subscribe(e-> // 3= subscribe() 之前什么都不会发生。
                    System.out.println(Thread.currentThread().getName()+"线程名称, 值"+e));
    }

Scheduler.parallel() 创建一个基于单线程 ExecutorService 的固定大小的任务线程池。 因为可能会有一个或两个线程导致问题,它总是至少创建 4 个线程。然后 publishOn 方法便共享了这些任务线程, 当 publishOn 请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样, 就是进行了任务共享(一种资源共享方式)。 Reactor 还提供了好几种共享资源的方式,请参考 Schedulers

Scheduler.elastic() 也能创建线程,它能够很方便地创建专门的线程(以便跑一些可能会阻塞资源的任务, 比如一个同步服务),请见 如何包装一个同步阻塞的调用?

内部机制保证了这些操作符能够借助自增计数器(incremental counters)和警戒条件(guard conditions) 以线程安全的方式工作。例如,如果我们有四个线程处理一个流(就像上边的例子),每一个请求会让计数器自增, 这样后续的来自不同线程的请求就能拿到正确的元素。

Flux.range(1, 5)
                .publishOn(Schedulers.parallel()) 
                .flatMap((in)->{
                    Integer data = in+88;
                    return Flux.just(data);
                })
                .subscribe(e->
                    System.out.println(Thread.currentThread().getName()+"线程名称, 值"+e));

处理错误

如果想了解有哪些可用于错误处理的操作符,请参考 the relevant operator decision tree。( 或者看扩展)

在响应式流中,错误(error)是终止(terminal)事件。当有错误发生时,它会导致流序列停止, 并且错误信号会沿着操作链条向下传递,直至遇到你定义的 Subscriber 及其 onError 方法。

这样的错误还是应该在应用层面解决的。比如,你可能会将错误信息显示在用户界面,或者通过某个 REST 端点(endpoint)发出。因此,订阅者(subscriber)的 onError 方法是应该定义的。

如果没有定义,onError 会抛出 UnsupportedOperationException。你可以接下来再 检测错误,并通过 Exceptions.isErrorCallbackNotImplemented 方法捕获和处理它。

Reactor 还提供了其他的用于在链中处理错误的方法,即错误处理操作(error-handling operators)。

在你了解错误处理操作符之前, 你必须牢记 响应式流中的任何错误都是一个终止事件。 即使用了错误处理操作符,也不会让源头流序列继续。而是将 onError 信号转化为一个 新的 序列 的开始。换句话说,它代替了被终结的 上游 流序列。

现在我们来逐个看看错误处理的方法。需要的时候我们会同时用到命令式编程风格的 try 代码块来作比较。

“异常处理”方法

你也许熟悉在 try-catch 代码块中处理异常的几种方法。常见的包括如下几种:

  1. 捕获并返回一个静态的缺省值。
  2. 捕获并执行一个异常处理方法。
  3. 捕获并动态计算一个候补值来顶替。
  4. 捕获,并再包装为某一个 业务相关的异常,然后再抛出业务异常。
  5. 捕获,记录错误日志,然后继续抛出。
  6. 使用 finally 来清理资源,或使用 Java 7 引入的 “try-with-resource”。

以上所有这些在 Reactor 都有相应的基于 error-handling 操作符处理方式。

在开始研究这些操作符之前,我们先准备好响应式链(reactive chain)方式和 try-catch 代码块方式(以便对比)。

当订阅的时候,位于链结尾的 onError 回调方法和 catch 块类似,一旦有异常,执行过程会跳入到 catch:

void contextLoads() throws Exception {
        Flux.just(1, 5,null,8)
                .publishOn(Schedulers.parallel())
                .map(String::valueOf) // 1=	执行 map 转换,有可能抛出异常。
                .map(Integer::valueOf) // 2= 如果没问题,执行第二个 map 转换操作。
                .subscribe(e-> // 3= 所有转换成功的值都打印出来。
                    System.out.println(Thread.currentThread().getName()+"线程名称, 值"+e),
                        (error)-> System.out.println("响应流发送异常拉:"+error) // 4= 一旦有错误,序列(sequence)终止,并打印错误信息。
                );
    }
// --------------
parallel-1线程名称, 值1
parallel-1线程名称, 值5
响应流发送异常拉:java.lang.NullPointerException
// 这与 try/catch 代码块是类似的:
try {
   xxxxxxxxx
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); 
}

(捕获并返回一个静态的缺省值)对应的是 onErrorReturn

void contextLoads() throws Exception {
        Mono<Set<String>> mono = Flux.just(1, null, 3)
                .map(String::valueOf)
                .onErrorReturn("RECOVERED")
                .collect(Collectors.toSet());
        System.out.println(mono.block()); // 结果: [1, RECOVERED]
    }
// 还可以通过判断错误信息的内容,来筛选哪些要给出缺省值,哪些仍然让错误继续传递下去
Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

如果你不只是想要在发生错误的时候给出缺省值,而是希望提供一种更安全的处理数据的方式, 可以使用 onErrorResume。这与第 (2) 条(捕获并执行一个异常处理方法)类似。

假设,你会尝试从一个外部的不稳定服务获取数据,但仍然会在本地缓存一份 可能 有些过期的数据, 因为缓存的读取更加可靠。可以这样来做:

捕获并返回自定义值onErrorResume

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k)) // 1 = 对于每一个 key, 异步地调用一个外部服务。
    .onErrorResume(e -> getFromCache(k)); // 2 = 如果对外部服务的调用失败,则再去缓存中查找该 key。注意,这里无论 e 是什么,都会执行异常处理方法。

就像 onErrorReturnonErrorResume 也有可以用于预先过滤错误内容的方法变体,可以基于异常类或 Predicate 进行过滤。它实际上是用一个 Function 来作为参数,还可以返回一个新的流序列。

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(error -> {  // 1 = 这个函数式允许开发者自行决定如何处理。
        if (error instanceof TimeoutException) // 2 = 如果源超时,使用本地缓存。
            return getFromCache(k);
        else if (error instanceof UnknownKeyException)  // 3 = 如果源找不到对应的 key,创建一个新的实体。
            return registerNewEntry(k, "DEFAULT");
        else
            return Flux.error(error);  // 4 = 否则, 将问题“重新抛出”。
    });

捕获并重新抛出业务异常, one rrorMap

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

记录错误日志, doOnError

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k)) // 对外部服务的调用失败
    .doOnError(e -> { // 记录错误日志
        failureStat.increment();
        log("uh oh, falling back, service failed for key " + k); 
    })
    .onErrorResume(e -> getFromCache(k)) // 然后回调错误处理方

关闭资源

最后一个要与命令式编程对应的对比就是使用 Java 7 “try-with-resources” 或 finally 代码块清理资源。这是第 (6) 条(使用 finally 代码块清理资源或使用 Java 7 引入的 “try-with-resource”)。在 Reactor 中都有对应的方法: usingdoFinally

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); // 在订阅或执行流序列之后, isDisposed 会置为 true。
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Flux<String> flux =
Flux.using(
        () -> disposableInstance,  // 第一个 lambda 生成资源,这里我们返回模拟的(mock) Disposable。
        disposable -> Flux.just(disposable.toString()),  // 第二个 lambda 处理资源,返回一个 Flux<T>。
        Disposable::dispose // 第三个 lambda 在 2) 中的资源 Flux 终止或取消的时候,用于清理资源。
);

另一方面, doFinally 在序列终止(无论是 onCompleteonError还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消?)。

LongAdder statsCancel = new LongAdder(); 

Flux<String> flux =
Flux.just("foo", "bar")
    .doFinally(type -> {
        if (type == SignalType.CANCEL)  // doFinally 用 SignalType 检查了终止信号的类型。
          statsCancel.increment(); 
    })
    .take(1);  // take(1) 能够在发出 1 个元素后取消流。

重试

还有一个用于错误处理的操作符你可能会用到,就是 retry,见文知意,用它可以对出现错误的序列进行重试。

问题是它对于上游 Flux 是基于重订阅(re-subscribing)的方式。这实际上已经一个不同的序列了, 发出错误信号的序列仍然是终止了的。为了验证这一点,我们可以在继续用上边的例子,增加一个 retry(1) 代替 onErrorReturn 来重试一次。

void contextLoads() throws Exception {
        Flux.interval(Duration.ofMillis(250))
                .map(input -> {
                    if (input < 3) return "tick " + input;
                    throw new RuntimeException("boom");
                })
                .elapsed() // elapsed 会关联从当前值与上个值发出的时间间隔(下边输出的内容中的 259/249/251…)。
                .retry(1)
                .subscribe(System.out::println, System.err::println); // 我们还是要看一下 one rror 时的内容。

        Thread.sleep(2100); // 确保我们有足够的时间可以进行 4x2 次 tick。
    }
// ---------------
[274,tick 0]
[248,tick 1]
[251,tick 2]
[252,tick 0] // 异常发生重新订阅一次
[250,tick 1]
[250,tick 2]
java.lang.RuntimeException: boom

可见, retry(1) 不过是再一次从新订阅了原始的 interval,从 tick 0 开始。第二次, 由于异常再次出现,便将异常传递到下游了。

还有一个“高配版”的 retryretryWhen),它使用一个伴随(“companion”) Flux 来判断对某次错误是否要重试。这个伴随 Flux 是由操作符创建的,但是由开发者包装它, 从而实现对重试操作的配置。

这个伴随 Flux 是一个 Flux<Throwable>,它作为 retryWhen 的唯一参数被传递给一个 Function,你可以定义这个 Function 并让它返回一个新的 Publisher<?>。重试的循环 会这样运行:

  1. 每次出现错误,错误信号会发送给伴随 Flux,后者已经被你用 Function 包装。
  2. 如果伴随 Flux 发出元素,就会触发重试。
  3. 如果伴随 Flux 完成(complete),重试循环也会停止,并且原始序列也会 完成(complete)
  4. 如果伴随 Flux 产生一个错误,重试循环停止,原始序列也停止 完成,并且这个错误会导致 原始序列失败并终止。

了解前两个场景的区别是很重要的。如果让伴随 Flux 完成(complete)等于吞掉了错误。如下代码用 retryWhen 模仿了 retry(3) 的效果:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException())  // 持续产生错误。
    .doOnError(System.out::println) // 在 retry 之前 的 doOnError 可以让我们看到错误。
    .retryWhen(companion -> companion.take(3)); // 这里,我们认为前 3 个错误是可以重试的(take(3)),再有错误就放弃。

事实上,上边例子最终得到的是一个 空的 Flux,但是却 成功 完成了。反观对同一个 Flux 调用 retry(3) 的话,最终是以最后一个 error 终止 Flux,故而 retryWhen 与之不同。

实现同样的效果需要一些额外的技巧:

    Flux<String> flux = Flux.<String>error(new IllegalArgumentException())
                .retryWhen(companion -> companion
                        .zipWith(Flux.range(1, 4), // 1-使用 zip 和一个“重试个数 + 1”的 range。
                                (error, index) -> { // 2-zip 方法让你可以在对重试次数计数的同时,仍掌握着原始的错误(error)
                                    if (index < 4) return index; // 3-允许三次重试,小于 4 的时候发出一个值。
                                    else throw Exceptions.propagate(error); // 4-为了使序列以错误结束。我们将原始异常在三次重试之后抛出。
                                })
                );
    }

类似的代码也可以被用于实现 exponential backoff and retry 模式 (译者加:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加),参考 FAQ

Processors既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber

那意味着你可以 订阅一个 Processor(通常它们会实现 Flux),也可以调用相关方法来手动 插入数据到序列,或终止序列。

Processor 有多种类型,它们都有特别的语义规则,但是在你研究它们之前,最好问一下 自己如下几个问题:

我是否需要使用 Processor?

多数情况下,你应该进行避免使用 Processor,它们较难正确使用,主要用于一些特殊场景下。

如果你觉得 Processor 适合你的使用场景,请首先看一下是否尝试过以下两种替代方式:

  1. 是否有一个或多个操作符的组合能够满足需求?(见 我需要哪个操作符?
  2. “generator” 操作符是否能解决问题?(通常这些操作符 可以用来桥接非响应式的 API,它们提供了一个“sink”,在生成数据流序列方面, 概念上类似于 Processor

如果看了以上替代方案,你仍然觉得需要一个 Processor,阅读 现有的 Processors 总览 这一节来了解一下不同的实现吧。

使用 Sink 门面对象来线程安全地生成流

比起直接使用 Reactor 的 Processors,更好的方式是通过调用一次 sink() 来得到 ProcessorSink

FluxProcessor 的 sink 是线程安全的“生产者(producer)”,因此能够在应用程序中 多线程并发地生成数据。例如,一个线程安全的序列化(serialized)的 sink 能够通过 UnicastProcessor 创建:

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);

多个生产者线程可以并发地生成数据到以下的序列化 sink。

sink.next(n);

根据 Processor 及其配置,next 产生的溢出有两种可能的处理方式:

  • 一个无限的 processor 通过丢弃或缓存自行处理溢出。
  • 一个有限的 processor 阻塞在 IGNORE 策略,或将 overflowStrategy 应用于 sink

现有的 Processors 总览

Reactor Core 内置多种 Processor。这些 processor 具有不同的语法,大概分为三类。 下边简要介绍一下这三种 processor:

  • 直接的(direct)DirectProcessorUnicastProcessor):这些 processors 只能通过直接 调用 Sink 的方法来推送数据。
  • 同步的(synchronous)EmitterProcessorReplayProcessor):这些 processors 既可以 直接调用 Sink 方法来推送数据,也可以通过订阅到一个上游的发布者来同步地产生数据。
  • 异步的(asynchronous)WorkQueueProcessorTopicProcessor):这些 processors 可以将从多个上游发布者得到的数据推送下去。由于使用了 RingBuffer 的数据结构来 缓存多个来自上游的数据,因此更加有健壮性。

异步的 processor 在实例化的时候最复杂,因为有许多不同的选项。因此它们暴露出一个 Builder 接口。 而简单的 processors 有静态的工厂方法。

DirectProcessor

DirectProcessor 可以将信号分发给零到多个订阅者(Subscriber)。它是最容易实例化的,使用静态方法 create() 即可。另一方面,它的不足是无法处理背压。所以,当 DirectProcessor 推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个 IllegalStateException

一旦 Processor 终止(通常通过调用它的 Sinkerror(Throwable)complete() 方法), 虽然它允许更多的订阅者订阅它,但是会立即向它们重新发送终止信号。

UnicastProcessor

UnicastProcessor 可以使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者。

UnicastProcessor 有多种选项,因此提供多种不同的 create 静态方法。例如,它默认是 无限的(unbounded) :如果你在在订阅者还没有请求数据的情况下让它推送数据,它会缓存所有数据。

可以通过提供一个自定义的 Queue 的具体实现传递给 create 工厂方法来改变默认行为。如果给出的队列是 有限的(bounded), 并且缓存已满,而且未收到下游的请求,processor 会拒绝推送数据。

在上边 有限的 例子中,还可以在构造 processor 的时候提供一个回调方法,这个回调方法可以在每一个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。

EmitterProcessor

EmitterProcessor 能够向多个订阅者发送数据,并且可以对每一个订阅者进行背压处理。它本身也可以订阅一个 Publisher 并同步获得数据。

最初如果没有订阅者,它仍然允许推送一些数据到缓存,缓存大小由 bufferSize 定义。 之后如果仍然没有订阅者订阅它并消费数据,对 onNext 的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。

因此第一个订阅者会收到最多 bufferSize 个元素。然而之后, processor 不会重新发送(replay) 数据给后续的订阅者。这些后续接入的订阅者只能获取到它们开始订阅 之后 推送的数据。这个内部的 缓存会继续用于背压的目的。

默认情况下,如果所有的订阅者都取消了(基本意味着它们都不再订阅(un-subscribed)了), 它会清空内部缓存,并且不再接受更多的订阅者。这一点可以通过 create 静态工厂方法的 autoCancel 参数来配置。

ReplayProcessor

ReplayProcessor 会缓存直接通过自身的 Sink 推送的元素,以及来自上游发布者的元素, 并且后来的订阅者也会收到重发(replay)的这些元素。

可以通过多种配置方式创建它:

  • 缓存一个元素(cacheLast)。
  • 缓存一定个数的历史元素(create(int)),所有的历史元素(create())。
  • 缓存基于时间窗期间内的元素(createTimeout(Duration))。
  • 缓存基于历史个数和时间窗的元素(createSizeOrTimeout(int, Duration))。
TopicProcessor

TopicProcessor 是一个异步的 processor,它能够重发来自多个上游发布者的元素, 这需要在创建它的时候配置 shared (见 build()share(boolean) 配置)。

注意,如果你企图在并发环境下通过并发的上游 Publisher 调用 TopicProcessoronNextonComplete,或 onError 方法,就必须配置 shared。

否则,并发调用就是非法的,从而 processor 是完全兼容响应式流规范的。

TopicProcessor 能够对多个订阅者发送数据。它通过对每一个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出 onErroronComplete 信号,或关联的订阅者被取消。 最多可以接受的订阅者个数由构造者方法 executor 指定,通过提供一个有限线程数的 ExecutorService 来限制这一个数。

这个 processor 基于一个 RingBuffer 数据结构来存储已发送的数据。每一个订阅者线程 自行管理其相关的数据在 RingBuffer 中的索引。

这个 processor 也有一个 autoCancel 构造器方法:如果设置为 true (默认的),那么当 所有的订阅者取消之后,源 Publisher(s) 也就被取消了。

WorkQueueProcessor

WorkQueueProcessor 也是一个异步的 processor,也能够重发来自多个上游发布者的元素, 同样在创建时需要配置 shared (它多数构造器配置与 TopicProcessor 相同)。

它放松了对响应式流规范的兼容,但是好处就在于相对于 TopicProcessor 来说需要更少的资源。 它仍然基于 RingBuffer,但是不再要求每一个订阅者都关联一个线程,因此相对于 TopicProcessor 来说更具扩展性。

代价在于分发模式有些区别:来自订阅者的请求会汇总在一起,并且这个 processor 每次只对一个 订阅者发送数据,因此需要循环(round-robin)对订阅者发送数据,而不是一次全部发出的模式。

无法保证完全公平的循环分发。

WorkQueueProcessor 多数构造器方法与 TopicProcessor 相同,比如 autoCancelshare, 以及 waitStrategy。下游订阅者的最大数目同样由构造器 executor 配置的 ExecutorService 决定。

你最好注意不要有太多订阅者订阅 WorkQueueProcessor,因为这 会锁住 processor。 如果你需要限制订阅者数量,最好使用一个 ThreadPoolExecutorForkJoinPool。这个 processor 能够检测到(线程池)容量并在订阅者过多时抛出异常。

扩展

标签:2021.11,Reactor,一个,Mono,元素,Flux,线程,上手,序列
来源: https://blog.csdn.net/weixin_44600430/article/details/121256580

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有