# Flux 原理
0..N 个元素经过一系列操作,最终得到处理的结果。每个元素的输入、处理和输出都是线性的。

# 构造 Flux,输入元素阶段
# 元素可枚举方式
介绍 Flux 构造方法前,先描述 Supplier 和 Publisher 的结构,defer 会用到。
package java.util.function; | |
@FunctionalInterface | |
public interface Supplier<T> { | |
/** | |
* 获取一个结果. | |
* | |
* @return 结果 | |
*/ | |
T get(); | |
} |
package org.reactivestreams; | |
public interface Publisher<T> { | |
/* | |
* 可以订阅的发布者,比如:Flux, Mono | |
*/ | |
public void subscribe(Subscriber<? super T> s); | |
} |
/* | |
* 结构:<T> Flux<T> 方法名 (参数); | |
* | |
* 以下都省略返回参数,因为都是构造 Flux | |
* | |
* 使用:Flux. 方法名 (参数); | |
* 例如:Flux.empty (); | |
*/ | |
/* 基本 */ | |
empty(); | |
just(T data); | |
just(T... data); | |
/* from */ | |
from(Publisher<? extends T> source); | |
fromArray(T[] array); | |
fromIterable(Iterable<? extends T> it); | |
fromStream(Stream<? extends T> s); | |
/* | |
* 提供一个方法,订阅时,才执行并从其中获取元素 | |
* | |
* 参考上面的 Supplier 和 Publisher | |
* | |
* supplier 提供一个 Publisher,如:Flux,Mono | |
*/ | |
defer(Supplier<? extends Publisher<T>> supplier); |
# 可编程方式
先描述 Consumer 和 FluxSink
负压、背压(backpressure),即消费者消费元素跟不上生产者的产生速度,因此通过不同策略,处理溢出的元素,帮消费者抗一部分压力。
package java.util.function; | |
import java.util.Objects; | |
@FunctionalInterface | |
public interface Consumer<T> { | |
/** | |
* 消费一个结果 | |
* | |
* @param t 输入结果 | |
*/ | |
void accept(T t); | |
/** | |
* | |
*/ | |
default Consumer<T> andThen(Consumer<? super T> after) { | |
Objects.requireNonNull(after); | |
return (T t) -> { accept(t); after.accept(t); }; | |
} | |
} |
public interface FluxSink<T> { | |
/** | |
* 发送结束信号 | |
*/ | |
void complete(); | |
/** | |
* 获取目前环境 | |
*/ | |
Context currentContext(); | |
/** | |
* 发送错误信号 | |
*/ | |
void error(Throwable e); | |
/** | |
* 发布元素信号 | |
*/ | |
FluxSink<T> next(T t); | |
/** | |
* 获取下游未处理的(outstanding)请求数目 | |
*/ | |
long requestedFromDownstream(); | |
/** | |
* 获取下游是否取消队列了 | |
*/ | |
boolean isCancelled(); | |
/** | |
* 请求 | |
*/ | |
FluxSink<T> onRequest(LongConsumer consumer); | |
/** | |
* 下游取消时的处理回调 | |
*/ | |
FluxSink<T> onCancel(Disposable d); | |
/** | |
* 接收第一个结束信号时的处理回调:取消、完成、错误信号 | |
*/ | |
FluxSink<T> onDispose(Disposable d); | |
/** | |
* 负压的处理方式 | |
*/ | |
enum OverflowStrategy { | |
/** | |
* 不使用负压,不考虑消费者的压力情况 | |
* 可能抛出 IllegalStateException 错误 | |
*/ | |
IGNORE, | |
/** | |
* 下游跟不上抛出 IllegalStateException 错误 | |
*/ | |
ERROR, | |
/** | |
* 丢弃输入元素 | |
*/ | |
DROP, | |
/** | |
* 只获取上游最新的输入元素 | |
*/ | |
LATEST, | |
/** | |
* 缓存所有输入元素,没有边界可能导致 OutOfMemoryError | |
*/ | |
BUFFER | |
} | |
} |
public interface SynchronousSink<T> { | |
/** | |
* 完成信号 | |
*/ | |
void complete(); | |
/** | |
* 当前环境 | |
*/ | |
Context currentContext(); | |
/** | |
* 错误信号 | |
*/ | |
void error(Throwable e); | |
/** | |
* 发送元素 | |
*/ | |
void next(T t); | |
} |
/** | |
* create 与 push 一样,负压默认采用缓存方式 | |
* | |
* Consumer 和 FluxSink 参考上面 | |
* | |
* Flux 提供一个 FluxSink,Consumer 处理该 sink | |
* next () 提供元素,以及 complete 完成或 error 错误 | |
*/ | |
create(Consumer<? super FluxSink<T>> emitter); | |
push(Consumer<? super FluxSink<T>> emitter); | |
/** | |
* 支持负压策略 | |
*/ | |
create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure); | |
push(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure); | |
/** | |
* generate 与前者不一样,使用 SynchronousSink,同步 | |
* 每次只能请求一次 next () | |
*/ | |
generate(Consumer<SynchronousSink<T>> generator); | |
/** | |
* 支持状态: | |
* 1. 状态对象创建函数,如:ArrayList::new | |
* 2. 输入状态对象(可获取或设置)和同步 sink, | |
* 返回状态对象 | |
*/ | |
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator); | |
/** | |
* 支持状态,以及最后处理状态对象 | |
* 3. 处理状态对象的函数, | |
* 只有在生成器结束或下游取消后才执行 | |
*/ | |
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer); |
# 拼接

