[Rx.NET] 8 – Observable 的分割和组合

[Rx.NET] 8 – Observable 的分割和组合

典型的应用程序通常由多个工作流程组成。 很多情况下,应用程序需要处理多个数据源并对其进行响应,例如UI事件、推送通知、远程过程调用等等。 假设你的应用程序需要使用来自各种来源(例如社交网络)的消息,并以相同的方式对所有消息做出反应。 或者说,你的应用程序处理的是发出各种通知(例如股票价格)流的源,它需要分别(独立)查看通知的每个子组(针对每种股票)。 你需要怎么实现呢?

有多种方式组合Observable并对它们发出的通知做出反应(例如,获取最新,配对或联接)。 并且,有多种方法可根据Observable(例如,按时间或条件)创建子组。 本章将通过使用Enumerables中已经了解的概念并将其应用于Observables世界,将你带入一个新的水平。

组合Observables

虽然使用单个可观察对象很好用; 但是,互联网是由多个独立发生的事件组成的。 为了对从多个Observable发出的通知做出反应,Rx提供了可轻松组合Observable的操作符。

配对Observable(压缩)

当需要将两个(或多个)Observable中相同索引中的值组合在一起时,就应该使用Zip操作符。 它需要的参数是需要配对的Observable和描述如何执行压缩的选择器。 它实现的功能是将每个Observable的第一项压缩在一起,将每个Observable的第二项压缩在一起,依此类推。

选择器函数的参数是要压缩的Observable推送的一组值,这些值在Observable推送时的索引一致。 然后,选择器函数从这些值返回计算结果。

假设一个房间内有两个温度传感器,它们大致同时发出数值,想要显示两个读数的平均温度,那么使用Zip运算子的方法如下:

IObservable<double> temp1 = ...
IObservable<double> temp2 = ...

temp1
    .Zip(temp2, (t1, t2) => (t1 + t2)/2)
    .SubscribeConsole("Avg Temp.");

输出如下图所示:

Zip操作符的问题在于它依赖于两个Observable推送的值的索引。 如果一个Observable的频率高于另一个Observable的频率,则Zip会在内存中累积已推送的值,直到从第二个Observable发出下一个值为止(这也意味着,如果第二个Observable从不推送或完成,则第一个Observable的值将永远不会使用,但是仍需保留在内存中)。 在许多情况下,只希望合并Observable发出的最新值。

合并最新推送值

要合并Observable最后发出的一组值,需要使用CombineLatest操作符。 与Zip操作符不同,当一个Observable返回一个值时,即使第二个Observable长时间不发射,CombinateLatest也会返回一个值。如下图所示:

考虑如下示例:有一个监测心率的传感器和一个监测速度的传感器。 无论每个传感器的更新率如何,都希望显示最新的值。 为了模拟这种情况,将Observable创建为可以控制的Subject:

Subject<int> heartRate = new Subject<int>();
Subject<int> speed = new Subject<int>();

speed.CombineLatest(heartRate,
                    (s, h) => String.Format("Heart:{0} Speed:{1}", h, s))
     .SubscribeConsole("Metrics");

现在,可以向每个Observable推送值,然后观察会发生什么。

heartRate.OnNext(150);
heartRate.OnNext(151);
heartRate.OnNext(152);
speed.OnNext(30);
speed.OnNext(31);
heartRate.OnNext(153);
heartRate.OnNext(154);

输出结果如下:

Metrics – OnNext(Heart:152 Speed:30)
Metrics – OnNext(Heart:152 Speed:31)
Metrics – OnNext(Heart:153 Speed:31)
Metrics – OnNext(Heart:154 Speed:31)

这里有两点值得注意。 首先,可以看到心率值152在开始时推送了两次。 这是因为speed连续推送了两个值,并且152是heartRate的最新值。 当heartRate推送值而最新的speed值为31(最后两行)时,也会发生相同的情况。

第二点是,当heartRate最初产生值150和151时,合并的Observable不发出任何内容。 当然,CombineLatest仅在所有Observable至少发出一次值时才发出值。 否则,所有Observable值都没有最新值。

解决值丢失问题,即在所有Observable都未推送值时合并的Observable也会推送的方法是,使用StartWith操作符在每个可Observable的开头添加一个值。 例如,将之前的代码段更改为以下代码,也会同时打印心率值150和151:

speed.StartWith(0)
     .CombineLatest(heartRate.StartWith(0),
                    (s, h) => String.Format("Heart:{0} Speed:{1}", h, s))
     .SubscribeConsole("Metrics");

合并Observable不仅限于从每个Observable中获取值并从中创建统一的结果。 正如接下来将看到的,另一种合并包括创建统一的Observable,并将每个Observable推送的值放入单个流中。

级联Observable

