# Flux 原理

0..N 个元素经过一系列操作,最终得到处理的结果。每个元素的输入、处理和输出都是线性的。

Flux原理

# 构造 Flux,输入元素阶段

# 元素可枚举方式

介绍 Flux 构造方法前,先描述 Supplier 和 Publisher 的结构,defer 会用到。

package java.util.function;
@FunctionalInterface
public interface Supplier<T> {
    /**
     * 获取一个结果.
     *
     * @return 结果
     */
    T get();
}
package org.reactivestreams;
public interface Publisher<T> {
    /*
     * 可以订阅的发布者,比如:Flux, Mono
     */
    public void subscribe(Subscriber<? super T> s);
}
/*
 * 结构:<T> Flux<T> 方法名 (参数);
 *
 * 以下都省略返回参数,因为都是构造 Flux
 *
 * 使用:Flux. 方法名 (参数);
 * 例如:Flux.empty ();
 */
/* 基本 */
empty();
just(T data);
just(T... data);
/* from */
from(Publisher<? extends T> source);
fromArray(T[] array);
fromIterable(Iterable<? extends T> it);
fromStream(Stream<? extends T> s);
/*
 * 提供一个方法,订阅时,才执行并从其中获取元素
 *
 * 参考上面的 Supplier 和 Publisher
 *
 * supplier 提供一个 Publisher,如:Flux,Mono
 */
defer(Supplier<? extends Publisher<T>> supplier);

# 可编程方式

先描述 Consumer 和 FluxSink

负压、背压(backpressure),即消费者消费元素跟不上生产者的产生速度,因此通过不同策略,处理溢出的元素,帮消费者抗一部分压力。

package java.util.function;
import java.util.Objects;
@FunctionalInterface
public interface Consumer<T> {
    /**
     * 消费一个结果
     *
     * @param t 输入结果
     */
    void accept(T t);
    /**
     *
     */
    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}
public interface FluxSink<T> {
	/**
   * 发送结束信号
   */
  void complete();
	/**
	 * 获取目前环境
	 */
	Context currentContext();
  /**
   * 发送错误信号
   */
  void error(Throwable e);
  /**
   * 发布元素信号
   */
  FluxSink<T> next(T t);
	/**
	 * 获取下游未处理的(outstanding)请求数目
	 */
	long requestedFromDownstream();
	/**
	 * 获取下游是否取消队列了
	 */
	boolean isCancelled();
	/**
	 * 请求
	 */
	FluxSink<T> onRequest(LongConsumer consumer);
	/**
	 * 下游取消时的处理回调
	 */
	FluxSink<T> onCancel(Disposable d);
	/**
	 * 接收第一个结束信号时的处理回调:取消、完成、错误信号
	 */
	FluxSink<T> onDispose(Disposable d);
	/**
	 * 负压的处理方式
	 */
	enum OverflowStrategy {
		/**
		 * 不使用负压,不考虑消费者的压力情况
     * 可能抛出 IllegalStateException 错误
		 */
		IGNORE,
		/**
		 * 下游跟不上抛出 IllegalStateException 错误
		 */
		ERROR,
		/**
		 * 丢弃输入元素
		 */
		DROP,
		/**
		 * 只获取上游最新的输入元素
		 */
		LATEST,
		/**
		 * 缓存所有输入元素,没有边界可能导致 OutOfMemoryError
		 */
		BUFFER
	}
}
public interface SynchronousSink<T> {
	/**
	 * 完成信号
	 */
	void complete();
	/**
	 * 当前环境
	 */
	Context currentContext();
	/**
	 * 错误信号
	 */
	void error(Throwable e);
	/**
	 * 发送元素
	 */
	void next(T t);
}
/**
 * create 与 push 一样,负压默认采用缓存方式
 *
 * Consumer 和  FluxSink 参考上面
 *
 * Flux 提供一个 FluxSink,Consumer 处理该 sink
 * next () 提供元素,以及 complete 完成或 error 错误
 */
create(Consumer<? super FluxSink<T>> emitter);
push(Consumer<? super FluxSink<T>> emitter);
/**
 * 支持负压策略
 */
create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure);
push(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure);
/**
 * generate 与前者不一样,使用 SynchronousSink,同步
 * 每次只能请求一次 next ()
 */
generate(Consumer<SynchronousSink<T>> generator);
/**
 * 支持状态:
 * 1. 状态对象创建函数,如:ArrayList::new
 * 2. 输入状态对象(可获取或设置)和同步 sink,
 *    返回状态对象
 */
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator);
/**
 * 支持状态,以及最后处理状态对象
 * 3. 处理状态对象的函数,
*     只有在生成器结束或下游取消后才执行
 */
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer);

# 拼接

concat

