[Rx.NET] 9 – Rx中的并发与同步

[Rx.NET] 9 – Rx中的并发与同步

时间就是一切,至少有人这么说。不像集合(Enumerables),时间在Observable中起到很大作用。通知之间的间隔有大有小,这可能影响处理通知的方式。在上一章,已经介绍了缓存元素和创建随时间推移的滑动窗口的示例。还有执行发生在哪里的问题(例如,线程、任务、调度等等)。时间和执行上下文的概念是有关系的,并且为Rx的并发模型提供了基础。调度类型及其派生表达次模型。本章介绍Rx的调度层,以及如何使用它控制Rx的可观察管道,以及如何使用它和Rx的时间操作符一起使用。

用调度控制并发

在计算机科学中,并发是那些同时执行多个计算并可能相互交互的系统的属性。在前面的章节中谈到了并发,其中提到了不同的.NET异步类型。到目前为止,一直避免谈及Observable管道内如何处理并发。如果使用Interval操作符创建一个每10秒推送一次的Observable,那么在哪个线程中接收通知呢?Observer的订阅在哪个线程中进行?有时,例如使用UI框架时,控制这些执行上下文就很重要,因为对在哪个线程执行操作有限制。通常UI控件只能在UI线程中更改,否则就会引发异常。

Rx遵循此设计准则:引入并发的所有操作都必须使用Scheduler类型,该类型是Rx使用并发和时间的抽象层。

定义Scheduler

简单来说,Scheduler是表示时钟和执行上下文的单元。时钟维护当前时间并允许在特定时间(例如计时器)调度工作。执行上下文决定在哪里处理工作(例如,在当前线程或在当前SynchronizationContext对象中)。如下图所示:

Rx中的所有Scheduler都实现IScheduler接口,如下所示:

public interface IScheduler
{
    DateTimeOffset Now { get; }   // 调度器的当前时间

    IDisposable Schedule<TState>( // 调度一个需要执行的action,返回一个用于取消调度的IDisposable
                    TState state,
                    Func<IScheduler, TState, IDisposable> action);

    IDisposable Schedule<TState>( // 调度一个在给定时间后执行的action,返回一个用于取消调度的IDisposable
                    TState state,
                    TimeSpan dueTime,
                    Func<IScheduler, TState, IDisposable> action);

    IDisposable Schedule<TState>( // 调度一个在指定时间内执行的action,返回一个用于取消调度的IDisposable
                    TState state,
                    DateTimeOffset dueTime,
                    Func<IScheduler, TState, IDisposable> action);
}

Scheduler包含一个返回当前时间的Now属性。大多数Scheduler都返回DateTimeOffset.UtcNow,但是在更高级的情况下,Scheduler的抽象时间可以用来控制测试时间和重现过去的事件。

Scheduler接口与Now属性一起为Scheduler的方法提供了一些重载。这些重载可以控制动作是在绝对或相对时间还是立即执行。要调度动作,需要传递选择的状态对象,调度时间,和一个Func<IScheduler, TState, IDisposable>类型的动作。

当预设时间来临并且动作被调用时,将会接收到调用的Scheduler和提供的状态对象。状态对象用与维护Scheduler调用者和接下来要执行动作之间的上下文。状态对象的类型没有任何限制,所以可以选择任何数据类型。

被调度的动作必须返回一个Disposable对象,该对象作为取消令牌。释放它意味着触发取消正在运行的动作并清除那些作为其组成部分的资源。

下面看一个使用Scheduler的示例。将使用NewThreadScheduler(位于System.Reactive.Concurrency中)来调度在屏幕上打印当前时间的动作。这里希望在2秒后调度该动作,这里不使用与环境时钟或特定于平台的计时器,而是使用Rx中的Scheduler来完成。如下图所示:

IScheduler scheduler = NewThreadScheduler.Default;

IDisposable scheduling =
    scheduler.Schedule(
        Unit.Default,               // 这里不使用状态对象,Unit.Default相当于null
        TimeSpan.FromSeconds(2),    // 在调度被调用后2秒执行
        (scdlr, _) =>               // 接收用于递归调度的`Scheduler`和状态对象。
        {                           // 由于没有特定的资源要释放并且不需要取消,所以返回Disposable.Empty
            Console.WriteLine("Hello World, Now: {0}", scdlr.Now);
            Return Disposable.Empty;
        });

运行示例,等待2秒,输出如下:

Hello World, Now: 22/12/2015 13:45:00 +00:00

在上面的示例中,没有使用状态对象和返回的Disposable,但是通常用它们来控制调度动作内部的逻辑。

下面看一个更复杂的重复事件(每2秒钟),该事件计算其被调用的次数。将会使用状态对象以及创建递归调度以每2秒运行一次动作,该动作可以使用返回的Disposable来取消:

IScheduler scheduler = NewThreadScheduler.Default;
Func<IScheduler, int, IDisposable> action = null;
action = (scdlr, callNumber) =>
    {
        Console.WriteLine("Hello {0}, Now: {1}, Thread: {2}",
            callNumber,
            scdlr.Now,
            Thread.CurrentThread.ManagedThreadId);

        return scdlr.Schedule(callNumber + 1, TimeSpan.FromSeconds(2), 
                              action); // 重新以2秒调度另一个动作,增加状态对象(用作调用计数器)。返回`Schedule`函数的`Disposable`
    };