Concat操作符将两个或多个相同类型的可观察对象连接到单个流中,如下图所示:

当第一个Observable完成时,Concat会将第二个Observable的值链接到结果Observable,即使第一个Observable还没完成之前很长时间第二个Observable就推送了值。 请务必注意,Concat操作符仅在第一个Observable完成后才订阅第二个Observable,因此,如果第二个Observable是“热”的,并且在订阅前发出了通知,那么这些值将会被丢弃。

当在异步操作中使用时,可能会对“冷”、“热”Observable以及Concat操作符产生混淆。例如,假设使用Concat来确保两次异步操作结果的推送顺序。要记得,当任务转换为Observable时,会创建一个AsyncSubject,因此异步计算的值不会丢失,而是将“热”操作变为“冷”Observable。

下面的示例模拟了两个异步操作,这些操作从Facebook和Twitter加载消息。 在本示例中,Facebook速度较慢(由于使用了Delay操作符),但是由于使用的是Concat,因此Facebook消息首先出现在输出中:

using System.Reactive.Threading.Tasks;

Task<string[]> facebookMessages = Task.Delay(10).ContinueWith(_=>new[]
    {"Facebook1", "Facebook2"});
Task<string[]> twitterStatuses = 
    Task.FromResult(new[] {"Twitter1", "Twitter2"});

Observable.Concat(facebookMessages.ToObservable(), twitterStatuses.ToObservable())
          .SelectMany(messages=>messages)
          .SubscribeConsole("Concat Messages");

输出如下:

Concat Messages – OnNext(Facebook1)
Concat Messages – OnNext(Facebook2)
Concat Messages – OnNext(Twitter1)
Concat Messages – OnNext(Twitter2)
Concat Messages – OnCompleted()

尽管来自Facebook的结果需要花费更长的时间(由于使用了Delay),但它们仍然首先推送。 但是,有时,Observable之间的顺序没有意义,并且想对Observable被推送时发出的通知做出反应。 为此,需要Merge操作符。

合并Observable

合并Observable意味着希望将来自源Observable的通知路由到单个Observable,以便当一个源发出通知时,合并的Observable也发送通知,如下图所示:

这就意味着,无论哪一个Observable推送了通知,都可以使尽快得到响应。

下面的示例模拟了两个异步操作,这些操作从Facebook和Twitter加载消息。 在本示例中,Facebook的速度较慢(由于使用了Delay操作符),但是由于使用的是Merge,因此首先显示Twitter消息,在 Facebook操作完成后,才显示其消息:

Task<string[]> facebookMessages =
    Task.Delay(10)
        .ContinueWith(_ => new[] { "Facebook1", "Facebook2" });
Task<string[]> twitterStatuses =
    Task.FromResult(new[] { "Twitter1", "Twitter2" });

Observable.Merge(
                 facebookMessages.ToObservable(),
                 twitterStatuses.ToObservable())
          .SelectMany(messages => messages)
          .SubscribeConsole("Merged Messages");

Console.ReadLine();

现在,即使Facebook异步操作第一个传递给Merge操作符,看到的第一个值还是来自Twitter的值,因为此操作先完成:

Merged Messages – OnNext(Twitter1)
Merged Messages – OnNext(Twitter2)
Merged Messages – OnNext(Facebook1)
Merged Messages – OnNext(Facebook2)
Merged Messages – OnCompleted()

Concat和Merge对于合并固定的一组Observable很有用,但是应用程序可能需要以更加动态的方式(例如,基于用户使用情况)创建Observable。 所以希望这些操作符的输入更具动态性,因为如前面所说,所有内容都是流。

动态级联和合并

Concat和Merge不仅可以传进多个想要合并的Observable,而且可以传一个需要合并的Observable。下面是借口:

IObservable<TSource> Concat<TSource>(this IObservable<IObservable<TSource>> sources)
IObservable<TSource> Merge<TSource>(this IObservable<IObservable<TSource>> sources)

这些重载的Merge或Concat在管道中有更大的作用。例如,当一个源Observable推送一个值,然后该值被转换为另一个Observable(例如表示异步的一个值)。假设想要从文本框的文本改变事件创建一个Observable,当文本改变时,调用远程搜索服务器然后显示所有的搜索结果。

IObservable<string> texts = new[] {"Hello", "World"}.ToObservable()

texts
    .Select(txt => Observable.Return(txt + "-Result"))
    .Merge()
    .SubscribeConsole("Merging from observable");

输出结果如下:

Merging from observable – OnNext(Hello-Result)
Merging from observable – OnNext(World-Result)
Merging from observable – OnCompleted()

与动态分配一样,如果没有动态操作,有时候需要设置一个限制;否则,会导致性能下降(就像超出分配导致的OutOfMemoryException或相应性下降)。幸运的是,Rx对此提供了控制。