// Publisher:如 Flux、Mono
concat(Publisher<? extends T>... sources);
// Publisher 的迭代器
concat(Iterable<? extends Publisher<? extends T>> sources);
// Publisher 的 Publisher,双层嵌套
// 如:Flux<Flux<String>>
concat(Publisher<? extends Publisher<? extends T>> sources);
// 可设置请求预取数量
concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch);
// 所有 sources 消费完成,才将错误抛出;而不是现在 source 就抛出
concatDelayError(Publisher<? extends T>... sources);
concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch);
concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch);

# combineLatest

输入 Publiser 列表或多个 Publiser,比如:List<Flux<Integer>> 。

遍历 List<Flux<Integer>> 列表,获取每个 Flux<Integer> 里的最后一个元素,然后合并。

combinelatest

// array
combineLatest(Function<Object[], V> combinator, Publisher<? extends T>... sources);
combineLatest(Function<Object[], V> combinator, int prefetch, Publisher<? extends T>... sources);
// BiFunction 是二元函数,比 Function 具体一点
combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends V> combinator);
// 还有几个类似的,省略:
//source1 到 source [3 到 6]
combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[], V> combinator);
// iterable
combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], V> combinator);
combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[], V> combinator);

# error 错误

创建一个以错误结束的 Flux

//subscribed 后立即抛出异常,whenRequested = false
error(Throwable error);
// 在第一个请求时,onError 报错,而不是 subscribed
error(Throwable throwable, boolean whenRequested);

# first

// 取数组的第一个
first(Publisher<? extends I>... sources);
// 取列表的第一个
first(Iterable<? extends Publisher<? extends I>> sources);

# interval

间隔一定时间发送元素,元素从 0 递增。

interval

延时

intervald

// 返回 Flux<Long>
interval(Duration period);
interval(Duration delay, Duration period);
// Scheduler 调度器,默认 parallel,固定 workders,单线程
interval(Duration period, Scheduler timer);
interval(Duration delay, Duration period, Scheduler timer);

# merge

按元素被 subscribe 顺序合并

merge

// 数组
merge(Publisher<? extends I>... sources);
merge(int prefetch, Publisher<? extends I>... sources);
// 列表
merge(Iterable<? extends Publisher<? extends I>> sources);
// 嵌套结构,
merge(Publisher<? extends Publisher<? extends T>> source);
merge(Publisher<? extends Publisher<? extends T>> source, int concurrency);
merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch);
// 延迟错误,直到处理完其它积压的元素
mergeDelayError(int prefetch, Publisher<? extends I>... sources);

严格按序列顺序合并

mergesequential

// 数组
mergeSequential(Publisher<? extends I>... sources);
mergeSequential(int prefetch, Publisher<? extends I>... sources);
// 列表
mergeSequential(Iterable<? extends Publisher<? extends I>> sources);
mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch);
// 嵌套
mergeSequential(Publisher<? extends Publisher<? extends T>> sources);
mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch);
// 延迟错误
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch);
mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources);
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch);

# never, range

// 不会发送任何数据,错误或完成信号
never();
// [start, start + count)
range(int start, int count);

# switch

先使用最新的 Publiser,如果有新的 Publiser 发布,则使用第二个

switchonnext

// 嵌套结构
// 先使用最新的 Publisher 的数据流
// 如果有新的 Publiser 发布,则使用第二个
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers);
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch);

# using

using

Flux.using(resourceSupplier,
           sourceSupplier,
           resourceCleanup, true)
              .doOnNext(doOnNext)
              .doOnTerminate(doOnTerminate)
              .subscribe();
// 顺序: eager = false
// resourceSupplier -> sourceSupplier ->
//  doOnNext -> doOnTerminate ->
//  resourceCleanup
// 顺序: eager = true
// xxx -> resourceCleanup -> doOnTerminate
using(
Callable<? extends D> resourceSupplier,
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup
);
using(
Callable<? extends D> resourceSupplier,
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager
);

# zip

// source1 - source[2 - 6]
zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2);
zip(
Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator
);
// 列表
zip(
Iterable<? extends Publisher<?>> sources,
final Function<? super Object[], ? extends O> combinator
);
zip(
Iterable<? extends Publisher<?>> sources,
int prefetch,
final Function<? super Object[], ? extends O> combinator
);
// 数组
zip(
final Function<? super Object[], ? extends O> combinator,
Publisher<? extends I>... sources
);
zip(
final Function<? super Object[], ? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources
);
// 嵌套
zip(
Publisher<? extends Publisher<?>> sources,
final Function<? super TUPLE, ? extends V> combinator
);
更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Cecil 微信支付

微信支付

Cecil 支付宝

支付宝

Cecil PayPal

PayPal