[Rx.NET] 7 – 使用基础查询操作符

[Rx.NET] 7 – 使用基础查询操作符

Observable 推送通知后,Observer 收到通知前通知通常会通过一个管道。本系列的大多数例子都展示了如何使用操作符来操纵控制生成的通知序列,然后最终由 Observer 接收到这些通知。本节将分类解释经常使用到的基础操作符。包括转换和映射,过滤和平展,求和、求平均的聚合运算,以及其他可量化结果的操作。如下图所示:

一些操作符在前面的章节已经介绍过了,但是还没有了解它们的定义及提供的功能。

选择重要数据(映射)

使用 Observable 推送通知, 目前为止,十分好用。有时接收到的通知并不完全符合要求,并且不会总是适合程序处理的格式。例如,在使用远端时,传输对象通常简单明了,包含最小量的数据。这通常叫做数据传输对象(DTO),DTO 只携带必要的数据(例如,标识符)用于执行程序逻辑。但是,当收到 DTO 时,使用自己的数据类型或从存储区中获取相应数据更加方便。为此,需要使用Select操作符进行转换。

Select操作符(在函数式编程中,通常叫Map)可以将管道中的通知转换为更加适合使用的数据格式,如下图所示:

下面的代码展示了如何使用Select操作符转换接收到的包含用户识别码的 ChatMessage。代码从数据库加载 User 并且创建了一个适用于 UI 布局的 ViewModel,代码如下:

IObservable < ChatMessage > messages = ...

IObservable < ChatMessageViewModel > messagesViewModels =
    messages.Select(m => new ChatMessageViewModel 
    {
        MessageContent = m.Content,
        User = LoadUserFromDb(m.Sender)
    });

示意图如下:

使用Select操作符来做的转换创建了一个推送ViewModel的新 Observable。

使用Select可以以声明和可读的形式来完成多次转换。Select操作符有两个重载,一个接收参数为通知数据的简单选择器,另一个增加了通知索引参数:

IObservable<TResult> Select<TSource, TResult>(
    IObservable<TSource> source,
    Func<TSource, TResult> selector);

IObservable<TResult> Select<TSource, TResult>(
    IObservable<TSource> source,
    Func<TSource, int, TResult> selector);

Select接收的转换函数(选择器)可以执行任何转换。可以根据通知创建一个对象,或从通知中提取一个值作为返回值,或者忽略通知并返回一个完全不同的值。

平展 Observable

有时候 Observable 推送的通知是一个 Enumerable 或者另一个 Observable,这时想要操作通知中的每一项数据就会有一点难度了。例如,通知为一个数字集合,并且想要过滤出其中的偶数数据。那么过滤集合的最佳方式是什么呢?为每个内部集合添加一个Where操作就能实现,但是这会降低代码的可读性。一种更好的解决方案是使用SelectMany平展内部集合,然后使用Where来过滤结果,下面来了解这一过程。

平展 Observable 中的集合

SelectMany操作符用来平展 Observable。如果一个 Observable 的通知是一系列集合,那么使用SelectMany操作符后的结果就是推送集合中的每一项数据。如下图所示:

SelectMany也叫FlatMap(别名),因为它将每个通知映射为一个集合,然后将这些集合平展到一个流中。假设有一个推送新闻的 Observable,该 Observable 推送的新闻有一个需要展示在屏幕上的图片集合,并且过滤掉那些 PG-13 级(儿童限制级)的图片。那么使用 Rx 可以像下面这样写:

IObservable<NewsItem> news = ...

news.SelectMany(n => n.Images)
    .Where(img => img.IsChildFriendly)
    .Subscribe(img => AddToHeadlines(img));

这里使用的SelectMany接口如下所示:

IObservable<TResult> SelectMany<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IEnumerable<TResult>> selector)

该接口接收源 Observable,然后使用 selector 函数为每个通知生成一个TResult类型的集合,最后的 Observable 结果将这些生成的集合平展,这也是为什么接口返回的类型是 IObservable<TResult> 的原因。