IDisposable scheduling =
    scheduler.Schedule(
        0,  // 第一次调度传递的初始状态对象(作为调用计数器),由于是第一次调用,所以传递0。
        TimeSpan.FromSeconds(2),
        action);

下图展示了上面周期性行为的时序图:

如果现在运行示例,将会在屏幕上持续打印信息。如果想要停止,可以调用schedulingDispose函数。

在内部,调度器连接了从初始Schedule函数返回的和后续所有返回的Disposable。因此,及时已经进行了内部深层的调度,调用最外层DisposableDispose函数也会起作用。

一些调度器,除了实现IScheduler接口外,还实现了Rx提供的另外两个接口:

  • ISchedulerPeriodic声明了SchedulePeriodic方法,用于调度定期运行的动作。
  • ISchedulerLongRunning声明了ScheduleLongRunning方法,用于调度长时间运行的动作。

大多数情况下,不会直接使用调度器。而是,将调度器传递给Rx操作符用于控制并发。

参数化并发

IScheduler接口提供了在程序中引入并发的抽象。IScheduler允许执行并发操作的操作符不必了解并发是怎么具体实现的,只需向操作符提供希望的Scheduler作为参数即可。要创建一个每秒推送一个通知并且要确保推送动作在当前线程上的Observable,可以像下面这样写:

Console.WriteLine("Before - Thread: {0}",
    Thread.CurrentThread.ManagedThreadId);

Observable.Interval(TimeSpan.FromSeconds(1), CurrentThreadScheduler.Instance)
          .Take(3)
          .Subscribe(x => Console.WriteLine("Inside - Thread: {1}",
                              x,
                              Thread.CurrentThread.ManagedThreadId));

可以看到,这里将CurrentThreadScheduler.Instance作为参数传给了Interval操作符。这能够确保Interval内部的计时器使用当前线程。输出如下所示(线程Id可能不同):

Before – Thread: 1
Inside – Thread: 1
Inside – Thread: 1
Inside – Thread: 1

Interval创建的是“冷”Observable。由于示例中使用了CurrentThreadScheduler,所以订阅的Observer都同步运行,因此该订阅操作就变成了阻塞操作,只能在整个Observable序列完成后才能继续。

如果不给Interval传递Scheduler,将会使用在另一个线程上运行计时器的默认Scheduler,因此推送将在那个线程上进行,并输出一下结果(线程Id可能不同):

Before – Thread: 1
Inside – Thread: 4
Inside – Thread: 4
Inside – Thread: 4

这时,调用OnNext方法的线程是4,不同于之前执行订阅操作的线程。也就是说,现在Observable是异步的。

就像Interval操作符,所有支持并发的Rx操作符都可以传入一个Scheduler(否则使用默认的)。前面介绍的操作符都隐藏了接收Scheduler参数的重载。

许多使用Rx的开发人员错误的认为Rx的所有内容都在后台运行。实际上,Rx操作符很清楚的表明了其通过接收IScheduler的重载来执行后台操作的意图。看一下下面Range操作符的示例,该示例创建了一个推送一系列数字的Observable,然后使用Repeat操作符来不断重新订阅。想象一下输出结果是什么:

var subscription =
    Observable.Range(1, 5)
              .Repeat()
              .SubscribeConsole("Range on another thread");

与许多人的错误认识不同,将会在控制台上无限的输出1-5,并且不会立即取消订阅。因为在一次推送完成时,Repeat操作符重新进行了订阅。该过程在调用线程一遍又一遍的重复执行,所以Dispose方法永远不会被调用。

要想解决这个问题,可以将Range的推送放到另一个线程里进行,如下所示:

Observable.Range(1, 5, NewThreadScheduler.Default)

现在调用线程不会被阻塞了,并且Dispose方法会被尽快调用。

Rx还提供了一些适用于不同目的的IScheduler接口的实现。

Scheduler类型

为了设置Observable管道的并发,Rx提供了一些Scheduler。Rx中所有的标准Scheduler都在System.Reactive.Concurrency命名空间下。为了展示每个Scheduler的不同效果,将使用下面的测试方法:

public static void TestScheduler(IScheduler scheduler)
{
    scheduler.Schedule(Unit.Default,
        (s, _) => Console.WriteLine("Action1 - Thread:{0}",
                      Thread.CurrentThread.ManagedThreadId));
    scheduler.Schedule(Unit.Default,
        (s, _) => Console.WriteLine("Action2 - Thread:{0}",
                      Thread.CurrentThread.ManagedThreadId));
}
NewThreadScheduler

顾名思义,NewThreadScheduler在新线程中运行调度的动作。默认情况下,NewThreadScheduler为每个调度操作创建一个新的Thread对象,不过可以传递一个Func<ThreadStart, Thread>类型的threadFactory来负责创建线程。

大多数情况下,不需要实例化Scheduler,而是使用NewThreadScheduler.Default静态属性的共享示例。

TestScheduler(NewThreadScheduler.Default);

输出结果如下(线程Id可能不同):

Action1 – Thread:7
Action2 – Thread:8

有一个问题通常会使在对Scheduler进行递归调用时使用NewThreadScheduler的开发人员赶到迷惑—不会打开新的线程。在内部,使用了使用同一线程的EventLoopScheduler

由于为每个调度创建新的线程效率不高,所以NewThreadScheduler主要用于长时间运行的操作。对于短暂的操作,建议使用ThreadPool

ThreadPoolScheduler

