rxdart学习笔记

rxdart学习笔记

rxdart是对流的操作,在dart里,有时如果可以用Future实现,会比用流来实现方便,毕竟流需要回调监听
例如使用dio发起多个并发请求:
response = await Future.wait([dio.post("/info"), dio.get("/token")]);

[TOC]

官方rxdart github地址

RxDart为Dart StreamsStreamControllers增加了额外的功能。

Dart提供了一个非常不错的开箱即用的 Streams API,RxDart并没有试图为这个API提供一个替代方案,而是在其上添加了反应性扩展规范的功能

RxDart不提供自己的可观察类作为Dart流的替代品。相反,它提供了许多额外的流类、操作符(流类上的扩展方法)和主题。

如果您熟悉其他语言的可观察性,请参阅 the Rx Observables vs Dart Streams comparison chart,以了解两者之间的显著区别

RxDart通过三种方式向Dart Streams添加功能:

Stream Classes——创建具有特定功能的流,例如将多个流合并在一起。

Extension Methods - 将源流转换为具有不同功能的新流,例如节流或缓冲事件

Subjects - 具有额外能力的流控制器

Stream Classes

Stream Classes提供了创建流的不同方法例如:Stream. fromiterableStream. periodic.RxDart为各种任务提供了额外的流类,比如将流合并在一起!

可以通过两种方式构建RxDart提供的流。下面的例子在功能上是等价的:

1.直接实例化流类。

例如:final mergedStream = MergeStream([myFirstStream, mySecondStream]);

2.使用来自Rx类的静态工厂,这对于发现RxDart提供的流类型非常有用。在内部,这些工厂只是调用相应的流构造函数。

例如:final mergedStream = Rx.merge([myFirstStream, mySecondStream]);

类/静态工厂的列表

ConcatStream / Rx.concat

连接所有指定的流序列,只要前一个流序列成功终止。它通过逐个订阅每个流、发送所有项并在订阅下一个流之前完成此操作

rxdart-concat.png
Rx.concat([
  Stream.value(1),
  Rx.timer(2, Duration(milliseconds: 1000)),
  Stream.value(3)
])
.listen(print); // prints 1, 2, 3

ConcatEagerStream / Rx.concatEager

连接所有指定的流序列,只要前一个流序列成功终止。在concatEager的情况下,不是订阅一个接一个的流,而是立即订阅所有流。然后,在前一个流完成项目发射之后,在正确的时间捕获和发射事件。如果有多个耗时的流,连接后的流完成时,总用时会比 Rx.concat 总用时少。

  Rx.concatEager([
    Stream.value(1),
    Rx.timer(2, Duration(days: 1)),
    Stream.value(3)
  ])
  .listen(print); // prints 1, 2, 3

DeferStream / Rx.defer

等待:延迟工厂将一直等待,直到观察者订阅它,然后它使用给定的工厂函数创建一个流。

在某些情况下,等到最后一分钟(即订阅时)才生成流可以确保该流包含最新的数据。

默认情况下,延迟流是单次订阅。然而,使它们可重用是可能的。

Rx.defer(() => Stream.value(1))
  .listen(print); //prints 1

MergeStream / Rx.merge

合并: 将给定流发出的项扁平化为单个流序列。

Rx.merge([
  Rx.timer(1, Duration(milliseconds: 1000)),
  Stream.value(2),
  Rx.timer(3, Duration(milliseconds: 1000)),
]).listen((value){
  print('value:$value\ttime:${DateTime.now().toIso8601String()}');
});
rxdart-merge.png

NeverStream / Rx.never

从不:返回一个不终止的流序列,可用于表示无限的持续时间。

never操作符具有非常具体和有限的行为。这些对于测试目的很有用,有时也可以用于与其他流结合,或者作为期望其他流作为参数的操作符的参数。

Rx.never().listen((value){
  print('never');
}); // Neither prints nor terminates
print('end');

RaceStream / Rx.race

给定两个或多个源流,仅从其中第一个流发出所有项,以发出一个项或通知。只发出最快执行完的流的值,其他流的就不会发出来了

Rx.race([
  Rx.timer(1, Duration(days: 1)),
  Rx.timer(2, Duration(days: 2)),
  Rx.timer(3, Duration(seconds: 1))
]).listen(print); // prints 3
rxdart-race.png

RepeatStream / Rx.repeat

重复:创建将重新创建并重新侦听源流指定次数的流,直到流成功终止。

如果没有指定count,则无限重复。