在上面的例子中可以看到,SelectMany的 selector 函数返回了 Image 的集合,SelectMany生成的 Observable 结果推送新闻中的所有图片。因此,可以使用Where查询作用在SelectMany同级操作上,而不是使用Where作用在每个集合上面。

下图展示了可能接收到的新闻的一种情况:

输出结果如下:

News headline image: Item1Image1
News headline image: Item2Image1

需要注意的是,当使用SelectMany作用在 Observable 的集合上时,最后推送的通知项的顺序跟原来一致。所以在上面的例子中,当第一个新闻的所有图片都推送完毕后才会推送第二个新闻的图片。

SelectMany有一个重载可以获取源 Observable 通知的索引:

IObservable<TResult> SelectMany<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, int, IEnumerable<TResult>> selector)

可以使用该重载根据通知在序列中的位置来改变生成集合的方式。

SelectMany可以很好的处理 Observable 的平展,但是现在推送的数据失去了与源通知的联系。例如,从新闻项中生成的图片与该新闻项分离了。幸运的是,SelectMany提供了另一个重载可以处理这个问题。

例如对于每个图片的标题视图都添加一个指向新闻项的链接。下面是实现的代码:

IObservable<NewsItem> news = ...

news.SelectMany(n => n.Images,
    (newsItem, img) => new NewImageViewModel
    {
        ItemUrl = newsItem.Url,
        NewsImage = img
    })
    .Where(vm => vm.NewsImage.IsChildFriendly)
    .Subscribe(img => AddToHeadlines(img));

SelectMany操作符的重载接收一个resultSelector函数,对于每个集合的每一项都会执行该函数,并且将包含集合的源通知作为参数传入,函数的返回值就是最后推送的数据。resultSelector的另一个重载可以接收源通知在序列中的序列号。接口如下:

IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IEnumerable<TCollection>> collectionSelector,
    Func<TSource, TCollection, TResult> resultSelector)

IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
    this IObservable<TSource> source,
    Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
    Func<TSource, int, TCollection, int, TResult> resultSelector)

C# 编译器提供的一个很好的特性是能够在手动完成时将SelectMany运算符添加到查询中,而无需过多的代码。为实现这个,需要在写查询代码时使用Let操作符。下面是查询的代码:

IObservable<NewsItem> news = ...

var newsImages =
    from n in news
    from img in n.Images
    where img.IsChildFriendly
    select new NewImageViewModel
    {
        ItemUrl = n.Url,
        NewsImage = img
    };

两个from声明(可以只用任意多个)会在编译时生成一个SelectMany将新闻项和图片封装为一个对象。之后,对新闻项和图片的所有引用都将从该对象中获取。

平展 Observable 中的 Observable

SelectMany也能用于平展推送 Observable 数据项的 Observable。如下图所示:

假设你的聊天程序支持多聊天室聊天。每个聊天室用 ChatRoom 表示,每个 ChatRoom 都有一个用于推送消息的 Observable。在程序中,希望有一个展示页面显示近期的消息(来自所有聊天室的消息)。如下图所示:

代码如下:

IObservable<ChatRoom> rooms = ...

rooms.Log("Rooms")
     .SelectMany(r => r.Messages)
     .Select(m => new ChatMessageViewModel(m))
     .Subscribe(vm => AddToDashboard(vm));

为模拟打开两个聊天室并推送消息的情况,使用下面的简单代码,代码中聊天室和消息都是用Subject实现:

var roomsSubject = new Subject<ChatRoom>();
IObservable<ChatRoom> rooms = roomsSubject.AsObservable();

var room1 = new Subject<ChatMessage>();
roomsSubject.OnNext(new ChatRoom {Id = "Room1", Messages = room1Messages});
room1.OnNext(new ChatMessage{Content = "First Message", Sender = "1"});
room1.OnNext(new ChatMessage{Content = "Second Message", Sender = "1"});

var room2 = new Subject<ChatMessage>();
roomsSubject.OnNext(new ChatRoom{Id = "Room2", Messages = room2Messages});
room2.OnNext(new ChatMessage{Content = "Hello World", Sender = "2" });

room1.OnNext(new ChatMessage{Content = "Another Message", Sender = "1" });

运行结果如下:

Rooms – OnNext(ChatRoom: Room1)
Room: Room1 , Message: “First Message” was sent by Id=1 Name:User1
Room: Room1 , Message: “Second Message” was sent by Id=1 Name:User1
Rooms – OnNext(ChatRoom: Room2)
Room: Room2 , Message: “Hello World” was sent by Id=2 Name:User2
Room: Room1 , Message: “Another Message” was sent by Id=1 Name:User1

前面的章节,我们创建了Log操作符用来在调用时打印 OnNext、 OnError 和 OnCompleted 方法的信息。在这里Log操作符用来在新聊天室打开时显示一条信息(输出结果中加粗部分)。从结果可以看到无论来自哪个聊天室的消息都会显示出来。

正如在平展集合时一样,在使用SelectMany平展 Observable 中的 Observable 时也可以指定resultSelector函数。如下图所示:

下面是使用方法的一个例子:

IObservable<ChatRoom> rooms = roomsSubject.AsObservable();

rooms.Log("Rooms")
     .SelectMany(r => r.Messages, (room, msg) => new ChatMessageViewModel(msg){Room = room.Id})
     .Subscribe(vm => AddToDashboard(vm));

不像集合,Observable 会异步推送通知,所以resultSelector的执行顺序是不确定的。这就意味着,SelectMany必须缓存从源 Observable 接收到的所有通知,以便于在返回的 Observable 中推送它们。当然这仅限于 Observable 完成前。因此,使用SelectMany可能会影响程序的内存占用量。

过滤 Observable

Observable 推送的通知并不都是对程序有意义的。因此,需要将无意义的通知从通知序列中过滤出去。这些无意义的值可能是那些过于某一阈值的值,或者是聊天应用中加入黑名单的用户发送的消息,或者是已经在其他新闻源获取到的新闻信息。

使用 Where 操作符过滤

几乎每个例子都是用了Where操作符,它是大多数查询语言的基本操作符之一。Where接收一个断言函数,每个推送的通知都会调用该函数并返回一个 Boolean 值,该 Boolean 值指示是否允许该值在管道中继续并由 Observer 接收。Where操作符也叫Filter操作符。使用方式如下图所示:

该示例展示了怎么过滤一个字符串 Observable,并且只有以 A 开头的字符串会被推送。代码如下:

var strings = new[] {"aa", "Abc", "Ba", "Ac"}.ToObservable();

strings.Where(s => s.StartsWith("A"))
       .SubscribeConsole();

结果如下:

  • OnNext(Abc)
  • OnNext(Ac)
  • OnCompleted()

Where操作符单独检查推送的每一个通知,它不会浏览使用当前通知之前的整个通知序列。这就使创建需要前面通知信息的断言函数变得困难,所以开发者需要保存通知然后再使用。例如,当想要一个没有重复值得 Observable 序列时就需要访问之前推送过得数据,幸运的是,Rx 提供了一系列的操作符来解决这些问题。

创建一个不重复序列

Distinct操作符对结果 Observable 执行一个限制协议,每个值只在序列中出现一次。例如,Observable 推送来自多个新闻源的新闻数据,但是只需要接收同一个新闻一次,Distinct操作符就能完成这个任务。下图是Distinct操作符的子弹图:

需要注意的是,Distinct操作符会在通知到来时立即推送它们(而不是在源 Observable 完成之后),除非已经推送过该通知。

var subject = new Subject<NewsItem>();

subject.Log()
       .Distinct(n=>n.Title)
       .SubscribeConsole("Distinct");

subject.OnNext(new NewsItem() {Title = "Title1"});
subject.OnNext(new NewsItem() {Title = "Title2"});
subject.OnNext(new NewsItem() {Title = "Title1"});
subject.OnNext(new NewsItem() {Title = "Title3"});

上面的程序输出中,带有Distinct前缀的行是删除重复项(使用Distinct操作符)后的结果,输出结果如下:

OnNext(Title1)
Distinct – OnNext(Title1)
OnNext(Title2)
Distinct – OnNext(Title2)
OnNext(Title1)
OnNext(Title3)
Distinct – OnNext(Title3)
OnCompleted()
Distinct – OnCompleted()