为每个调度动作创建一个新的线程效率不高;在操作系统中打开和关闭线程很费时间和内存。因此,.NET提供了ThreadPool类,该类可以重用线程,而不是每次都打开一个新的线程。ThreadPoolScheduler的工作方式和NewThreadScheduler相似,但是使用的是线程池而不是新建线程:

TestScheduler(ThreadPoolScheduler.Instance);

输出结果如下(线程Id可能不同):

Action1 – Thread:9
Action2 – Thread:10

从输出可以看出,两个动作是彼此独立地安排在不同的线程上的。

NewThreadScheduler不同,递归调度也是在线程池中排队的,因此不同的调度可能运行在不同的线程上。当需要在线程上调度时应首选ThreadPoolScheduler

TaskPoolScheduler

TaskPoolScheduler的工作方式和ThreadPoolScheduler相似,但是使用的不是ThreadPool,而是Task Parallel Library(TPL)任务池。在某些平台(例如WinRT)上,无法访问线程池,所以TaskPoolScheduler是完美的替代品。

CurrentThreadScheduler

CurrentThreadScheduler在调用Schedule方法的线程上执行调度动作。在调度动作内部的任何递归调度都会放入由Scheduler维护的按时间排列的队列中。一个调度动作完成后,Scheduler会从队列中选择下一个动作,并在该调度动作的dueTime来临时运行,如果已经错过那么会立即运行。

TestScheduler(CurrentThreadScheduler.Instance);

Calling thread: 1
Action1 – Thread:1
Action2 – Thread:1

该示例表明所有调度动作都在同一线程上运行,并且与调用者运行的线程相同。当编写递归调用的调度时,其都会在同一线程上运行。

ImmediateScheduler

就像CurrentThreadSchedulerImmediateScheduler在当前线程调度动作。但是与CurrentThreadScheduler一个接一个排队调度不同,ImmediateScheduler立即运行每个动作或者堵塞到dueTime到来:

var immediateScheduler = ImmediateScheduler.Instance;

Console.WriteLine("Calling thread: {0} Current time: {1}",
    Thread.CurrentThread.ManagedThreadId, immediateScheduler.Now);

immediateScheduler.Schedule(Unit.Default,
   TimeSpan.FromSeconds(2),
   (s, _) =>
   {
      Console.WriteLine("Outer Action - Thread:{0}",
         Thread.CurrentThread.ManagedThreadId);

      s.Schedule(Unit.Default,
          (s2, __) =>
          {
              Console.WriteLine("Inner Action - Thread:{0}",
                  Thread.CurrentThread.ManagedThreadId);

               return Disposable.Empty;
          });

      Console.WriteLine("Outer Action - Done");

      return Disposable.Empty;
});

Console.WriteLine("After the Schedule, Time: {0}",immediateScheduler.Now);

输出结果如下(线程Id可能不同):

Calling thread: 1 Current time: 24/12/2015 18:00:47 +00:00
Outer Action – Thread:1
Inner Action – Thread:1
Outer Action – Done
After the Schedule, Time: 24/12/2015 18:00:49 +00:00

上面的示例中有几处需要注意。首先,所有动作都运行在初始调用者的线程上。其次,内部动作是立即被调度的,而不是等到外部动作完成。第三,After the Schedule是在调用Schedule方法后2秒输出的。这是因为向Schedule方法传递了TimeSpan.FromSecond(2)参数,导致了其阻塞直到dueTime来临。当需要调度那些需要极少数时间的动作时(可以视为固定操作时间),可以使用ImmediateScheduler

EventLoopScheduler

EventLoopScheduler可以将所有动作绑定到一个单独线程。创建EventLoopScheduler时,将会创建一个线程(或者提供一个线程工厂)用来调度所有的动作,而不管这些动作运行在哪个线程上。

在内部,EventLoopScheduler有一个按时间排序的动作队列。每个被调度的动作都要入队,并且当一个动作完成时,下一个任务就会出队。

TestScheduler(new EventLoopScheduler());

输出结果如下(线程Id可能不同):

Calling thread: 1
Action1 – Thread:14
Action2 – Thread:14

从示例可以看出,所有调度动作都运行在同一个线程,但是该线程与创建EventLoopScheduler的线程不同。

SynchronizationContext上调度

在.NET中,SynchronizationContext是一个用于处理特定线程向下文工作的对象,例如WPF和WinForms中的UI线程或ASP.NET中的请求。通过使用SynchronizationContext,可以将工作从源线程调度到目标线程,并让SynchronizationContext处理细节。

Rx中的SynchronizationContextScheduler提供了Rx调度模型与.NET中SynchronizationContext模型之间的桥梁,所以每个调度动作都发布在SynchronizationContext上。创建SynchronizationContextScheduler时需要传递要使用的SynchronizationContext。例如:

var syncContextScheduler = new SynchronizationContextScheduler(
                                   SynchronizationContext.Current);

在WinForms和XAML平台中,SynchronizationContext有着巨大作用,因为在与UI线程不同的线程中操作UI组件会引发异常。因此,与UI相关的所有操作都需要通过正确的SynchronizationContext。在WinForms中,可以使用控件本身在正确的线程中执行操作:

control.BeginInvoke(() => {/* the action code */})

在XAML平台(例如WPF或WinRT)中,可以使用Dispatcher类:

Dispatcher.CurrentDispatcher.BeginInvoke(() => {/* the action code */});