// Publisher:如 Flux、Mono | |
concat(Publisher<? extends T>... sources); | |
// Publisher 的迭代器 | |
concat(Iterable<? extends Publisher<? extends T>> sources); | |
// Publisher 的 Publisher,双层嵌套 | |
// 如:Flux<Flux<String>> | |
concat(Publisher<? extends Publisher<? extends T>> sources); | |
// 可设置请求预取数量 | |
concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch); | |
// 所有 sources 消费完成,才将错误抛出;而不是现在 source 就抛出 | |
concatDelayError(Publisher<? extends T>... sources); | |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources); | |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch); | |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch); |
# combineLatest
输入 Publiser 列表或多个 Publiser,比如:List<Flux<Integer>> 。
遍历 List<Flux<Integer>> 列表,获取每个 Flux<Integer> 里的最后一个元素,然后合并。

// array | |
combineLatest(Function<Object[], V> combinator, Publisher<? extends T>... sources); | |
combineLatest(Function<Object[], V> combinator, int prefetch, Publisher<? extends T>... sources); | |
// BiFunction 是二元函数,比 Function 具体一点 | |
combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends V> combinator); | |
// 还有几个类似的,省略: | |
//source1 到 source [3 到 6] | |
combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[], V> combinator); | |
// iterable | |
combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], V> combinator); | |
combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[], V> combinator); |
# error 错误
创建一个以错误结束的 Flux
//subscribed 后立即抛出异常,whenRequested = false | |
error(Throwable error); | |
// 在第一个请求时,onError 报错,而不是 subscribed | |
error(Throwable throwable, boolean whenRequested); |
# first
// 取数组的第一个 | |
first(Publisher<? extends I>... sources); | |
// 取列表的第一个 | |
first(Iterable<? extends Publisher<? extends I>> sources); |
# interval
间隔一定时间发送元素,元素从 0 递增。

延时

// 返回 Flux<Long> | |
interval(Duration period); | |
interval(Duration delay, Duration period); | |
// Scheduler 调度器,默认 parallel,固定 workders,单线程 | |
interval(Duration period, Scheduler timer); | |
interval(Duration delay, Duration period, Scheduler timer); |
# merge
按元素被 subscribe 顺序合并

// 数组 | |
merge(Publisher<? extends I>... sources); | |
merge(int prefetch, Publisher<? extends I>... sources); | |
// 列表 | |
merge(Iterable<? extends Publisher<? extends I>> sources); | |
// 嵌套结构, | |
merge(Publisher<? extends Publisher<? extends T>> source); | |
merge(Publisher<? extends Publisher<? extends T>> source, int concurrency); | |
merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch); | |
// 延迟错误,直到处理完其它积压的元素 | |
mergeDelayError(int prefetch, Publisher<? extends I>... sources); |
严格按序列顺序合并

// 数组 | |
mergeSequential(Publisher<? extends I>... sources); | |
mergeSequential(int prefetch, Publisher<? extends I>... sources); | |
// 列表 | |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources); | |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch); | |
// 嵌套 | |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources); | |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch); | |
// 延迟错误 | |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch); | |
mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources); | |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch); |
# never, range
// 不会发送任何数据,错误或完成信号 | |
never(); | |
// [start, start + count) | |
range(int start, int count); |
# switch
先使用最新的 Publiser,如果有新的 Publiser 发布,则使用第二个

// 嵌套结构 | |
// 先使用最新的 Publisher 的数据流 | |
// 如果有新的 Publiser 发布,则使用第二个 | |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers); | |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch); |
# using

Flux.using(resourceSupplier, | |
sourceSupplier, | |
resourceCleanup, true) | |
.doOnNext(doOnNext) | |
.doOnTerminate(doOnTerminate) | |
.subscribe(); | |
// 顺序: eager = false | |
// resourceSupplier -> sourceSupplier -> | |
// doOnNext -> doOnTerminate -> | |
// resourceCleanup | |
// 顺序: eager = true | |
// xxx -> resourceCleanup -> doOnTerminate | |
using( | |
Callable<? extends D> resourceSupplier, | |
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, | |
Consumer<? super D> resourceCleanup | |
); | |
using( | |
Callable<? extends D> resourceSupplier, | |
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, | |
Consumer<? super D> resourceCleanup, | |
boolean eager | |
); |
# zip
// source1 - source[2 - 6] | |
zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2); | |
zip( | |
Publisher<? extends T1> source1, | |
Publisher<? extends T2> source2, | |
final BiFunction<? super T1, ? super T2, ? extends O> combinator | |
); | |
// 列表 | |
zip( | |
Iterable<? extends Publisher<?>> sources, | |
final Function<? super Object[], ? extends O> combinator | |
); | |
zip( | |
Iterable<? extends Publisher<?>> sources, | |
int prefetch, | |
final Function<? super Object[], ? extends O> combinator | |
); | |
// 数组 | |
zip( | |
final Function<? super Object[], ? extends O> combinator, | |
Publisher<? extends I>... sources | |
); | |
zip( | |
final Function<? super Object[], ? extends O> combinator, | |
int prefetch, | |
Publisher<? extends I>... sources | |
); | |
// 嵌套 | |
zip( | |
Publisher<? extends Publisher<?>> sources, | |
final Function<? super TUPLE, ? extends V> combinator | |
); |