正如所见,第二次推送的Title1消息被过滤掉了。

Distinct操作符有多个重载。如果推送的通知消息实现了Equals方法,那么可以使用无参数的Distinct操作符重载,该重载会根据Equals的实现来判断消息是否相等。此外,还可以提供一个EqualityComparer用来比较两个消息。

IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source)

IObservable<TSource> Distinct<TSource>(
    this IObservable<TSource> source,
    IEqualityComparer<TSource> comparer)

IObservable<TSource> Distinct<TSource, TKey>(
    this IObservable<TSource> source, Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> comparer)

IObservable<TSource> Distinct<TSource, TKey>(
    this IObservable<TSource> source, Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> comparer)

移除连续重复值

假设有一个搜索表单,并且当用户每次修改搜索词时,都会向搜索服务器发送搜索请求。由于搜索请求在时间和服务器负载方面都会造成影响,因此当多次重复请求时,应该减少请求次数。如下图所示:

DistinctUntilChanged操作符返回一个推送连续不重复的 Observable。如果源 Observable 连续推送同一个通知,那么DistinctUntilChanged返回的 Observable 只会推送第一次收到的通知。但是,不像Distinct操作符,如果该值在其它值推送后再次推送,那么该值将会再次推送。如下图所示:

下面的例子使用DistinctUntilChanged来阻止向服务器发送相同的请求(如果已经发送过得话)。为了更加真实,使用了另一个Throttle操作符,该操作符只有在指定的时间内没有消息推送才会推送最后一次推送的消息。在下面的例子中,只会在 400ms 内没有其它请求需要发送时才会发送最后一次接收到的请求:

Observable.FromEventPattern(SearchTerm, "TextChanged")
          .Select(_ => SearchTerm.Text)
          .Throttle(TimeSpan.FromMilliseconds(400))
          .DistinctUntilChanged()
          .Subscribe(s => /* Sending the search term to the WebService */);

Distinct操作符一样,DistinctUntilChanged操作符同样提供了方法重载用来判断两个值是否相等。

IObservable<TSource> DistinctUntilChanged<TSource>(
this IObservable<TSource> source,
IEqualityComparer<TSource> comparer)

IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector)

IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> comparer)

Rx 的查询结果通常是一系列值,但是偶尔,我们只需要一个值,像计算着一系列值的和或者它们的总个数。那么 Rx 的聚合操作也许能帮我们实现。

Observable 序列的聚合

Rx 可以对整个消息序列或到当前消息为止的消息进行聚合操作。该聚合操作包括,求和、计数、查找最大值、最小值等。

基本聚合操作

熟悉SQL和LINQ的人都知道,可以在查询中轻松包含基本聚合函数,底层系统将完成具体工作。Rx 操作符提供了相同的技术。

Sum操作符会计算整个消息序列值的和,然后在源 Observable 完成时推送计算出的和。如下图所示:

Sum操作符支持所有原始数字类型(int,float 等),同时还支持 null,当值为 null 时该值会被丢弃。下面是一个简单实例:

Observable.Range(1, 5)
          .Sum()
          .SubscribeConsole("Sum");

输出如下:

Sum – OnNext(15)
Sum – OnCompleted()

可以看到当源 Observable 完成后输出了结果(15)。Sum的重载提供了一个 selector 函数,该函数可以选择用于求和的操作数。也就是说可以从推送的对象中选择一个子属性来用于求和计算。下面是整形(int)求和的接口,同样对于其他类型的求和计算也是一样的:

IObservable<int> Sum<TSource>(this IObservable<TSource> source, Func<TSource, int> selector)

要想计算 Observable 推送的通知数量可以使用Count操作符,如下图所示:

当 Observable 完成时Count操作符就会推送计数结果:

Observable.Range(1, 5)
          .Count()
          .SubscribeConsole("Count");

结果如下:

Count – OnNext(5)
Count – OnCompleted()

Count操作符的一个重载可以接收一个断言函数用来指定那些通知可以用来计数,就像在无参Count操作符前有一个Where操作符一样。下面是一个例子:

Observable.Range(1, 5)
          .Count(x => x % 2 == 0)
          .SubscribeConsole("Count of even numbers");

输出如下:

Count of even numbers – OnNext(2)
Count of even numbers – OnCompleted()

Average操作符会在 Observable 完成时推送平均数通知。如下图所示:

Average操作符支持所有基础数字类型,当值为 null 时该值将被丢弃。

Observable.Range(1, 5)
          .Average()
          .SubscribeConsole("Average");

输出如下:

Average – OnNext(3)
Average – OnCompleted()

同样,Average操作符也有接收 selector 的重载,其中接收 int 的接口如下:

IObservable<double> Average<TSource>(this IObservable<TSource> source, Func<TSource, int> selector)

MaxMin操作符可以用来在 Observable 完成时获取通知序列中的最大值或最小值。如下所示:

例子如下:

Observable.Range(1, 5)
          .Max()
          .SubscribeConsole("Max");

Observable.Range(1, 5)
          .Min()
          .SubscribeConsole("Min");

输出如下:

Max – OnNext(5)
Max – OnCompleted()
Min – OnNext(1)
Min – OnCompleted()

MaxMin使用 .NET 的默认比较来决定数据的大小。当默认比较不能满足需要时,可以提供一个 IComparer 和(或) 选择器来自定义比较。相应重载如下:

IObservable<TSource> Max<TSource>(
    this IObservable<TSource> source,
    IComparer<TSource> comparer)

IObservable<TResult> Max<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector)

IObservable<TResult> Max<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector,
    IComparer<TResult> comparer)

需要注意的是,selector 选择器返回的是用于判断大小的值而不是源消息通知。

例如,假设有一个学生成绩的 Observable,并且想要查找最好成绩的学生,Max操作符可能不能实现这个功能,因为通过Max操作符获取的是最好成绩而不是学生对象,代码如下:

Subject<StudentGrade> grades = new Subject<StudentGrade>();
grades.Max(g => g.Grade)
      .SubscribeConsole("Maximal grade");

grades.OnNext(new StudentGrade() {Id = "1",Name = "A", Grade = 85.0});
grades.OnNext(new StudentGrade() {Id = "2",Name = "B", Grade = 90.0});
grades.OnNext(new StudentGrade() {Id = "3",Name = "C", Grade = 80.0});
grades.OnCompleted();

输出结果如下:

Maximal grade – OnNext(90)
Maximal grade – OnCompleted()

正如所见,Max操作符推送的是 90 而不是拥有最好成绩的 StudentGrade 对象。如果想要打印最好成绩的学生姓名,那么就不能这个样来操作了。如果想要实现这个功能就需要MaxBy/MinBy操作符。

按条件查找最大值和最小值项

MaxByMinBy操作符可以在 Observable 中查找具有最大值或最小值的消息通知,并且在 Observable 完成时推送它们。如下图所示:

通过 keySelector 函数从 Observable 推送的消息中选择用于比较的值。

由于可能存在多个消息具有最大值或最小值,所以MaxByMinBy推送的结果是一个列表,接口如下:

IObservable<IList<TSource>> MaxBy<TSource, TKey>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector)

下面的例子用来查找最好成绩的学生:

Subject<StudentGrade> grades = new Subject<StudentGrade>();
grades.MaxBy(s => s.Grade)
      .SelectMany(max => max)
      .SubscribeConsole("Maximal object by grade");

grades.OnNext(new StudentGrade() { Id = "1", Name = "A", Grade = 85.0 });
grades.OnNext(new StudentGrade() { Id = "2", Name = "B", Grade = 90.0 });
grades.OnNext(new StudentGrade() { Id = "3", Name = "C", Grade = 80.0 });
grades.OnCompleted();

输出结果如下:

Maximal object by grade – OnNext(Id: 2, Name: B, Grade: 90)
Maximal object by grade – OnCompleted()

使用 Aggregate 和 Scan 自定义聚合

在 Rx 中,可以自定义应用于 Observable 序列的聚合函数。每个推送的消息通知都会调用聚合函数,然后将结果保留用于下一次消息通知的计算。