为了在这些平台上更方便的使用,Rx提供了ControlSchedulerDispatcherScheduler,它们为WinForms和XAML平台封装了正确的同步上下文。要访问这些Scheduler需要添加对应平台的程序包—XAML平台为System.Reactive.Windows.Threading,WinForms平台为System.Reactive.Windows.Forms

使用时基操作符

Observable序列与传统枚举之间的主要区别为时间维度。对于Observable,两次通知之间的时间是动态的,并且是可以被Observer确定的。时间的长短会影响对通知的处理方式—忽略通知或者如果通知太快则延迟处理。

在前面的章节,已经看到一些时基操作符。在本节,将会介绍更深层的时基操作符。

获取通知时间戳

由于Observable在不同的时间推送通知,所以有必要知道每个通知被推送的时间。Rx提供了Timestamp操作符,而不是手动添加时间信息,该操作符可以为Observable序列中的每个通知添加UTC日期和时间的详细信息。下图描述了Timestamp操作符:

Timestamp操作符无需参数(可选的Scheduler除外),并且使用包含时间戳的Timestamped<T>封装通知对象:

IObservable<Timestamped<TSource>> Timestamp<TSource>(
                                  this IObservable<TSource> source);

IObservable<Timestamped<TSource>> Timestamp<TSource>(
                                  this IObservable<TSource> source,
                                  IScheduler scheduler)

下面的示例,创建了一个每秒推送一次通知的Observable,就像用软件监控的硬件推送的心跳通知。需要使用Timestamp操作符添加时间戳用于未来分析日志数据。由于不想示例永远运行下去,所以只取了三个通知:

IObservable<long> deviceHeartbeat =
    Observable.Interval(TimeSpan.FromSeconds(1));

deviceHeartbeat
    .Take(3)
    .Timestamp()
    .SubscribeConsole("Heartbeat");

Console.ReadLine();

运行示例,输出结果如下:

Heartbeat – OnNext(0@25/12/2015 22:29:24 +00:00)
Heartbeat – OnNext(1@25/12/2015 22:29:25 +00:00)
Heartbeat – OnNext(2@25/12/2015 22:29:26 +00:00)
Heartbeat – OnCompleted()

加粗的文本是Observable推送的通知。由于使用了Timestamped<T>,所以输出格式是这样的。Timestamped<T>保存了需要被标记时间戳的Observable推送的通知对象和通知被推送时的时间戳。其还实现了一个在调试时很有用的ToString方法。

Timestamp操作符在需要观察Observable序列内部情况以及时间维度对处理方式的影响时很有用。

获取通知之间的间隔

Timestamp操作符可能很有用,但是有时候只关心两个通知间的时间间隔。除了可以使用两次通知的时间戳相减来计算,还可以使用TimeInterval操作符,该操作符记录了Observable中连续元素之间的推送间隔。下图展示了TimeInterval的子弹图:

TimeInterval使用TimeInterval<T>来封装通知对象:

IObservable<TimeInterval<TSource>> TimeInterval<TSource>
    (this IObservable<TSource> source);

IObservable<TimeInterval<TSource>> TimeInterval<TSource>
    (this IObservable<TSource> source, IScheduler scheduler);

在下面的示例中,模拟了硬件设备向应用程序推送心跳信号。创建一个推送3个通知的Observable,每次推送的间隔为:1秒,2秒和4秒。使用TimeInterval来记录这些间隔。显然,当心跳之间的间隔很长时,就意味着被监控的设备不正常了。

var deviceHeartbeat = Observable
    .Timer(TimeSpan.FromSeconds(1))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(2)))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(4)));

deviceHeartbeat
    .TimeInterval()
    .SubscribeConsole("time from last heartbeat");

Console.ReadLine();

输出结果如下:

time from last heartbeat – OnNext(0@00:00:01.0120598)
time from last heartbeat – OnNext(0@00:00:02.0070871)
time from last heartbeat – OnNext(0@00:00:04.0029774)
time from last heartbeat – OnCompleted()

加粗的文本展示了被记录的时间间隔。当然,测量出的时间与设置的不同。这是因为调度通知和测量间隔涉及很多因素:抢占式操作系统,测量本身的时间等等。

即使这样,现在程序也可以通过检查TimeInterval类型中的时间间隔是否在限制时间内来警告用户设备是否有问题。TimeInterval结构包含Interval属性(TimeSpan类型)和保存推送通知的Value属性,并实现了一个在调试时很有用的ToString方法。

使用TimeInterval操作符可以根据推送值之间的间隔来做出决策。有时候,当动作的持续时间太长,需要取消该操作(或查询)。这称之为超时。

添加超时策略

如之前所述,Observable可以表示异步操作,也可以是涉及某种异步操作的可观察管道,例如对于远程服务的请求。

当执行异步操作时,需要知道经过多长时间才能将该操作判断为失败。当与异步服务进行合作时,通常会发生某些错误,从而接收不到任何响应。

为了更容易的处理该问题,Rx提供了Timeout操作符,顾名思义,该操作符可以处理超时问题。它监控Observable推送的通知,并且在设定的时间段内(从上一个通知推送以来)没有推送任何通知,将会引发一个异常,该异常将通过OnError方法传递给Observer。下图展示了TimeOut

下面的示例模拟了一个接一个连续发出4个远程请求的情况,然后等待它们的响应。设置超时时间为3秒,这意味着一旦有一个响应超过了3秒钟,就可以从Observable取消订阅。为此,创建了一个Observable,其推送2个间隔1秒的通知和2个间隔4秒的通知。在管道上添加一个超时时间为3秒的Timeout操作符:

var observable = Observable
    .Timer(TimeSpan.FromSeconds(1))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(4)))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(4)));

observable
    .Timeout(TimeSpan.FromSeconds(3))
    .SubscribeConsole("Timeout");

Console.ReadLine();

输出结果如下:

Timeout – OnNext(0)
Timeout – OnNext(0)
Timeout – OnError:
System.TimeoutException: The operation has timed out.

可以看到,由于设置了超时时间为3秒,并且不会发送通知,所以接收到一个TimeoutException

延迟通知

Observable可以以任何频率推送通知。大多数情况下,会在推送时立即处理这些通知。但是有时候,需要延迟处理这些通知。例如,当有不同优先级的请求时(基于客户服务级别协议或SLA),需要延迟处理低优先级的请求,并优先处理高优先级的请求。

Delay操作符可以将需要的延迟添加到所有通知中,也可以针对某一通知添加。下图展示了Delay操作符如何使用时间跨度影响通知。(也存在接受绝对时间的重载)。

如果想要为每个通知添加固定时间的延迟,那么就可以使用Delay操作符。

var observable = Observable
    .Timer(TimeSpan.FromSeconds(1))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(4)))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(4)));

observable
    .Timestamp()
    .Delay(TimeSpan.FromSeconds(2))
    .Timestamp()
    .Take(5)
    .SubscribeConsole("Delay");

Console.ReadLine();

输出结果如下:

Delay – OnNext(0@26/12/2015 14:47:41 +00:00@26/12/2015 14:47:43 +00:00)
Delay – OnNext(0@26/12/2015 14:47:42 +00:00@26/12/2015 14:47:44 +00:00)
Delay – OnNext(0@26/12/2015 14:47:46 +00:00@26/12/2015 14:47:48 +00:00)
Delay – OnNext(0@26/12/2015 14:47:50 +00:00@26/12/2015 14:47:52 +00:00)
Delay – OnCompleted()

这里最关键的部分是两个时间戳。右边(加粗)的时间就是通知经过延迟后推送的时间,左边的时间是源Observable推送的时间。可以清楚的看到每个通知的两个时间戳间隔2秒。

添加可变延迟

当固定的延迟不满足需求时,可以还是用Delay的重载,来为每个通知添加不同的延迟:

IObservable<TSource> Delay<TSource, TDelay>(
    this IObservable<TSource> source,
    IObservable<TDelay> subscriptionDelay,
    Func<TSource, IObservable<TDelay>> delayDurationSelector);

还存在一个可以忽略subscriptionDelay的重载,该重载用于延迟对源Observable的订阅。

在下面的示例中,创建了一个整型Observable,并且使用每个整型定义每个通知的延迟时间。这些整型可以是程序中请求的优先级,也可以是用户请求的处理时间:

var observable = new[] {4, 1, 2, 3}.ToObservable();

observable
    .Timestamp()
    .Delay(x => Observable.Timer(TimeSpan.FromSeconds(x.Value)))
    .Timestamp()
    .SubscribeConsole("Delay");

Console.ReadLine();

输出结果如下:

Delay – OnNext(1@26/12/2015 15:10:11 +00:00@26/12/2015 15:10:12 +00:00)
Delay – OnNext(2@26/12/2015 15:10:11 +00:00@26/12/2015 15:10:13 +00:00)
Delay – OnNext(3@26/12/2015 15:10:11 +00:00@26/12/2015 15:10:14 +00:00)
Delay – OnNext(4@26/12/2015 15:10:11 +00:00@26/12/2015 15:10:15 +00:00)
Delay – OnCompleted()

由于是从整型集合创建的Observable,所以这些整型数字在同一时间被推送。每个通知都是独立延迟的,所以即使4是第一个被推送的,Observer也是最后一个接收到。

节流通知

很多情况下,处理彼此相近的通知没有意义。例如,如果用户以很高的频率(每秒3次)更新其详细信息,则由于前两条更新不再相关,所以处理前两个更新没有什么好处。

为了将这种行为添加到Observable管道,需要将预定义时间之前的通知删除,除非这期间没有通知推送。可以使用Throttle操作符,如下图所示:

在下面的示例中,模拟了多个更新推送,但是只有在经过2秒没有其它更新推送才允许继续进行更新:

var observable = Observable
    .Return("Update A")
    .Concat(Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "Update B"))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => "Update C"))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => "Update D"))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "Update E"));

observable.Throttle(TimeSpan.FromSeconds(2))
    .SubscribeConsole("Throttle");

Console.ReadLine();

输出结果为:

Throttle – OnNext(Update A)
Throttle – OnNext(Update D)
Throttle – OnNext(Update E)
Throttle – OnCompleted()

可以看到更新B和C被删除了,因为它们都跟随了一个小于2秒就推送的通知。

可变节流

Throttle操作符可以独立控制每个元素的节流持续时间。为此,需要传递一个为每个元素返回节流周期结束时推送信号的Observable函数。

IObservable<TSource> Throttle<TSource, TThrottle>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TThrottle>> throttleDurationSelector)

每个推送的通知都会导致Throttle操作符丢弃前面的值,并且使用新的返回的Observable开始计算持续时间。