控制并发

Concat仅在前一个Observable完成时才对下一个Observable进行订阅,但是Merge在操作开始时需要订阅所有Observable。 订阅许多Observable可能会对应用程序造成性能问题,也可能会给源Observable(例如远程服务)带来沉重负担。 所以,需要限制允许进行合并的并发订阅的数量。 下面是展示如何操作的示例:

IObservable<string> first =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Select(i=>"First"+i)
              .Take(2);

IObservable<string> second =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Select(i=> "Second" + i)
              .Take(2);

IObservable<string> third = 
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Select(i=> "Third" + i)
              .Take(2);

new[] {first,second,third}.ToObservable()
                          .Merge(2)
                          .SubscribeConsole("Merge with 2 concurrent subscriptions");

Console.ReadLine();

在该示例中,有三个Observable同时推送通知。如果Merge订阅了所有Observable,将会看到三个Observable的通知交替推送。但是,输出的结果如下:

Merge with 2 concurrent subscriptions – OnNext(Second0)
Merge with 2 concurrent subscriptions – OnNext(First0)
Merge with 2 concurrent subscriptions – OnNext(Second1)
Merge with 2 concurrent subscriptions – OnNext(First1)
Merge with 2 concurrent subscriptions – OnNext(Third0)
Merge with 2 concurrent subscriptions – OnNext(Third1)
Merge with 2 concurrent subscriptions – OnCompleted()

可以看到,第三个Observable推送的通知是独立分开的。这是因为在前面两个Observable完成之后才会订阅第三个Observable。如果第一个Observable完成了,第二个Observable还在推送通知,那么将会看到第二个和第三个Observable的合并通知结果。

切换Observable

在前面的示例中,对于每一次文本改变,都调用了一次远程搜索服务,然后在屏幕上显示所有结果。更友好的方式是只显示最后的结果。

现在想想一下,当等待从后端传回结果时,另一个搜索被执行了(文本再次改变)。这时,需要取消订阅前面的一步查询操作,然后新建并订阅一个查询。

要完成切换订阅的任务,需要用到Switch操作符,如下图所示:

下面的简单程序,可以模拟文本改变的弹珠图。使用Delay操作符为R1推送增加一点儿延迟,以便在接收到R结果之前切换到R2。

var textsSubject = new Subject<string>();
IObservable<string> texts = textsSubject.AsObservable();

texts
    .Select(txt => Observable.Return(txt + "-Result")
    .Delay(TimeSpan.FromMilliseconds(txt == "R1" ? 10 : 0)))
    .Switch()
    .SubscribeConsole("Merging from observable");

textsSubject.OnNext("R1");
textsSubject.OnNext("R2");
Thread.Sleep(20);
textsSubject.OnNext("R3");
切换到第一个Observable推送

假设有多个Observable推送同一个通知序列,但是只需要其中最快的一个(第一个推送消息的)。

可能是服务器之间的切换,也可能是计算结果与缓存结果之间的切换。Switch在这里排派不上用场,因为它绑定第一个推送的Observable,然后切换到较慢的Observable。

Amb(ambiguity的缩写)操作符和Switch操作符的工作方式相似,但是与Switch不同的是,Amb仅仅是切换到第一个推送值的Observable。可以这样认为:所有的Observable的推送都一样,希望它们进行竞争,第一个推送的Observable留下来,其它的丢弃掉。

下面是一个例子:

var server1 =
    Observable.Interval(TimeSpan.FromSeconds(2))
              .Select(i => "Server1-" + i);

var server2 =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Select(I => "Server2-" + i);

Observable.Amb(server1, server2)
          .Take(3)
          .SubscribeConsole("Amb");

Console.ReadLine();

上面的代码中,server2首先推送,所以在输出中只看到Server2-前缀的结果。

上面的代码也可以这样写
server1.Amb(server2).Take(3).SubscribeConsole(“Amb”);

至此,已经了解了怎么合并和配对Observable。下面,将要介绍怎么分解Observable。

分组Observable

Observable推送的值可以根据特定条件进行分组。不像集合和数据集,对Observable推送值进行分组将创建一个大小不固定的组,其大小未知且可能无限大。因为不确定Observable会推送什么样的值。

要对Observable的值进行分组,需要将每一个组生成为一个Observable;也就是说,每组的Observable会推送该组的值。例如,使用GroupBy操作符可以将People的信息流分成Males和Females的分组,如下图所示:

基本的GroupBy如下所示:

IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector)

可以看到,返回类型是分好组的Observable。分组Observable包含一个Key属性,该属性描述哪一种值会被推送。

GroupBy还有一些重载,可以传递elementSelector(可以在值推送前进行转换)和capacity(用来控制同时存在的组的最大数量)。