RepeatStream((int repeatCount) =>
  Stream.value('repeat index: $repeatCount'), 3)
    .listen((i) => print(i); // Prints 'repeat index: 0, repeat index: 1, repeat index: 2'

RetryStream / Rx.retry

重试:创建将重新创建并重新侦听源流指定次数的流,直到流成功终止。

如果没有指定重试计数,它会无限地重试。如果满足重试计数,但流没有成功终止,将抛出一个RetryError。RetryError将包含导致失败的所有错误和堆栈跟踪。

Rx.retry(() { Stream.value(1); })
    .listen((i) => print(i); // Prints 1

Rx
   .retry(() {
     Stream.value(1).concatWith([Stream.error(Error())]);
   }, 1)
   .listen(print, onError: (e, s) => print(e); // Prints 1, 1, RetryError

RetryWhenStream / Rx.retryWhen

重试:创建一个流,该流将在通知器发出新值时重新创建并重新侦听源流。如果源流发出错误或它完成,则流终止。

如果retryWhenFactory发出错误,将抛出一个RetryError。RetryError将包含导致失败的所有错误和堆栈跟踪。

//Basic Example
RetryWhenStream<int>(
  () => Stream<int>.fromIterable(<int>[1]),
  (dynamic error, StackTrace s) => throw error,
).listen(print); // Prints 1
//Periodic Example
RetryWhenStream<int>(
  () => Stream<int>
      .periodic(const Duration(seconds: 1), (int i) => i)
      .map((int i) => i == 2 ? throw 'exception' : i),
  (dynamic e, StackTrace s) {
    return Rx.timer('random value', const Duration(milliseconds: 200);
  },
).take(4).listen(print); // Prints 0, 1, 0, 1
//Complex Example
//出错时streamFactory出错时,进入retryWhenFactory,
// 如果retryWhenFactory返回是一个普通流,则重新创建
//如果retryWhenFactory返回是一个Stream.error(e),则不重建,全部结束
bool errorHappened = false;
RetryWhenStream(
  () => Stream
      .periodic(const Duration(seconds: 1), (i) => i)
      .map((i) {
        if (i == 3 && !errorHappened) {
          throw 'We can take this. Please restart.';
        } else if (i == 4) {
          throw 'It\'s enough.';
        } else {
          return i;
        }
      }),
  (e, s) {
    errorHappened = true;
    if (e == 'We can take this. Please restart.') {
      return Stream.value('Ok. Here you go!');
    } else {
      return Stream.error(e);
    }
  },
).listen(
  print,
  onError: (e, s) => print(e),
); // Prints 0, 1, 2, 0, 1, 2, 3, RetryError

SequenceEqualStream / Rx.sequenceEqual

对比:确定两个流是否发出相同的项序列。您可以提供一个可选的equals处理程序来确定是否相等。

  Rx.sequenceEqual(
      Stream.fromIterable([1, 2, 3, 4, 5]),
      Stream.fromIterable([1, 2, 3, 4, 5])
  ).listen(print); // prints true
rxdart-sequenceEqual.png

SwitchLatestStream / Rx.switchLatest

将发出流的流(又名“更高阶流”)转换为发出这些流最近发出的项的单个流。

当新流从源流发出并订阅新流时,此流将取消对先前发出的流的订阅。

final switchLatestStream = SwitchLatestStream<String>(
  Stream.fromIterable(<Stream<String>>[
    Rx.timer('A', Duration(seconds: 2)),
    Rx.timer('B', Duration(seconds: 1)),
    Stream.value('C'),
  ]),
);

// Since the first two Streams do not emit data for 1-2 seconds, and the
// 3rd Stream will be emitted before that time, only data from the 3rd
// Stream will be emitted to the listener.
switchLatestStream.listen(print); // prints 'C'

TimerStream / Rx.timer

在一段指定的时间后发出给定的值。

Rx.timer('hi', Duration(minutes: 1))
    .listen((i) => print(i); // print 'hi' after 1 minute

CombineLatestStream (combine2, combine3... combine9) / Rx.combineLatest2...Rx.combineLatest9

每当任何源流序列发出一个项时,使用组合器函数将给定流合并为一个流序列。

在所有流至少发出一个项之前,流不会发出。

Basic Example

这个构造函数接受一个Iterable<Stream<T>>,并在任何值从源流更改时输出一个Stream<Iterable<T>>。这对于动态数量的源流非常有用!

CombineLatestStream.list<String>([
  Stream.fromIterable(['a']),
  Stream.fromIterable(['b']),
  Stream.fromIterable(['C', 'D'])])
.listen(print); //prints ['a', 'b', 'C'], ['a', 'b', 'D']

Example with combiner

如果您希望将值列表合并到您面前的新对象中

CombineLatestStream(
  [
    Stream.fromIterable(['a']),
    Stream.fromIterable(['b']),
    Stream.fromIterable(['C', 'D'])
  ],
  (values) => values.last
)
.listen(print); //prints 'C', 'D'

Example with a specific number of Streams

如果您希望将特定数量的流与每个流的值的适当类型信息组合在一起,请使用combine2 - combine9操作符。

CombineLatestStream.combine2(
  Stream.fromIterable([1]),
  Stream.fromIterable([2, 3]),
  (a, b) => a + b,
)
.listen(print); // prints 3, 4

Available Extensions

BufferExtensions ConcatExtensions

ConnectableStreamExtensions DebounceExtensions DefaultIfEmptyExtension DelayExtension

DistinctUniqueExtension DoExtensions EndWithExtension EndWithManyExtension

ExhaustMapExtension FlatMapExtension GroupByExtension IgnoreElementsExtension

IntervalExtension MapToExtension MaterializeExtension MaxExtension MergeExtension

MinExtension OnErrorExtensions PairwiseExtension SampleExtensions ScanExtension SkipUntilExtension StartWithExtension StartWithManyExtension SwitchIfEmptyExtension SwitchMapExtension TakeUntilExtension

TakeWhileInclusiveExtension ThrottleExtensions TimeIntervalExtension TimeStampExtension

WhereTypeExtension WindowExtensions WithLatestFromExtensions ZipWithExtension

ForkJoinStream (join2, join3... join9) / Rx.forkJoin2...Rx.forkJoin9

当您拥有一组流并且只关心每个流的最终发出值时,最好使用此操作符。一个常见的用例是,如果您希望在页面加载(或其他事件)上发出多个请求,并且只希望在收到所有响应时采取行动。

在这种情况下,它类似于你如何使用future。

请注意,如果提供给forkJoin错误的任何一个内部流,如果您没有在内部流上正确地捕捉到错误,那么您将丢失任何其他可能或已经完成的流的值。

如果您只关心所有内部流的成功完成,那么您可以从外部捕获错误。同样值得注意的是,如果您有一个排放多个项目的流,并且您关心以前的排放,那么forkJoin不是正确的选择。

在这些情况下,您最好使用像combineLatest或zip这样的操作符。

Basic Example

这个构造函数接受一个Iterable<Stream<T>>,并在任何值从源流更改时输出一个Stream<Iterable<T>>。这对于动态数量的源流非常有用!

ForkJoinStream.list<String>([
  Stream.fromIterable(['a']),
  Stream.fromIterable(['b']),
  Stream.fromIterable(['C', 'D'])])
.listen(print); //prints ['a', 'b', 'D']

Example with combiner

如果您希望将值列表合并到您面前的新对象中

CombineLatestStream(
  [
    Stream.fromIterable(['a']),
    Stream.fromIterable(['b']),
    Stream.fromIterable(['C', 'D'])
  ],
  (values) => values.last
)
.listen(print); //prints 'D'

Example with a specific number of Streams

如果您希望将特定数量的流与每个流的值的适当类型信息组合在一起,请使用combine2 - combine9操作符。

ForkJoinStream.combine2(
  Stream.fromIterable([1]),
  Stream.fromIterable([2, 3]),
  (a, b) => a + b,
)
.listen(print); // prints 4

Available Extensions

BufferExtensions ConcatExtensions ConnectableStreamExtensions

DebounceExtensions DefaultIfEmptyExtension DelayExtension

DistinctUniqueExtension DoExtensions EndWithExtension

EndWithManyExtension ExhaustMapExtension FlatMapExtension

GroupByExtension IgnoreElementsExtension IntervalExtension MapToExtension

MaterializeExtension MaxExtension MergeExtension

MinExtension OnErrorExtensions PairwiseExtension

SampleExtensions ScanExtension SkipUntilExtension

StartWithExtension StartWithManyExtension SwitchIfEmptyExtension

SwitchMapExtension TakeUntilExtension TakeWhileInclusiveExtension

ThrottleExtensions TimeIntervalExtension TimeStampExtension WhereTypeExtension

WindowExtensions WithLatestFromExtensions ZipWithExtension

RangeStream / Rx.range

返回一个流,该流发出指定范围内的整数序列。

Rx.range(1, 3).listen((i) => print(i); // Prints 1, 2, 3

Rx.range(3, 1).listen((i) => print(i); // Prints 3, 2, 1

ZipStream (zip2, zip3, zip4, ..., zip9) / Rx.zip...Rx.zip9

当所有的流序列在对应的索引处产生了一个元素时,使用给定的zippers函数将指定的流合并为一个流序列。

它严格按照顺序应用这个函数,因此新流发出的第一项将是应用于流#1发出的第一项和流#2发出的第一项的函数的结果;新ZipStream发出的第二项将是应用于流#1发出的第二项和流#2发出的第二项的函数的结果;等等。它将只发出与源流发出的项目数量相等的项目,而源流发出的项目数量最少。

rxdart-zip.png

Basic Example

ZipStream(
  [
    Stream.fromIterable(['A']),
    Stream.fromIterable(['B']),
    Stream.fromIterable(['C', 'D']),
  ],
  (values) => values.join(),
).listen(print); // prints 'ABC'

Example with a specific number of Streams

如果您希望压缩特定数量的流以及每个流值的适当类型信息,请使用zip2 - zip9操作符

ZipStream.zip2(
  Stream.fromIterable(['A']),
  Stream.fromIterable(['B', 'C']),
  (a, b) => a + b,
)
.listen(print); // prints 'AB'

Interval / Stream.periodic

创建一个流,在给定的持续时间之后发出流中的每个项。

Stream.periodic(Duration(microseconds: 500), (int i) {
  return i;
}).take(10).listen(print);

扩展方法

RxDart提供的扩展方法可以在任何流上使用。它们将源流转换为具有附加功能的新流,例如缓冲或节流事件。

Stream.fromIterable([1, 2, 3])
  .throttleTime(Duration(seconds: 1))
  .listen(print); // prints 3

扩展方法列表

buffer

创建一个流,其中每个项都是包含源序列项的列表。

每当窗口发出一个事件时,就会发出此列表。

Stream.periodic(Duration(milliseconds: 100), (i) => i)
  .buffer(Stream.periodic(Duration(milliseconds: 160), (i) => i))
  .listen(print); // prints [0, 1] [2, 3] [4, 5] ...

bufferCount

通过计数从源流中缓冲一些值,然后释放缓冲区并清除它,并启动一个新的缓冲区each startBufferEvery值。如果没有提供startBufferEvery,则在源的开始处以及在每个缓冲区关闭并发出时立即启动新的缓冲区。

count是发出的缓冲区的最大大小

RangeStream(1, 4)
  .bufferCount(2)
  .listen(print); // prints [1, 2], [3, 4] done!

如果startBufferEvery是2,那么将在源文件的每一个其他值上启动一个新的缓冲区。默认情况下,在源文件的开头处启动一个新的缓冲区。

RangeStream(1, 5)
  .bufferCount(3, 2)
  .listen(print); // prints [1, 2, 3], [3, 4, 5], [5] done!

bufferTest

创建一个流,其中每个项都是包含源序列项的列表,在测试通过时进行批处理。

Stream.periodic(Duration(milliseconds: 100), (int i) => i)
  .bufferTest((i) => i % 2 == 0)
  .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...

bufferTime

创建一个流,其中每个项都是一个列表,其中包含源序列中的项,在带有持续时间的时间段内进行采样。

Stream.periodic(Duration(milliseconds: 100), (int i) => i)
  .bufferTime(Duration(milliseconds: 220))
  .listen(print); // prints [0, 1] [2, 3] [4, 5] ...

concatWith

返回一个流,该流从当前流中发出所有项,然后依次从给定流中发出所有项。

TimerStream(1, Duration(seconds: 10))
    .concatWith([Stream.fromIterable([2])])
    .listen(print); // prints 1, 2

debounce

消除抖動

转换流,以便在一个窗口已完成时仅从源序列发出项,而不由源序列发出另一个项。

此窗口是在发出最后一个取消声明事件之后创建的。您可以使用上一个被取消的事件的值来确定下一个窗口的长度。

窗口将一直打开,直到第一个窗口事件发出。

debounce过滤由源流发出的、随后被另一个发出的项快速跟随的项。

Stream.fromIterable([1, 2, 3, 4])
  .debounce((_) => TimerStream(true, Duration(seconds: 1)))
  .listen(print); // prints 4
rxdart-debounce.png

debounceTime

转换流,以便当持续时间定义的时间跨度经过时,仅从源序列发出项,而不会由源序列发出另一个项。

此时间跨度在发出最后一个被取消的事件之后开始。

debounceTime过滤掉源流发出的、紧接其后的另一个发出的项。

Stream.fromIterable([1, 2, 3, 4])
  .debounceTime(Duration(seconds: 1))
  .listen(print); // prints 4
rxdart-debounceTime.png

defaultIfEmpty

从源流发出项,或者如果源流不发出任何东西,则发出单个默认项。

Stream.empty().defaultIfEmpty(10).listen(print); // prints 10

delay

延迟运算符通过在发出源流的每个项之前暂停一个特定的时间增量(您指定的)来修改它的源流。这样做的效果是将流发出的整个项目序列按指定的增量在时间上向前移动。

Stream.fromIterable([1, 2, 3, 4])
  .delay(Duration(seconds: 1))
  .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately

dematerialize

将物化流中的onData、onDone和onError通知对象转换为正常的onData、onDone和onError事件。

当一个流被物化后,它会发出onData、onDone和onError事件作为通知对象。去物质化只是通过将通知对象转换回正常的事件流来逆转这种情况。

Example

Stream<Notification<int>>
    .fromIterable([Notification.onData(1), Notification.onDone()])
    .dematerialize()
    .listen((i) => print(i)); // Prints 1

Error example

Stream<Notification<int>>
    .fromIterable([Notification.onError(Exception(), null)])
    .dematerialize()
    .listen(null, onError: (e, s) { print(e) }); // Prints Exception

distinctUnique

警告:在其他Rx实现中通常称为distinct。创建一个流,如果之前已经发出数据事件,则跳过该流。

相等性由提供的equals和hashCode方法确定。如果省略了这些,则使用最后提供的数据元素上的'=='操作符和hashCode。

如果返回的流是广播流,则返回的流是广播流。如果广播流被侦听超过一次,每个订阅将分别执行equals和hashCode测试。

rxdart-distinctUnique.png

doOnCancel

在流订阅被取消时调用给定的回调函数。通常在其他实现中称为doOnUnsubscribe或doOnDispose。

final subscription = TimerStream(1, Duration(minutes: 1))
    .doOnCancel(() => print('hi')).listen(print);

subscription.cancel(); // prints 'hi'

doOnData

当流发出项时调用给定的回调函数。在其他实现中,这称为doOnNext。

Stream.fromIterable([1, 2, 3])
  .doOnData(print)
  .listen(null); // prints 1, 2, 3

doOnDone

在流完成发送项时调用给定的回调函数。在其他实现中,这称为doOnComplete(d)。

Stream.fromIterable([1, 2, 3])
  .doOnDone(() => print('all set'))
  .listen(null); // prints 'all set'

doOnEach

当流发出数据、发出错误或发出done时,调用给定的回调函数。回调接收一个通知对象。

通知对象包含事件的类型(OnData、onDone或OnError)以及所发出的项或错误。在onDone的情况下,没有数据作为通知的一部分发出。

Stream.fromIterable([1])
  .doOnEach(print)
  .listen(null);

//Notification{kind: Kind.OnData, value: 1, error: null, stackTrace: null}
//Notification{kind: Kind.OnDone, value: null, error: null, stackTrace: null}

doOnError

当流发出错误时调用给定的回调函数

Stream.error(Exception())
  .doOnError((error, stacktrace) => print('oh no'))
  .listen(null); // prints 'Oh no'
//oh no
//Unhandled exception:

doOnListen

在第一次侦听流时调用给定的回调函数。

Stream.fromIterable([1])
  .doOnListen(() => print('Is someone there?'))
  .listen(null); // prints 'Is someone there?'

doOnPause

在流订阅暂停时调用给定的回调函数。

 final subscription = Stream.fromIterable([1]).doOnPause((resumeSignal)  {
    print('Gimme a minute please resumeSignal:$resumeSignal');
  }).listen(print);

  subscription.pause(); // prints 'Gimme a minute please'

doOnResume

当流订阅恢复接收项时调用给定的回调函数。

final subscription = Stream.fromIterable([1])
  .doOnResume(() => print('Let's do this!'))
  .listen(null);

subscription.pause();
subscription.resume(); 'Let's do this!'

endWith

在关闭之前向源流追加一个值。

Stream.fromIterable([2]).endWith(1).listen(print); // prints 2, 1

endWithMany

在关闭之前,将一系列值作为最终事件追加到源流。

Stream.fromIterable([2]).endWithMany([1, 0]).listen(print); // prints 2, 1, 0

exhaustMap

使用给定映射器将项目从源流转换为流。它忽略源流中的所有项,直到新流完成。

当您有一个嘈杂的源流,并且只希望在之前的异步操作完成后进行响应时,该功能非常有用。

RangeStream(0, 2).interval(Duration(milliseconds: 50))
  .exhaust((i) =>
    TimerStream(i, Duration(milliseconds: 75)))
  .listen(print); // prints 0, 2

flatMap

使用给定的映射器函数将每个发出的项转换为流。将侦听新创建的流,并开始向下游发送项。

每个流发出的项将按照它们到达的顺序向下游发出。换句话说,序列被合并在一起。

RangeStream(4, 1)
  .flatMap((i) => TimerStream(i, Duration(minutes: i))
  .listen(print); // prints 1, 2, 3, 4

flatMapIterable

将每个项转换为流。流必须返回一个Iterable。然后,迭代器中的每个项将逐个发出。

用例:您可能有一个返回项目列表的API,例如流< list >。但是,您可能希望对单个项而不是列表本身进行操作。这就是flatMapIterable的工作。

RangeStream(1, 4)
  .flatMapIterable((i) => Stream.fromIterable([[i]])
  .listen(print); // prints 1, 2, 3, 4

groupBy

分组:GroupBy操作符将发出项的流划分为发出GroupByStream的流,每个流都发出来自原始源流的项的某个子集。

GroupByStream的作用类似于常规流,但添加了一个“key”属性,它从grouper函数接收其类型和值。

具有相同键的所有项都由相同的GroupByStream发出。

interval

间隔:创建一个流,在给定的持续时间之后发出流中的每个项。

mapTo

映射到:每当源流发出一个值时,就在输出流上发出给定的常数值。

Stream.fromIterable([1, 2, 3, 4])
  .mapTo(true)
  .listen(print); // prints true, true, true, true

materialize

将onData、on Done和onError事件转换为Notification对象,这些通知对象将传递到下游的onData侦听器中。

Notification对象包含事件的类型(OnData、onDone或OnError)以及所发出的项或错误。在onDone的情况下,没有数据作为通知的一部分发出。

例如:Stream.fromIterable(1) .materialize() .listen((i) => print(i));//打印onData和onDone通知

Stream<int>.error(Exception())
    .materialize()
    .listen((i) => print(i)); // Prints onError Notification

max

将流转换为Future ,该Future 将使用该流发出的最大项完成。

这类似于在列表中查找最大值,但是这些值是异步的。

final max = await Stream.fromIterable([1, 2, 3]).max();

print(max); // prints 3

mergeWith

将由多个流发出的项组合为单个的项流。这些项是按照它们由其源发出的顺序发出的。

TimerStream(1, Duration(seconds: 10))
    .mergeWith([Stream.fromIterable([2])])
    .listen(print); // prints 2, 1

min

将流转换为使用流发出的最小项完成的Future。

这类似于在列表中查找最小值,但是这些值是异步的!

final min = await Stream.fromIterable([1, 2, 3]).min();

print(min); // prints 1

onErrorResume

拦截错误事件并切换到由所提供的recoveryFn创建的恢复流。

onErrorResume操作符从源流中截取onError通知。它没有将错误传递给任何侦听器,而是将其替换为由recoveryFn创建的另一个项流。

recoveryFn接收发出的错误并返回一个流。您可以在recoveryFn中执行逻辑,根据发出的错误类型返回不同的流

如果您不需要根据发出的错误类型执行逻辑,请考虑使用onErrorResumeNext或onErrorReturn。

 Stream.error(StateError('state error'))
      .onErrorResume((dynamic e) =>
      Stream.fromIterable([e is StateError ? 1 : 0]))
          .listen(print); // prints 0

onErrorResumeNext

拦截错误事件,在这种情况下切换到给定的恢复流

onErrorResumeNext操作符从源流中截取一个onError通知。它不会将错误传递给任何侦听器,而是将其替换为另一个项流。

如果您需要根据发出的错误类型执行逻辑,请考虑使用onErrorResume。

ErrorStream(Exception())
  .onErrorResumeNext(Stream.fromIterable([1, 2, 3]))
  .listen(print); // prints 1, 2, 3

onErrorReturn

指示流在遇到错误时发出特定项,然后正常终止。

onErrorReturn操作符从源流中截取onError通知。它没有将其传递给任何观察者,而是将其替换为给定的项,然后正常终止。

如果您需要根据发出的错误类型执行逻辑,请考虑使用onErrorReturnWith

ErrorStream(Exception())
  .onErrorReturn(1)
  .listen(print); // prints 1

onErrorReturnWith

指示流在遇到错误时发出由returnFn创建的特定项,然后正常终止。

onErrorReturnWith操作符从源流中截取onError通知。它没有将其传递给任何观察者,而是将其替换为给定的项,然后正常终止。

returnFn接收发出的错误并返回一个流。您可以在returnFn中执行逻辑,根据发出的错误类型返回不同的流。

如果您不需要根据所发出的错误类型执行逻辑,请考虑使用onErrorReturn。

ErrorStream(Exception())
  .onErrorReturnWith((e) => e is Exception ? 1 : 0)
  .listen(print); // prints 1

pairwise

将第n和第n-1个事件成对发出。

RangeStream(1, 4)
  .pairwise()
  .listen(print); // prints [1, 2], [2, 3], [3, 4]

sample

发出自sampleStream上次发出以来源流最近发出的项(如果有的话)。

Stream.fromIterable([1, 2, 3])
  .sample(TimerStream(1, Duration(seconds: 1)))
  .listen(print); // prints 3

sampleTime

在循环时间范围内(由持续时间定义)发出源流自上次发出以来最近发出的项(如果有的话)

Stream.fromIterable([1, 2, 3])
  .sampleTime(Duration(seconds: 1))
  .listen(print); // prints 3

scan

对流序列应用累加器函数并返回每个中间结果。可选的种子值用作初始累加器值。

Stream.fromIterable([1, 2, 3])
   .scan((acc, curr, i) => acc + curr, 0)
   .listen(print); // prints 1, 3, 6

skipUntil

仅在给定流发出项后才开始发出项。

MergeStream([
    Stream.fromIterable([1]),
    TimerStream(2, Duration(minutes: 2))
  ])
  .skipUntil(TimerStream(true, Duration(minutes: 1)))
  .listen(print); // prints 2;

startWith

向源流添加一个值。

Stream.fromIterable([2]).startWith(1).listen(print); // prints 1, 2

startWithMany

在源流前添加一系列值。

Stream.fromIterable([3]).startWithMany([1, 2])
  .listen(print); // prints 1, 2, 3

switchIfEmpty

当原始流没有发出项时,此操作符订阅给定的回退流,并从该流发出项。

这在使用来自多个数据源的数据时特别有用。例如,在使用存储库模式时。假设您有一些数据需要加载,您可能希望从最快的接入点开始,然后一直下降到最慢的接入点。例如,首先查询内存中的数据库,然后查询文件系统上的数据库,如果数据不在本地机器上,则查询网络调用。

这可以通过switchIfEmpty非常简单地实现!

switchMap

使用给定的映射器函数将每个发出的项转换为流。新创建的流将被侦听并开始发送项,而之前创建的任何流将停止发送。

switchMap操作符类似于flatMap和concatMap方法,但是它只从最近创建的流发出项。

例如,当您只想要来自异步api的最新状态时,这可能非常有用。

takeUntil

返回源流序列中的值,直到另一个流序列生成一个值。

MergeStream([
    Stream.fromIterable([1]),
    TimerStream(2, Duration(minutes: 1))
  ])
  .takeUntil(TimerStream(3, Duration(seconds: 10)))
  .listen(print); // prints 1

takeWhileInclusive

发出源流所发出的值,只要每个值都满足给定的测试。当某个值不满足测试时,它将发出该值作为最终事件,然后完成。

Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3])
  .takeWhileInclusive((i) => i < 4)
  .listen(print); // prints 2, 3, 4

throttle

当窗口打开时,只发出源流发出的第一项。

如果trailing为真,则会发射最后一项

您可以使用上一个被调节的事件的值来确定下一个window的长度。

Stream.fromIterable([1, 2, 3])
  .throttle((_) => TimerStream(true, Duration(seconds: 1)))

throttleTime

在持续时间范围内仅发出源流发出的第一项。

如果尾随为真,则会发射最后一项

Stream.fromIterable([1, 2, 3])
  .throttleTime(Duration(seconds: 1))

timeInterval

记录流序列中连续值之间的时间间隔。

Stream.fromIterable([1])
  .interval(Duration(seconds: 1))
  .timeInterval()
  .listen(print); // prints TimeInterval{interval: 0:00:01, value: 1}

timestamp

将源流发出的每个项包装在一个带有时间戳的对象中,该对象包括发出的项和发出项的时间。

Stream.fromIterable([1])
   .timestamp()
   .listen((i) => print(i)); // prints 'TimeStamp{timestamp: XXX, value: 1}';

whereType

这个转换器是Stream.where然后是Stream.cast缩写。

不匹配T的事件被过滤掉,结果流的类型为T。

Stream.fromIterable([1, 'two', 3, 'four'])
  .whereType<int>()
  .listen(print); // prints 1, 3
Stream.fromIterable([1, 'two', 3, 'four'])
  .where((event) => event is int)
  .cast<int>()
  .listen(print); // prints 1, 3

window

创建一个流,其中每个项都是包含源序列项的流。

每当窗口发出一个事件时,就会发出此列表。

Stream.periodic(Duration(milliseconds: 100), (i) => i)
  .window(Stream.periodic(Duration(milliseconds: 160), (i) => i))
  .asyncMap((stream) => stream.toList())
  .listen(print); // prints [0, 1] [2, 3] [4, 5] ...

windowCount

通过计数从源流中缓冲一些值,然后将该缓冲区作为流释放并清除它,并启动一个新的缓冲区each startBufferEvery值。如果没有提供startBufferEvery,则在源的开始处以及在每个缓冲区关闭并发出时立即启动新的缓冲区。

RangeStream(1, 4)
  .windowCount(2)
  .asyncMap((stream) => stream.toList())
  .listen(print); // prints [1, 2], [3, 4] done!

windowTest

创建一个流,其中每个项都是包含来自源序列的项的流,在测试通过时进行批处理。

Stream.periodic(Duration(milliseconds: 100), (int i) => i)
  .windowTest((i) => i % 2 == 0)
  .asyncMap((stream) => stream.toList())
  .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...

windowTime

创建一个流,其中每个项都是一个流,包含来自源序列的项,在带有持续时间的时间框架中采样。

Stream.periodic(Duration(milliseconds: 100), (int i) => i)
  .windowTime(Duration(milliseconds: 220))
  .doOnData((_) => print('next window'))
  .flatMap((s) => s)
  .listen(print); // prints next window 0, 1, next window 2, 3, ...

withLatestFrom

扩展流类,使其能够将源流与另一个流中最后发出的项合并。

zipWith

返回一个流,该流使用给定的zippers函数将当前流与另一个流组合在一起。

Stream.fromIterable([1])
    .zipWith(Stream.fromIterable([2]), (one, two) => one + two)
    .listen(print); // prints 3

Subjects

Dart提供了StreamController类来创建和管理流。RxDart提供了两个额外的流控制器,它们具有额外的功能,称为Subjects:

BehaviorSubject-一个广播流控制器,缓存最新的增值或错误。当新的侦听器订阅流时,最新的值或错误将发送给侦听器。此外,您可以同步读取最后发出的值。

final subject = BehaviorSubject<int>();

subject.add(1);
subject.add(2);
subject.add(3);

subject.stream.listen(print); // prints 3
subject.stream.listen(print); // prints 3
subject.stream.listen(print); // prints 3
final subject = BehaviorSubject<int>.seeded(1);

subject.stream.listen(print); // prints 1
subject.stream.listen(print); // prints 1
subject.stream.listen(print); // prints 1

ReplaySubject-一个缓存添加值的广播流控制器。当新的侦听器订阅流时,缓存的值将发送到侦听器。

一个特殊的StreamController,它捕获已添加到控制器的所有项,并将它们作为第一个项发送给任何新侦听器。

此主题允许向侦听器发送数据、错误和done事件。当项目被添加到主题时,ReplaySubject将存储它们,当侦听流时,那些已记录的项将被发送到侦听器。在此之后,任何新事件都将被适当地发送到侦听器。可以通过设置maxSize值来限制存储事件的数量。

ReplaySubject,在默认情况下,是一个广播(aka hot)控制器,以履行Rx Subject contract.。这意味着主题的流可以被收听多次。

final subject = ReplaySubject<int>();

subject.add(1);
subject.add(2);
subject.add(3);

subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
final subject = ReplaySubject<int>(maxSize: 2);

subject.add(1);
subject.add(2);
subject.add(3);

subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341