在下面的示例中,扩展了节流的示例,除了常规更新消息,还有一种需要立即更新的新消息。可以使用Throttle操作符阻止处理快速消息,除非该消息是即时消息。在程序中,即时消息可能是十分重要的通知或者来自高优先级的源Observable

var observable = Observable
    .Return("Msg A")
    .Concat(Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "Msg B"))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => "Immediate Update"))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => "Msg D"))
    .Concat(Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "Msg E"));

observable
    .Throttle(x => x == "Immediate Update"
                      ? Observable.Empty<long>()
                      : Observable.Timer(TimeSpan.FromSeconds(2)))
    .SubscribeConsole("Variable Throttling");

输出结果如下:

Variable Throttling – OnNext(Msg A)
Variable Throttling – OnNext(Immediate Update)
Variable Throttling – OnNext(Msg D)
Variable Throttling – OnNext(Msg E)
Variable Throttling – OnCompleted()

在示例中,会检查所有元素。如果是即时消息,将会返回一个立即推送完成(OnCompleted通知)的Observable。否则,将返回一个在2秒后推送通知的Observable。这就是为什么,即使当即时消息在上一个消息推送后2秒之内推送时也能被推送的原因。

间隔采样Observable

处理快速Observable的另一种方式是减慢对通知的反应速度,并以预定义的时间间隔对推送的值进行采样。Sample操作符可以设定时间间隔,在时间间隔结束时推送源Observable最后推送的值。下图展示了Sample操作符的子弹图:

下一个示例展示了如何获取一个每秒推送一次通知,并每3.5秒采样一次的Observable。(示例限制为只获取三个间隔。)在实际情况下,源Observable可能以很快的速度推送通知,但是收集所有值没有太大意义。例如,数字信号处理(DSP)程序通常以足够高的速率对视频信号的音频进行采样,以便于更清晰的解析信号,即使丢失了一些数据也可以理解。(每秒显示24帧就足以欺骗我们的大脑来观看动态图像。)

Observable.Interval(TimeSpan.FromSeconds(1))
    .Sample(TimeSpan.FromSeconds(3.5))
    .Take(3)
    .SubscribeConsole("Sample");

Console.ReadLine();

输出结果如下:

Sample – OnNext(2)
Sample – OnNext(5)
Sample – OnNext(9)
Sample – OnCompleted()

间隔时间不必是固定的。下面的Sample重载可以传递一个在间隔结束时推送的Observable来控制每个时间间隔的长短:

IObservable<TSource> Sample<TSource, TSample>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler)

sampler每次推送后,源Observable中最后一个采样间隔内最新元素(如果有的话)将被推送到结果序列。

在本章中介绍的所有操作符(以及在其它章节中介绍的操作符)都可以接收引入并发的IScheduler。但是
对于不引入并发的操作符,不能传递Scheduler。那么,如果想在Observable管道中间更改执行上下文怎么办?可以使用Rx提供的操作符来添加同步。

同步推送

Observer的角度来说,Observable的推送可以在任何线程上发生,因此,Observer的响应也可以发生在任何线程上。在很多情况下,这并不重要,但是在处理某些框架或库时,可能需要在特定的执行上下文(例如,UI线程)上执行某些操作。此外,有时需要同步处理来自不同Observable的不同Observer,即使它们全部执行在同一线程上,或者使用并发原语(例如,互斥量、信号量等)。幸运的是,不需要自己编写这些底层代码;可以使用Rx的同步操作符。

改变观察的执行上下文

如果想要控制执行上下文(Observer对元素的观察),Rx提供了ObserveOn操作符,该操作符可以接收推送将被调度的Scheduler。可以(在某种程度上)指定OnNext/OnError/OnCompleted函数在哪一个线程上运行。下图是ObserveOn操作符的子弹图:

ObserveOn的经典用法是,当Observer需要修改UI控件(例如,改变按钮宽度)时,需要保证Observer运行在UI线程上。UI线程由XAML平台的DispatcherScheduler或WinForms平台的ControlScheduler管理。

下面的示例从TextBox.TextChanged事件创建一个Observable,并且对其节流。通过节流的文本将添加到ListBox中。由于Throttle操作符使用默认的Scheduler(通常是ThreadPool),可以使用ObserveOn操作符来确保ListBox在UI线程上被修改。

Observable.FromEventPattern(TextBox, "TextChanged")
    .Select(_ => TextBox.Text)
    .Throttle(TimeSpan.FromMilliseconds(400))
    .ObserveOn(DispatcherScheduler.Current)
    .Subscribe(t => ThrottledResults.Items.Add(t));

由于对Dispatcher的观察经常发生,所以可以使用ObserveOnDispatcher简写操作符来进行同样的操作。ObserveOn操作符还有一些重载,可以接收要进行观察的SynchronizationContext或者WinForms控件。在后台,ObserveOn操作符在Observable管道中创建一个拦截器,该拦截器拦截Observer上的每个调用,并在指定Scheduler上执行该调用。

改变订阅/取消订阅的执行上下文

除了控制观察的执行上下文,还可以控制订阅和取消订阅的执行上下文,也就是ObservableSubscribe方法和订阅的Dispose方法的线程。

如果Observable的工作必须在特定线程上进行(例如,在Silverlight中,控件事件的注册必须在UI线程上进行,但是通知的处理可以在任何地方进行)

考虑这么一个Observable,其推送值之前会进行大量处理,例如连接到速度较慢的硬件设备,如下所示:

var observable =
    Observable.Create<int>(o =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(5)); // 模拟在订阅时的长时间操作
        o.OnNext(1);
        o.OnCompleted();
        return Disposable.Empty;
    });

observable.SubscribeConsole("LongOperation");

当运行这段代码时,调用线程将阻塞5秒钟,然后才显示消息。添加ObserveOn也不起作用,因为长时间操作是订阅的一部分。需要做的是让订阅本身在另一个线程中运行。

SubscribeOn操作符可以接收Scheduler来调度订阅和取消订阅。该操作符在Observable管道中创建拦截器,该拦截器将拦截Subscribe方法的调用,并使这些调用运行在指定的Scheduler上。然后,拦截器封装由Subscribe方法返回的Disposable对象,以便于Dispose方法也会在指定的Scheduler中运行。下图展示了SubscribeOn操作符:

取消订阅的拦截可能会引起混乱,因为根据使用的Scheduler,在调用Dispose方法时,可能需要等一会儿才会生效。在下面的示例中,创建了一个每秒推送一次的Observable,并使用EventLoopScheduler进行订阅。然后经过长时间的调度工作后取消订阅。
取消订阅需要一段时间才能完成,同时,通知仍在管道内进行处理。

var eventLoopScheduler = new EventLoopScheduler();
var subscription =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Do(x => Console.WriteLine("Inside Do")) // 仅当取消订阅后才停止调用Do
              .SubscribeOn(eventLoopScheduler)         // 设置订阅和取消订阅运行在事件循环上
              .SubscribeConsole();

eventLoopScheduler.Schedule(1,
    (s, state) =>
    {   // 模拟发生在事件循环中的长时间操作
        Console.WriteLine("Before sleep");
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Console.WriteLine("After sleep");
        return Disposable.Empty;
    });

subscription.Dispose();

Console.WriteLine("Subscription disposed");

输出结果如下:

Subscription disposed
Before sleep
Inside Do
Inside Do
After sleep
Inside Do

可以看到调用Dispose是立即执行的;但是,由于真正的取消订阅发生在事件循环中,这需要等待事件循环中长时间操作的完成,所以可以看到Do操作度内的信息。

同时使用SubscribeOnObserveOn

根据订阅的Observable,执行订阅的线程可能和处理通知的线程相同,也可能完全不同。可以结合使用SubscribeOnObserveOn操作符来更好的控制Observable管道中每个步骤在哪个线程中运行。并且了解这些操作符运行顺序以及其在哪里发挥作用很重要。

为解决该问题,创建了这个简单的LogWithThread操作符,用来提供有关订阅和推送运行线程的信息。