可以使用下面两个操作符来创建聚合函数:

  • Aggregate — 应用于 Observable 推送的每一个消息通知,然后在 Observable 完成时推送计算结果
  • Scan — 应用于 Observable 推送的每一个消息通知,然后立即推送当前计算结果

下图展示了两个不同的操作符:

下面是计算所有值乘积的例子:

Observable.Range(1, 5)
          .Aggregate(1, (accumulate, currItem) => accumulate * currItem)
          .SubscribeConsole("Aggregate");

Observable.Range(1, 5)
          .Scan(1, (accumulate, currItem) => accumulate * currItem)
          .SubscribeConsole("Scan");

输出结果如下:

Aggregate – OnNext(120)
Aggregate – OnCompleted()
Scan – OnNext(1)
Scan – OnNext(2)
Scan – OnNext(6)
Scan – OnNext(24)
Scan – OnNext(120)
Scan – OnCompleted()

在上面的例子中,Observable 推送 1 至 5 的一系列值,Aggregate 推送最后的计算结果 120,Scan 推送了每一步的计算结果 1! 2! 3! 4! 5!

除了种子值和累加器函数之外,Aggregate运算符还提供了一个重载,可以使用该重载来使用自定义类型来计算,接口如下:

IObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> accumulator,
    Func<TAccumulate, TResult> resultSelector)

假设需要查找 Observable 中的第二大项,可以使用Aggregate来实现,如下所示:

Subject<int> numbers = new Subject<int>();
numbers.Aggregate(new SortedSet<int>(),
                 (largest, item) =>
                 {
                     largest.Add(item);
                     if (largest.Count > 2)
                     {
                         largest.Remove(largest.First());
                     }
                     return largest;
                 },
                 largest => largest.FirstOrDefault())
      .SubscribeConsole();

numbers.OnNext(3);
numbers.OnNext(1);
numbers.OnNext(4);
numbers.OnNext(2);
numbers.OnCompleted();

上面的例子使用空的 SortedSet 作为种子,该类用于保持数据项排序,并且确保集合中没有重复值。对于源 Observable 推送的每一个值,累加器函数就会在集合中添加一项,然后压缩该集合确保集合中最多有两个值。

当源 Observable 完成,resultSelector 函数就会返回集合中第一个值(如果存在的话)。由于 SortedSet 是有序的,并且集合里最多只有两个值,所以第一个值就是想要的第二大值。

输出结果如下:

OnNext(3)
OnCompleted()

使用AggregateScan可以创建自己强大的聚合功能。从某种意义上说,它们等价于响应式的循环,可以应用于集合然后从中产生一个单值。

总结

Rx提供的查询功能丰富而广泛。有时可能会让人难以理解或理解起来很复杂,因此为了便于理解Rx查询的一些最常用操作符的使用,本章介绍了下面的内容:

  • Select操作符将推送的通知转换为另一种形式。包括仅获取子属性或创建新的对象(例如,一个ViewModel)。
  • Observable推送其它Observable或其它集合(或包含它们的项)。SelectMany操作符可以将内部的Observable(或集合)合并为平面流;通过传递collectionSelector函数就能实现。
  • SelectMany操作符还可以接收resultSelector函数,可以为每个推送的项及其源项调用该函数。
  • SelectMany是使用Let运算符手动编写查询是背后的强大支撑。
  • Where操作符过滤推送的通知,通过接收一个判断函数来测试每个通知。
  • Distinct操作符可以获取一个没有重复项的Observable
  • DistinctUntilChanged操作符可以获取一个没有重复且连续的Observable
  • Rx提供了可用于Observable的通用统计聚合函数。就是SumCountAverageMaxMin
  • MaxByMinBy操作符基于子属性获取最大和最小项。
  • 可以使用AggregateScan操作符来自定义聚合逻辑。
  • Aggregate操作符仅当在Observable完成时才会推送聚合结果。
  • Scan操作符在Observable每次推送通知时都会推送聚合结果。

在本章中,介绍的仅仅是在单个Observable范围内起作用的操作符。下一章将介绍用于将Observable精细分解(组)和合并多个Observable

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注