通过分组,可以对每个组进行单独的查询。例如,可以对Males和Females分别进行平均年龄的计算:

var genderAge =
    from gender in people.GroupBy(p => p.Gender)
    from avg in gender.Average(p => p.Age)
    select new {Gender=gender.Key, AvgAge=avg};

genderAge.SubscribeConsole("Gender Age");

还可对前面的例子使用GroupBy查询语法:

var genderAge =
    from p in people
    group p by p.Gender
    into gender
    from avg in gender.Average(p => p.Age)
    select new { Gender = gender.Key, AvgAge = avg };

接下来,介绍另一个概念,在集合中很清楚,但是在Observable里却很棘手的概念:连接。

联接Observable(基于巧合的合并)

合并Observable不仅限于使用推送的值来创建一个新类型的结果。另一个有趣的合并是寻找值之间的关联和逻辑关联。例如,试图得到在同一时间段内同时存在的值。

查询数据库表和集合项时,可以通过组合两个或多个共同字段来清晰的表示连接。那么如何在反应式流里表述这个关系呢?Rx的共性是基于发生的巧合的,也就是同一时间范围内推送的通知。

简而言之,基于在同一时间范围内存在的巧合来组合来自各种Observable的值,这就是联结。可以通过两种方法连接Observable。第一种是将联结对推送到单个平面流。第二个是创建相关组,然后将值推送到相关组中。

平面流联接

让我们从如何联接Observable的例子开始。假设正在进行一项统计研究,希望获得同一时间在同一房间内的男性和女性的情况。这是一个联接的经典案例,如下图所示:

要在Observable之间创建联接,需要使用Join操作符,该操作符根据重叠的持续时间来关联两个序列的值。Join的声明有点复杂,如下:

IObservable<TResult> Join<TLeft, TRight,
    TLeftDuration, TRightDuration,TResult>(
    IObservable<TLeft> left,                                          //联接的左Observable
    IObservable<TRight> right,                                        //联接的右Observable
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector ,    //决定左Observable的时间窗口
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector,  //决定右Observable的时间窗口
    Func<TLeft, TRight, TResult> resultSelector);                     //计算左右Observable重叠时间的结果

声明中最难的部分是持续时间选择器。这些函数接收推送的值然后返回一个Observable,该Observable推送一个时间段内的值。

假设有一个传感器,编码为DoorEvent的“热”Observable,用来监视房间内人员的进出。想要推送同一房间同一时间所有男性和女性:

IObservable<DoorOpened> doorOpened = doorOpenedSubject.AsObservable();

DoorEvent的定义如下:

class DoorOpened
{
    public string Name { get; set; }
    public OpenDirection Direction { get; set; }  // 表示进入房间或离开房间
    public Gender Gender { get; set; }
}

提取男性和女性进入房间的Observable如下:

var entrances = doorOpened.Where(o => o.Direction == OpenDirection.Entering);

var maleEntering = entrances.Where(x => x.Gender == Gender.Male);
var femaleEntering = entrances.Where(x => x.Gender == Gender.Female);

同样,提取离开房间的Observable:

var exits = doorOpened.Where(o => o.Direction == OpenDirection.Leaving);

var maleExiting = exits.Where(x => x.Gender == Gender.Male);
var femaleExiting = exits.Where(x => x.Gender == Gender.Female);

现在,需要将房间内出现的女性和男性联系在一起。为此,需要为每个(男性或女性进入)通知定义在房间内存在的时间段标记。对于反应式,定义时间段就是定义一个在时间段结束时推送(或完成)的Observable。具体方法如下:

maleEntering
    .Join(femaleEntering,
          male => maleExiting.Where(exit => exit.Name == male.Name),
          female => femaleExiting.Where(exit => female.Name == exit.Name),
          (m, f) => new {Male = m.Name, Female = f.Name})
.SubscribeConsole("Together At Room");

为了测试代码,需要创建一个作为Observable后端的Subject,然后按照上面图片中的顺序推送通知:

doorOpenedSubject.OnNext(
    new DoorOpened("Bob", Gender.Male, OpenDirection.Entering));
doorOpenedSubject.OnNext(
    new DoorOpened("Sara", Gender.Female, OpenDirection.Entering));
doorOpenedSubject.OnNext(
    new DoorOpened("John", Gender.Male, OpenDirection.Entering));
doorOpenedSubject.OnNext(
    new DoorOpened("Sara", Gender.Female, OpenDirection.Leaving));
doorOpenedSubject.OnNext(
    new DoorOpened("Fibi", Gender.Female, OpenDirection.Entering));
doorOpenedSubject.OnNext(
    new DoorOpened("Bob", Gender.Male, OpenDirection.Leaving));