public static IObservable<T> LogWithThread<T>(
    this IObservable<T> observable,
    string msg = "")
{
    return Observable.Defer(() =>
    {
        Console.WriteLine("{0} Subscription happened on Thread: {1}", msg,
            Thread.CurrentThread.ManagedThreadId);

        return observable.Do(
            x => Console.WriteLine("{0} - OnNext({1}) Thread: {2}", msg, x,
                Thread.CurrentThread.ManagedThreadId),
            ex =>
            {
                Console.WriteLine("{0} – OnError Thread:{1}", msg,
                    Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine("\t {0}", ex);
            },
            () => Console.WriteLine("{0} - OnCompleted() Thread {1}", msg,
                Thread.CurrentThread.ManagedThreadId));
    });
}

每当订阅和推送通知,LogWithThread操作符会将信息打印到控制台。对于每条日志信息,都包含事件发生的线程。

现在,看看将SubscribeOnObserveOnLogWithThread一起使记录详细信息的情况。在下面的示例中,创建了一个简单的,推送三个通知(每秒推送一个)的Observable,并使用SubscribeOnObserveOn操作符控制执行上下文。示例中创建了一个推送五个数字的Observable,并添加了一些操作符:

new[] {0,1,2,3,4,5}.ToObservable()
    .Take(3).LogWithThread("A")
    .Where(x => x%2 == 0).LogWithThread("B")
    .SubscribeOn(NewThreadScheduler.Default).LogWithThread("C")
    .Select(x => x*x).LogWithThread("D")
    .ObserveOn(TaskPoolScheduler.Default).LogWithThread("E")
    .SubscribeConsole("squares by time");
Console.ReadLine();

输出结果如下:

E Subscription happened on Thread: 1
D Subscription happened on Thread: 1
C Subscription happened on Thread: 1
B Subscription happened on Thread: 3
A Subscription happened on Thread: 3
A – OnNext(0) Thread: 3
B – OnNext(0) Thread: 3
C – OnNext(0) Thread: 3
D – OnNext(0) Thread: 3
E – OnNext(0) Thread: 4
A – OnNext(1) Thread: 3
A – OnNext(2) Thread: 3
squares by time – OnNext(0)
B – OnNext(2) Thread: 3
C – OnNext(2) Thread: 3
D – OnNext(4) Thread: 3
E – OnNext(4) Thread: 4
squares by time – OnNext(4)
A – OnCompleted() Thread 3
B – OnCompleted() Thread 3
C – OnCompleted() Thread 3
D – OnCompleted() Thread 3
E – OnCompleted() Thread 4
squares by time – OnCompleted()

下图展示了输出的子弹图:

下面是输出的关键点:

  • 订阅的顺序自下而上(E先执行,A最后执行)。这是因为最后一个LogWithThread操作符返回的ObservableObserver正在订阅的对象。
  • 订阅在线程1上执行,直到调用SubscribeOn,然后订阅转到线程3(步骤B)。
  • 通知是自上而下(A是第一个,E是最后一个)。
  • 通知在线程3(订阅发生的线程)上推送,直到调用ObserveOn(就在E之前),然后再线程4上推送通知。
  • 在线程4上观察到通知时,线程3可以自由观察下一个通知。这就是为什么可以同时观察到0和推送2(粗现)的原因。

接下来,将要讨论如何在Observable管道中以及Observable之间同步处理通知。

同步通知

假设Observer观察到的通知以串行方式到达。《Rx设计指南》规定,所有Rx操作符应该完全相信其输入是序列化的。它们不会同时接收到通知,而是会一个接一个地接收。如果没有这个假设,那么几乎所有操作符和每个Observer都应该以线程安全的方式编写,并且使用各种锁来保证其操作的有效性。这样就会对性能造成没必要的重大影响。

但是无法控制所有Observable的订阅。有些Observable可能来自第三方,也可能是建立在未以串行方式运行的源上。对于这些Observable,应该在Observable管道内同步它们的推送。

假设从第三方组件的事件创建了一个Observable

class Messenger
{
    public event EventHandler<string> MessageReceived;

    //Rest of the Messenger code
}

下面是如何创建Observable

var messenger = new Messenger();
var messages =
    Observable.FromEventPattern<string>(
        h => messenger.MessageReceived += h,
        h => messenger.MessageReceived -= h);

下面是如何订阅:

messages
    .Select(evt => evt.EventArgs)
    .Subscribe(msg =>
    {
        Console.WriteLine("Message {0} arrived", msg);
        Thread.Sleep(1000);
        Console.WriteLine("Message {0} exit", msg);
    });

运行示例,然后在多个线程接收到三个消息,如下所示:

Message msg2 arrived
Message msg1 arrived
Message msg0 arrived
Message msg1 exit
Message msg0 exit
Message msg2 exit

很明显,消息是以非序列化的方式接收的。要序列化在Observer(或任何操作符)中收到的通知,需要使用Synchronize操作符:

messages
    .Select(evt => evt.EventArgs)
    .Synchronize()  // 序列化通知,以使它们以序列化的方式接收
    .Subscribe(msg =>
    {
        Console.WriteLine("Message {0} arrived", msg);
        Thread.Sleep(1000);
        Console.WriteLine("Message {0} exit", msg);
    });

现在,无论从哪个线程推送,通知都以序列化的方式接收。在内部,Synchronize操作符为每个通知给Observer创建一个锁。锁被用在内部叫做gate的对象上。

同步多个Observable

Synchronize操作符有一个重载可以传递用于锁的gate对象:

IObservable<TSource> Synchronize<TSource>(
    IObservable<TSource> source,
    object gate);

该重载在需要在多个已订阅的Observable之间共享锁时非常有用。假设Messenger类公开了另一个好友请求的事件(FriendRequestReceived)。创建Observable后,需要同步两种类型的通知(好友请求和消息)的处理。需要怎么做:

var gate = new object();

messages
    .Select(evt => evt.EventArgs)
    .Synchronize(gate)
    .Subscribe(msg => { /* processing the text message */ });

friendRequests
    .Select(evt => evt.EventArgs)
    .Synchronize(gate)
    .Subscribe(request => { /* processing the friend request */ });

现在,好友请求和消息会以序列化的方式接收。

本章讨论了Rx世界的许多高级应用。下面总结一下。

总结

在本章中,介绍了Rx对时间和并发的处理方式,以及用于控制Observable管道和执行上下文的技术。

  • 在Rx中,Scheduler是表示时钟和执行上下文的单元。
  • 使用Scheduler,可以在指定时间将工作调度到指定的执行上下文。
  • Rx中的所有Scheduler实现IScheduler接口。
  • 有些Scheduler还实现了ISchedulerPeriodicISchedulerLongRunning接口。
  • 引入并发的Rx操作符可以接收IScheduler类型的参数,用来控制引入并发的方式。
  • Rx有一些开箱即用的SchedulerNewThreadSchedulerThreadPoolSchedulerTaskPoolSchedulerCurrentThreadSchedulerImmediateSchedulerEventLoopScheduler
  • 根据使用的框架,还有绑定到同步上下文的其他Scheduler(例如,ControlSchedulerDispatcherScheduler)。
  • Timestamp操作符为每个推送的通知添加时间戳。
  • TimeInterval操作符为两个相邻的通知添加时间间隔。
  • Timeout操作符在经过设定的时间后没有推送通知将会推送一个错误通知。
  • Delay操作符可以推移通知一段时间。
  • Throttle操作符在Observable经过指定时间不推送通知时推送最后的那个通知。
  • Sample操作符用指定的时间间隔采集Observable序列,推送时间间隔内最后一个通知。
  • ObserveOn操作符强制Observer函数运行在指定Scheduler上。
  • SubscribeOn操作符强制订阅和取消订阅动作运行在指定Scheduler上。
  • Synchronize操作符通过创建锁来使通知以序列化的方式被接收。

本章中的主题有点高级且复杂,但是本系列中的许多操作符都包含这些。使用它们可以更好的控制Observable管道实现目标。下一章将介绍大家都不喜欢但是必须要注意的问题:错误。由于是不可避免的,所以将展示如何为Observable查询添加错误处理和恢复。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注