doorOpenedSubject.OnNext(
    new DoorOpened("Dan", Gender.Male, OpenDirection.Entering));
doorOpenedSubject.OnNext(
    new DoorOpened("Fibi", Gender.Female, OpenDirection.Leaving));
doorOpenedSubject.OnNext(
    new DoorOpened("John", Gender.Male, OpenDirection.Leaving));
doorOpenedSubject.OnNext(
    new DoorOpened("Dan", Gender.Male, OpenDirection.Leaving));

// 其余的代码模拟参与者离开房间

运行结果如下:

Together At Room – OnNext({ Male = Bob, Female = Sara })
Together At Room – OnNext({ Male = John, Female = Sara })
Together At Room – OnNext({ Male = Bob, Female = Fibi })
Together At Room – OnNext({ Male = John, Female = Fibi })
Together At Room – OnNext({ Male = Dan, Female = Fibi })

用查询写Join

C#编译器支持使用LINQ查询的方式写联接,写法如下:

from [left] in [leftObservable]
join [right] in [rightObservable] on [leftDuration] equals [rightDuration]
select ...

上面例子的代码写法如下:

from male in maleEntering
join female in femaleEntering on maleEntering.Where(exit =>
exit.Name == male.Name) equals
femaleExiting.Where(exit => female.Name == exit.Name)
select new {Male = male.Name, Female = female.Name};

Join创建一个单独的Observable推送所有相关性。但是,有时分而治之的方式更容易使用。

按这种方式的话,就应该是每位男性接收到该男性与女性同时在房间内的所有情况。因此,每位男性成为了所有关联女性组的组密钥,并且该组是这些女性的Observable。因此,将会得到多个Observable(每组一个),而不是对于所有匹配的一个Observable。为此,需要使用GroupJoin操作符。

组联接

GroupJoin操作符可以基于重叠时间段来关联两个Observable序列元素,并且将与每个元素相关的元素关联到一个本身就是Observable的组中。如下图所示:

例如,在上面的统计研究实验中,想要为每位男性推送与他们在同一房间的女性。可以将与每位男性相关的女性视为一组Observable。

基于巧合,该组的动机是,对于每个组,都可以以更简单的方式定义更好的查询。 例如,女性群体的平均年龄是多少?

GroupJoin的声明如下:

IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left,                                     // 联接的左Observable
    IObservable<TRight> right,                                        // 联接的右Observable
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,     // 决定左Observable的时间窗口
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector,  // 决定右Observable的时间窗口
    Func<TLeft, IObservable<TRight>, TResult> resultSelector)         // 根据右Observable序列计算每个左Observable元素在其重叠时间内的结果

假如想要扩展上一节中的示例(查找房间内所有成对的男性和女性)。现在,需要添加一个计数器,表示到目前为止与每位男性在房间内的的女性的数量。

和之前一样,需要表示男性和女性进入与离开房间的Observable:

var maleEntering = entrances.Where(x => x.Gender == Gender.Male);
var femaleEntering = entrances.Where(x => x.Gender == Gender.Female);
var maleExiting = exits.Where(x => x.Gender == Gender.Male);
var femaleExiting = exits.Where(x => x.Gender == Gender.Female);

现在,可以使用GroupJoin创建相关组。对于每位男性创建一个包含其名字和与其有关的女性的Observable的对象:

var malesAcquaintances =
    maleEntering
        .GroupJoin(femaleEntering,
                   male => maleExiting.Where(exit => exit.Name == male.Name),
                   female => femaleExiting.Where(exit => female.Name == exit.Name),
                   (m, females) => new {Male = m.Name, Females = females});

然后,就可以创建针对malesAcquaintancesObservable的查询,该查询计算每位男性在房间内遇到的女性的个数:

var amountPerUser =
    from acquaintances in malesAcquaintances
    from cnt in acquaintances.Females.Scan(0, (acc, curr) => acc + 1)
    select new {acquaintances.Male, cnt};

amountPerUser.SubscribeConsole("Amount of meetings for User");

按照上面图片中男性和女性进出房间的顺序运行示例,结果如下:

Amount of meetings per User – OnNext({ Male = Bob, cnt = 1 })
Amount of meetings per User – OnNext({ Male = John, cnt = 1 })
Amount of meetings per User – OnNext({ Male = Bob, cnt = 2 })
Amount of meetings per User – OnNext({ Male = John, cnt = 2 })
Amount of meetings per User – OnNext({ Male = Dan, cnt = 1 })

可以注意到,每次计数更改时都会推送通知。

用查询写GroupJoin

同样的,GroupJoin也可以用LINQ查询来写。写法如下:

from [left] in [leftObservable]
join [right] in [rightObservable] on [leftDuration] equals [rightDuration]
    into [correlationGroup]
select ...

上面示例的写法如下:

from male in maleEntering
join female in femaleEntering on maleExiting.Where(e => e.Name == male.Name)
equals femaleExiting.Where(e => female.Name == e.Name)
into maleEncounters
select new { Male = male.Name, Females = maleEncounters };

联接是Rx中强大的工具,因为它可以查找元素之间的关系并捕获巧合。

至此,对连接不同Observable有了很好的了解,并且对单个Observable拆分为子Observable(组)有了基本的了解。接下来,将要了解更多将Observable分解的技术。

缓冲和滑动窗

对于Observable,在大多数情况下都可以认为是无限元素(或通知)。处理无边界问题,无论是Observable还是集合,都是不容易的。作为人类,习惯于将其分解为多个有边界的问题,以便于分别处理,最终,将这些结果映射到无边界的“全局”解中。这就是所谓的分而治之。在响应式编程中,有两种方法来处理该问题:

  • 缓冲将Observable序列分解为叫做buffers的有界集合。
  • 窗口化将Observable分解为更精细的Observable,以定义其持续时间。

两者之间最重要的区别是,窗口化可以立即获得推送的元素,缓冲仅在关闭(因为缓冲区已满或缓冲时间结束)时获得推送的元素。如下图所示:

窗口的两种定义

窗一词在这里有点令人迷惑,因为它代表了两个不同 但相关的事物。
首先,窗口是元素在一定边界(持续时间或数量)下的子流。
其次,窗口是从流中收集元素的逻辑边界。
在上图中,创建的是三个元素的时间跨度(窗口)的缓冲区,每个元素都会被收集到缓冲区。
也可以说在那个时间段内推送的三个元素都被推送到叫做窗口的Observable。
当将它们看成容器时,可以定义三种类型的窗口:
· 浮动窗口是一系列固定大小,不重叠的连续时间间隔。
· 跳跃窗口是一系列向前“跳跃”固定时间的窗口。
· 滑动窗口是一种窗口宽度大于“跳跃”的跳跃窗口,所以有重叠部分。

缓冲

使用缓冲,可以将Observable推送的元素包装到一个缓冲区,然后创建一个集合的Observable,但是集合不能仅包含一个元素。可以使用时间、数量,或者任何通过Observable通知定义的逻辑持续时间来创建缓冲。

假设应用程序连接到自行车的速度计,该速度计以恒定的速率推送速度值。这里希望程序显示加速度变化。为此,需要获取两个连续的读数,并计算它们的插值。这里可以使用Buffer来实现,缓冲区为包含两个元素的滑动窗口。

下图为自子弹图:

代码如下:

IObservable<double> speedReadings = ...   // 每秒钟推送一次速度的Observable

Double timeDelta = 0.0002777777777777778; // 一秒在一小时中的占比

Var accelerations =
    From buffer in speedReadings.Buffer(count: 2, skip: 1) // 在Observable上应用Buffer,将推送的元素包装起来
    Where buffer.Count==2                                  // 指定缓冲区数量(Observable最后推送的元素将会被单独包装在缓存内)
    Let speedDelta = buffer[1] - buffer[0]                 // 计算缓冲区两个车速读数之间的差异
    Select speedDelta/timeDelta;

accelerations.SubscribeConsole("Acceleration");

在上面的例子中,是用来查询语法,因为可以使用let关键词引入新的子计算,从而使代码量更小。在speedReadingsObservable上应用Buffer后,可以得到一个具有两个连续元素的Observable。

除了使用缓存两个连续的读数的方法,还可以使用Zip操作符:speedReadings.Zip(speedReadings.Skip(1), (x,y)=> y-x)。这里压缩使用了Observable和其自己的偏移版本。

可以看到,在例子中使用了有两个参数的Buffer操作符,如下所示:

IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source,
    int count,
    int skip);

传递的第一个参数表示每个缓冲区需要的元素数,第二个参数(skip)表示在打开第一个缓冲区之前(打开另一缓冲区之前)需要推送的通知数。这两个参数的组合创建了各种类型的窗口(前面有解释),就像下图所示。

  • 浮动窗口:如果skip与缓冲区容量相等,则缓冲区在前一个缓冲区关闭时打开。
  • 跳跃窗口:如果skip比缓冲区容量大,那么在缓冲区关闭之后,下一个缓冲区将在推送count - skip(count减去skip)个元素后才打开。

  • 滑动窗口:如果skip比缓冲区容量小,那么缓冲区与下一个缓冲区重叠并共享一部分元素。

Buffer的一些重载可以使用时间跨度来设置缓冲区,或者同时使用时间跨度和数量来设置,以先发生为准:

IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, 
    TimeSpan timeSpan);

IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source,
    TimeSpan timeSpan,
    TimeSpan timeShift); // 两个连续缓冲区之间的间隔时间

IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source,
    TimeSpan timeSpan,
    Int count);

如果想要对缓冲区的开启和关闭有更多的控制,可以使用接受Observable作为开启或关闭触发器的重载。

如果关闭缓冲区触发开启下一个缓冲区,可以使用下面的重载:

IObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(
    IObservable<TSource> source,
    IObservable<TBufferBoundary> bufferBoundaries); // 通过推送通知来关闭当前缓冲区和打开下一个缓冲区

如果使用单个Observable来控制缓冲区关闭(然后打开下一个缓冲区)不能满足需求。并且需要对每个打开的缓冲区创建特定的持续时间,可以使用下面的重载:

IObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(
    IObservable<TSource> source,
    Func<IObservable<TBufferClosing>> bufferClosingSelector); // 在缓冲区打开是调用,返回一个通过推送通知来关闭缓冲区的Observable

如果缓冲区可以独立打开和关闭,可以使用下面的重载:

IObservable<IList<TSource>> Buffer<TSource, TBufferOpening, TBufferClosing>(
    IObservable<TSource> source,
    IObservable<TBufferOpening> bufferOpenings,                               // 推送通知时触发缓冲区打开
    Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector); // 返回一个Observable,在推送通知时触发缓冲区关闭

假设正在编写一个可以快速接收消息的聊天应用程序。由于不想阻塞UI界面,需要在短时间内保护其不受太多更新的影响。所以应该在消息接收之间有个短暂的停顿,并将所有接收到的消息同时显示在屏幕上。为此,可以缓存聊天信息,然后使用另一个Observable的推送来控制缓存:

IObservable<string> messages = ... // 推送消息,也许会高频接收

messages.Buffer(messages.Throttle(TimeSpan.FromMilliseconds(100))) // 将数据缓存到一个间隔100ms的缓存区
        .SelectMany((b, i) => b.Select(m => string.Format("Buffer {0} - {1}", i, m)))
        .SubscribeConsole("Hi-Rate Messages");

Console.ReadLine();

为了模拟高频率的消息,创建一个间隔50ms推送4个数据的Observable,然后暂停200ms后继续推送4个数据。(可以看到,这里将“冷”Observable转换为了“热”Observable,以获得真实的结果)

var coldMessages = Observable.Interval(TimeSpan.FromMilliseconds(50))
    .Take(4)
    .Select(x => "Message " + x);

IObservable<string> messages =
    coldMessages.Concat(coldMessages.DelaySubscription(TimeSpan.FromMilliseconds(200)))
                .Publish()
                .RefCount();

//如示例所示,下面为使用Bufferc操作符的代码片段

运行结果为:

Hi-Rate Messages – OnNext(Buffer 0 – Message 0)
Hi-Rate Messages – OnNext(Buffer 0 – Message 1)
Hi-Rate Messages – OnNext(Buffer 0 – Message 2)
Hi-Rate Messages – OnNext(Buffer 0 – Message 3)
Hi-Rate Messages – OnNext(Buffer 1 – Message 0)
Hi-Rate Messages – OnNext(Buffer 1 – Message 1)
Hi-Rate Messages – OnNext(Buffer 1 – Message 2)
Hi-Rate Messages – OnNext(Buffer 1 – Message 3)
Hi-Rate Messages – OnCompleted()

借助Buffer的不同重载,可以控制缓冲何时开始,何时关闭。不过,仅在缓冲区关闭时才能接受到数据,这可能会需要一段时间(取决于实现逻辑)。

如果需要对缓冲区内的元素执行一些操作(如求和或过滤),只能在缓冲区关闭时进行操作。对于这种需要更加“实时”的情况,可以使用Window操作符。

窗口化

Window操作符可以将Observable序列按照时间边界或容量分成多个窗口。窗口就是一个Observable,其推送时间边界内的元素。如下图所示:

Window操作符和Buffer操作符很像,但是不是将所有元素都包进缓冲区然后在缓冲区关闭时推送所有元素,Window会在元素到达时立即推送。

假设有一个呼叫中心的应用程序用来收集捐款。该工作每一小时轮班一次,想要查看每次轮班的捐款情况。

这时,仅使用Buffer是不够的,因为只能在一小时窗口结束时才能获得捐款总额。如果使用Window操作符,那么就能对值进行实时求和并显示在屏幕上:

IObservable<decimal> donations = ...

var windows = donations.Window(TimeSpan.FromHours(1));

var donationsSums =
    from window in windows.Do(_ => Console.WriteLine("New Window"))    // 在一个窗口打开(前一个窗口关闭)时打印一条信息
    from sum in window.Scan((prevSum, donation) => prevSum + donation) // 计算总和,当值改变时推送该值
    select sum;

donationsSums.SubscribeConsole("donations in shift");

捐款Observable被分成每一个小时一个的非重叠窗口。然后,拿到每个窗口,并应用Scan操作符将所有捐款值相加。Scan在值改变时推送和(与Aggregate相反,Aggregate在Observable完成时推送)。

donationsSums是一个平面Observable,它从所有窗口推送和。由于已经在windows添加了Do操作符,因此可以在每个窗口间看到一条信息。下面是两轮捐赠结果的输出:

Shift 1—50$, 55$, 60$
Shift 2—49$, 48$, 45$
Output:
New Window
donations in shift – OnNext(50)
donations in shift – OnNext(105)
donations in shift – OnNext(165)
New Window
donations in shift – OnNext(49)
donations in shift – OnNext(97)
donations in shift – OnNext(142)
donations in shift – OnCompleted()

Window操作符有一些重载可以控制窗口的开启和关闭。可以根据容量或持续时间打开关闭窗口。还可以指定在它们之间跳过的元素数或暂停的时间。

下面是重载的一小部分(可以看到,它们和之前的Buffer十分相似):

IObservable<IObservable<TSource>> Window<TSource>(
    IObservable<TSource> source,
    int count,
    int skip);

IObservable<IObservable<TSource>> Window<TSource>(
    IObservable<TSource> source,
    TimeSpan timeSpan,
    TimeSpan timeShift);

IObservable<IObservable<TSource>> Window<TSource>(
    IObservable<TSource> source,
    TimeSpan timeSpan,
    int count);

如果跳过的元素数(或者时间偏移)小于窗口的容量(或窗口持续时间),将创建一个滑动窗口,并且两个窗口之间存在重叠,如下图所示:

动态窗口

窗口可以根据逻辑动态的打开和关闭。通过一个为每个窗口创建Observable的函数,来为每个窗口定义不同的关闭策略,该Observable在完成时推送通知来决定窗口关闭:

IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(
    IObservable<TSource> source,
    Func<IObservable<TWindowClosing>> windowClosingSelector);

可以用类似的方式控制打开窗口。可以为Window操作符提供一个Observable,当其推送通知时触发窗口的打开:

IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(
    IObservable<TSource> source,
    IObservable<TWindowOpening> windowOpenings,
    Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);```

如果想要创建一个自己控制边界且不重叠的窗口,可以使用下面的重载:

```csharp
IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(
    IObservable<TSource> source,
    IObservable<TWindowBoundary> windowBoundaries);

windowBounderies是一个通过推送通知来控制窗口开启和关闭的Observable。

窗口和缓冲是将一个大问题分解为多个可以独立解决的小问题的两种方法。通过将Observable分成多个部分,可以先洞悉不同部分,然后反应出整体。适用于对在一定时间内落入的元素子集进行聚合或其他操作。

在本章,介绍了很多响应式编程逻辑的高级技术。是时候总结一下,以便将来有个参考来加深记忆。

总结

在本章,了解了构建Observable不仅限于单个Observable,还可以创建依赖于多个Observable的关系和组合的查询:

  • Zip操作符将两个或多个Observable的相同索引的元素进行配对。
  • CombineLatest操作符将每个Observable推送的最后一个元素进行组合。
  • Concat操作符将在前一个Observable完成后推送下一个Observable的值。
  • Concat仅在前一个Observable完成后才订阅先一个Observable。
  • Merge操作符订阅所有的Observable,并且在获取到通知时立马推送。
  • 可以通过传递允许并发订阅数的参数来限制Merge操作符的并发订阅数。
  • Switch操作符创建一个Observable,该Observable推送最近的Observable元素。
  • Amb操作符与Switch操作符很像,但是其切换到第一个推送的Observable。
  • 在Rx中,分组就是创建拥有共同键值元素的Observable,通过GroupBy操作符来完成。
  • 在Rx中,联接两个Observable就是推送在同一时间段内的元素对。
  • Join操作符合并两个Observable在同一时间段内的推送项,并且将匹配的项推送到平面Observable。
  • GroupJoin操作符基于重叠的持续时间来关联两个Observable的元素,然后对于第一个Observable的每一个元素,将第二个Observable中与其相关的元素归为一组,最后为第一个Observable的每一个元素创建Observable。
  • 可以使用LINQ查询语句的方式写JoinGroupJoin
  • Buffer操作符可以将Observable分解为有界集合,并创建这些集合的Observable。
  • Window操作符可以将Observable分解为更精细的Observable。
  • BufferWindow都可以控制持续时间或容量,并允许创建滑动窗口。

在本系列看到的许多示例中,从创建Observable到查询和合并,有些添加了时间和执行上下文(线程、任务等)。下一章将会介绍Rx中的时间和并发性,以及如何使用它们来控制查询的执行。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注