[Rx.NET] 5 – 控制 Observer 与 Observable 之间的关系

[Rx.NET] 5 – 控制 Observer 与 Observable 之间的关系

假设你是一位拥有美妙声音,完美歌词,最佳表演的歌手,如果没有粉丝听众当然赚不到钱。对于 Observable 来说是一样的:如果没有 Observer 订阅的话,Observable 就什么也不做,仅仅呆在那里浪费资源。歌手和听众之间的关系何时开始何时结束也是值得研究的,尤其是当播放的是音乐专辑,听众可以跳过或结束播放时。

本篇介绍用于创建 Observer 的方法 和 Observer 必须要实现的重要操作。Observer 与 Observable 之间的订阅关系需要你来维护,你可以控制它何时开始,可以通过通知数量或时间长度控制持续时间,或者两者结合,或者其他一些复杂的逻辑。所有这些都会在本篇介绍一系列操作符时讲到。

创建 Observer

Observer 是 Observable 通知的消费者。可能有很多 Observer 订阅到一个 Observable,也可能是一个 Observer 订阅到很多个 Observable。如下图所示:

我们使用 Rx 的目的是简化事件处理,因此在这一部分将展示如何创建 Observer,你可以从中选取适合你的方法。

在这里回顾一下 Observer 在 Observer 与 Observable 之间的通信协议中扮演的角色。

Observable 与 Observer 之前的通信

通信协议如下图:

下面是说明:

1. 通信从 Observer 订阅 Observable 开始
2. Observable 返回了一个实现了IDisposable接口的订阅对象
3. 一旦订阅成功,Observable 就可以推送不同种类的通知。例如:
  • Observable 可以通过调用 Observer 的OnNext方法并将消息作为其参数来推送任意数量的通知
  • 当 Observable 完成时,即没有消息要推送,可以通过OnCompleted方法通知 Observer
  • 当 Observable 产生了一个错误,即也没有消息要推送,可以通过带有Exception对象参数的OnError方法通知 Observer
  • OnCompletedOnError互斥,所以在 Observer 上只能调用其中一个(或者都不调用)
4. 当订阅后,Observer 可以在任何时候取消订阅。Observable 必须保证在取消订阅后不会有任何通知推送到 Observer

在管道内创建 Observer

到目前为止,创建 Observer 最理想的方式是在管道内完成创建,这样将使一切都集中起来以便于阅读和维护。同样这也是创建 Observer 最简单直接的方式,如下图所示:

所用到的仅仅是System命名空间下ObservableExtensions静态类中Subscribe方法的一个重载。

下面是创建并订阅一个 Observer 用来在屏幕上打印结果的例子,这里将 Observer 的所有方法作为Subscribe方法的参数:

Observable.Range(1, 5)
    .Subscribe(
        x => Console.WriteLine("OnNext({0})", x),
        ex => Console.WriteLine("OnError: {0}", ex.Message),
        () => Console.WriteLine("OnCompleted")
    );

上面代码中,使用 lambda 表达式作为Subscribe方法的参数,其作用是当 Observer 收到通知时在屏幕上打印出来。

结果如下:

OnNext(1)
OnNext(2)
OnNext(3)
OnNext(4)
OnNext(5)
OnCompleted

Sunscribe的重载可以接收不同的方法作为参数,并且创建一个使用这些方法作为实现的 Observer。下面是一些重载接口:

IDisposable Subscribe<T>(this IObservable<T> source, 
    Action<T> onNext)

IDisposable Subscribe<T>(this IObservable<T> source,
    Action<T> onNext,
    Action<Exception> onError)

IDisposable Subscribe<T>(this IObservable<T> source,
    Action<T> onNext,
    Action onCompleted)

IDisposable Subscribe<T>(this IObservable<T> source,
    Action<T> onNext,
    Action<Exception> onError,
    Action onCompleted)

从上面的重载可以看到,可以指定 Observer 方法(OnNextOnCompletedOnError)的每一个实现,并且可以仅指定需要的方法(例如,创建一个仅实现OnNext的 Observer)。

使用Subscribe最大的优势是将管道与订阅它的 Observer 连接在了一起。

Subscribe方法简单而强大,尽管使用Subscribe很诱人(仅需要onNext参数),但是这会隐藏很多 BUG。

看下面一个示例,创建了一个管道,对每个数字进行一个简单的数学计算,并故意在两次通知后引发一个异常:

Observable.Range(1, 5)
          .Select(x=> x / (x - 3))
          .Subscribe(x => Console.WriteLine("{0}",x));

就像普通异常一样,当引发异常时会导致程序崩溃。程序崩溃很冷人讨厌,尤其是不知道为什么会崩溃。为避免这种问题,需要实现 Observer 的OnError方法,例如:

Observable.Range(1, 5)
          .Select(x => x / (x - 3))
          .Subscribe(x => Console.WriteLine("{0}", x),
              ex =>{/* do something with the exception */});

现在,异常不会导致程序崩溃了,错误处理代码会捕获异常,并且订阅会被取消。

当然,留空错误处理是可以的,就像一个空的catch块。但是这不是一个好的做法,因为在程序出现 BUG 时会更难定位问题,尤其是在使用了异步的时候。

不实现 OnError 和异步 Observable

在前面的章节,已经了解了如何创建异步 Observable。我们来看一下为上面的例子添加异步操作会发生什么:

Observable.Range(1, 5)
          .Select(x => Task.Run(() => x / (x - 3))) 
          .Concat()
          .Subscribe(x => Console.WriteLine("{0}", x));

Console.WriteLine("Press any key to continue...");
Console.ReadKey();

输出结果如下:

0
-2
Press any key to continue . . .

我们已经知道会有异常发生,但是并没有看到它,甚至都没有注意到它发生了。在这个过程中最令人费解的是,管道突然就停止了输出,这很不友好。

如果创建的 Task 运行时发生了异常(当然不是故意的),并且没有通过 Task 延续或用异常处理await的代码,即使该 Task 已经被系统中止了程序也会继续运行下去。

想要捕获在 Task 中引发的未捕获的异常,可以使用`TaskScheduler.UnobservedTaskException`事件。也可以通过修改 app.config 或 web.config 文件配置来使当该异常发生中止程序(详情请看:http://mng.bz/57Fv )。

强烈建议实现OnError,至少在里面打个日志以便于以后好调试。

用 CancellationToken 代替 IDisposable

Subscribe方法的一个重载接收CancellationToken参数,使用该重载可以用取消机制代替之前的取消订阅机制。例如:

void Subscribe<T>(
    this IObservable<T> source,
    /* OnNext,OnCompleted,OnError 的方法参数 */,
    CancellationToken token)

该重载不返回IDisposable,而是返回void,所以只能用CancellationToken来取消订阅。Rx会监控CancellationToken是否取消了,当取消发生时,它会使用内部的订阅来断开ObserverObservable。如下图所示:

下面的例子,使用CancellationToken在订阅后 5 秒取消订阅:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Subscription canceled"));

Observable.Interval(TimeSpan.FromSeconds(1))
          .Subscribe(x => Console.WriteLine(x), cts.Token);

cts.CancelAfter(TimeSpan.FromSeconds(5));

使用CancellationToken适用于与系统中的其他使用CancellationToken的部分(例如其他 Task)协调同步工作。

创建 Observer 实例

在前面的部分,使用Subscribe在后台创建 Observer 实例,并没有直观感受创建 Observer。

有时,可能会访问 Observer 实例,也许将 Observer 作为参数传递给方法,也许要在方法内部创建一个 Observer 作为返回值。在 Microsoft StreamInsight (高吞吐量,事件处理工具)中,可以将创建的 Observable 部署到远程服务器,然后就可以创建 Observer 作为参数将其传递给能将 Observer 订阅到远程服务器 Observable 的方法。

为创建一个IObserver接口的实现,最直接的方法是创建一个实现IObserver接口的类,并实现其所有方法,但这是一个容易出错和令人生畏的方法(就像前面创建 Observable 时一样)。

除了创建一个新类,更加值得推荐的方式是使用System.Reactive命名空间下的Observer.Create方法,该方法更简单易用,不容易犯错:

IObserver<T> Create<T>(
     Action<T> onNext,
     Action<Exception> onError,
     Action onCompleted)

就像Observable.Subscribe的那些重载,Observer.Create需要OnNextOnErrorOnCompleted(或者其中一些)方法参数,并返回一个实现了IObserver<T>的对象。

下面是一个简单的例子,仅仅实现了订阅两个 Observable 并打印OnNext的通知信息:

var observer = Observer.Create<string>(x => Console.WriteLine(x));

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(x=>"X"+x)
          .Subscribe(observer);

Observable.Interval(TimeSpan.FromSeconds(2))
          .Select(x => "YY" + x)
          .Subscribe(observer);

运行 5 秒的结果如下:

X0
YY0
X1
X2
YY1
X3

上面的例子里有两个 Observable,一个每 1 秒推送一个值(以 X 开头),一个每 2 秒推送一个值(以 YY 开头)。然后用同一个 Observer 订阅这两个 Observable。弹珠图如下:

大多数情况下不需要创建 Observer 的实例,当需要创建时可以使用 Observer.Create方法,这可以使实现更简单易用并避免出错。

当创建 Observer 并订阅到 Observable 后,可能需要在某一时间取消订阅,此外,还有可能希望更好的控制什么时候开始订阅,对此,Rx 提供了控制 Observer 生命周期的工具。

控制订阅的生命周期

使用Subscribe方法将 Observer 订阅到 Observable 十分简单,同样取消订阅也十分简单,只要销毁(dispose)订阅即可。但是,可以调整 Observable 管道,使订阅推送在指定时间(指定条件)开始,在指定时间(指定条件)结束。如下图:

延迟订阅

Subscribe方法会立即将 Observer 订阅到 Observable,并开始推送数据。有时候,可能需要延迟这种订阅,例如,当你计划旅行时,希望订阅航班期间的天气情况而不是订票时的天气情况。

DelaySubscription操作符需要一个TimeSpanDateTimeOffset类型参数来提供订阅时间点,下面是一个例子:

Console.WriteLine("Creating subscription at {0}", DateTime.Now);

Observable.Range(1, 5)
          .Timestamp()
          .DelaySubscription(TimeSpan.FromSeconds(5))
          .SubscribeConsole();

结果如下:

Creating subscription at 06/09/2015 00:00:00
OnNext(1@05/09/2015 00:00:05 +00:00)
OnNext(2@05/09/2015 00:00:05 +00:00)
OnNext(3@05/09/2015 00:00:05 +00:00)
OnNext(4@05/09/2015 00:00:05 +00:00)
OnNext(5@05/09/2015 00:00:05 +00:00)
OnCompleted()

可以看到,尽管创建订阅发生在 00:00:00,但是到 00:00:05 才开始推送,这就意味着订阅发生在 00:00:05。

下面是DelaySubscription的接口,一个用于相对时间,一个用于绝对时间:

IObservable<TSource> DelaySubscription<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime)

IObservable<TSource> DelaySubscription<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset dueTime)

需要注意的是TimeSpan是在订阅后开始计时,不是在调用DelaySubscription后开始计时的,下面的例子可以说明:

Console.WriteLine("Creating the observable pipeline at {0}", DateTime.Now);

 var observable =
    Observable.Range(1, 5)
              .Timestamp()
              .DelaySubscription(TimeSpan.FromSeconds(5));

Thread.Sleep(TimeSpan.FromSeconds(2));

Console.WriteLine("Creating subscription at {0}", DateTime.Now);
observable.SubscribeConsole();

结果如下:

Creating the observable pipeline at 06/09/2015 00:00:10
Creating subscription at 06/09/2015 00:00:12
OnNext(1@05/09/2015 00:00:17 +00:00)
OnNext(2@05/09/2015 00:00:17 +00:00)
OnNext(3@05/09/2015 00:00:17 +00:00)
OnNext(4@05/09/2015 00:00:17 +00:00)
OnNext(5@05/09/2015 00:00:17 +00:00)
OnCompleted()

可以看到在调用Subscribe方法 5 秒后输出结果,而不是在调用DelaySubscription方法后。另一个需要说明的重点是,可以控制何时结束接收通知,下面开始介绍。

定时停止推送

如果想在某一时刻结束订阅,除了使用Time,还有一个更简单的方法,就是使用TakeUntil操作符。

TakeUntil操作符需要一个DateTimeOffset类型的绝对时间参数。当时间一到,Observable 就会通知 Observer 其已经完成。没有相对时间的重载,但是可以使用绝对时间来实现该功能,例子如下:

Observable.Timer(DateTimeOffset.Now,TimeSpan.FromSeconds(1))
          .Select(t => DateTimeOffset.Now)
          .TakeUntil(DateTimeOffset.Now.AddSeconds(5))
          .SubscribeConsole("TakeUntil(time)");

结果如下:

TakeUntil(time) – OnNext(07/09/2015 10:00:10 +03:00)
TakeUntil(time) – OnNext(07/09/2015 10:00:11 +03:00)
TakeUntil(time) – OnNext(07/09/2015 10:00:12 +03:00)
TakeUntil(time) – OnNext(07/09/2015 10:00:13 +03:00)
TakeUntil(time) – OnNext(07/09/2015 10:00:14 +03:00)
TakeUntil(time) – OnCompleted()

操作符接口如下:

IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset endTime)

当其他 Observable 推送时完成

Rx 可以组合 Observable 来创建具有强大功能的管道。TakeUntil的一个重载就具有这种能力,该重载可以使 Observable 在当外部 Observable 推送数据时停止推送数据并完成。如下图所示:

如果想要 Observable 在一段时间(不使用绝对时间)之后停止推送,可以使用下面代码的方法:

Observable.Timer(DateTimeOffset.Now,TimeSpan.FromSeconds(1))
          .Select(t => DateTimeOffset.Now)
          .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5)))
          .SubscribeConsole("TakeUntil(observable)");

结果如下:

TakeUntil(observable) – OnNext(07/09/2015 18:39:18 +03:00)
TakeUntil(observable) – OnNext(07/09/2015 18:39:19 +03:00)
TakeUntil(observable) – OnNext(07/09/2015 18:39:20 +03:00)
TakeUntil(observable) – OnNext(07/09/2015 18:39:21 +03:00)
TakeUntil(observable) – OnNext(07/09/2015 18:39:22 +03:00)
TakeUntil(observable) – OnCompleted()

可以看到在首次推送 5 秒之后就完成了。

接口如下:

IObservable<TSource> TakeUntil<TSource, TOther>(
    this IObservable<TSource> source,
    IObservable<TOther> other)

由于 other 可以使用任何类的 Observable,所以可以用任何一个 Observable 来控制。

当然,可以使用更复杂的场景来确定何时停止推送,例如,可以指定当收到某个控制消息时聊天消息 Observer 就会停止”收听”:

IObservable<string> messages = ...
IObservable<string> controlChannel = ...

messages.TakeUntil(controlChannel.Where(m => m == "STOP"))
        .Subscribe(
            msg => {/* add to message screen */ },
            ex => { /* error handling */},
            () => { /* completion handling */});

将 Observable 作为操作符的参数在 Rx 中很常见,并且是一种灵活的控制机制。

跳过通知

有时想要将 Observer 订阅到 Observable,然后在某一条件发生时才开始接收通知。例如,在聊天消息中,可以指定当某一控制消息发送后才开始接收消息。或者,在搜索时如果事先知道第一批数据无效,就可能希望跳过指定数量的结果。尽管Where操作符可以过滤通知,但是在这里并不适用。在条件满足后就会忽略判定条件,后续通知无论是否满足条件都会继续推送下去。

SkipUntil操作符可以控制何时开始接收通知,如下图所示:

假设只有在发送特定控制消息后才会在屏幕上显示指定用户的消息,就像交警只有在接到上级命令时才允许车辆通行一样。例子如下:

IObservable<string> messages = ...
IObservable<string> controlChannel = ...

messages.SkipUntil(controlChannel.Where(m => m == "START"))
        .Subscribe(
            msg => {/* add to message screen */ },
            ex => { /* error handling */},
            () => { /* completion handling */});

SkipUntil操作符有两个重载,一个接收DateTimeOffset绝对时间,另一个使用外部 Observable 触发。

IObservable<TSource> SkipUntil<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset startTime)

IObservable<TSource> SkipUntil<TSource, TOther>(
    this IObservable<TSource> source,
    IObservable<TOther> other)

如果想要跳过指定数量的通知,就需要用到Skip操作符了。

IObservable<TSource> Skip<TSource>(this IObservable<TSource> source, int count)

下面是跳过前 2 个通知的例子:

Observable.Range(1, 5)
          .Skip(2)
          .SubscribeConsole("Skip(2)");

结果如下:

Skip(2) – OnNext(3)
Skip(2) – OnNext(4)
Skip(2) – OnNext(5)
Skip(2) – OnCompleted()

根据条件接收或停止

如果没有用于开始和结束通知的操作符可用,可以使用可配置的TakeWhileSkipWhile的重载,它们可以接收一个条件来指定什么时候开始或结束。

IObservable<TSource> TakeWhile<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)

IObservable<TSource> TakeWhile<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int, bool> predicate)

IObservable<TSource> SkipWhile<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)

IObservable<TSource> SkipWhile<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int, bool> predicate)

示意图如下:


可以组合使用这两个操作符:

Observable.Range(1, 10)
          .SkipWhile(x => x < 2)
          .TakeWhile(x => x < 7)
          .SubscribeConsole();

结果如下:

OnNext(2)
OnNext(3)
OnNext(4)
OnNext(5)
OnNext(6)
OnCompleted()

重新订阅

当一个 Observable 完成后,Observer 就不会再收到通知了,但是有时希望重新订阅到 Observable。

在 Rx 中想要自动重新订阅,可以使用Repeat操作符。例如,可以创建一个推送 1 至 3 两次的 Observable:

Observable.Range(1, 3)
          .Repeat(2)
          .SubscribeConsole("Repeat(2)");

结果如下:

Repeat(2) – OnNext(1)
Repeat(2) – OnNext(2)
Repeat(2) – OnNext(3)
Repeat(2) – OnNext(1)
Repeat(2) – OnNext(2)
Repeat(2) – OnNext(3)
Repeat(2) – OnCompleted()

Repeat操作符有两个重载,当源 Observable 完成时,就会有一个 Observer 再次订阅它:

IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source)

IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source, int repeatCount)

需要注意的是,Repeat方法根据指定次数重复调用Subscribe方法,如果Subscribe有边际效应(例如连接外部源),这也会重复执行。

在管道上添加边际效应

对边际效应最重要的一点是希望其是可见的和可被发现的。目前为止,唯一可以改变状态或调用操作的地方是 Observer 的方法,这也意味着只有在通知最后被接收时才能对其作出响应。这存在着局限性,如果想要做一些简单的事情,例如在操作符间写日志应该怎么办?

为在管道上实现这种操作,Rx 提供了Do操作符,如下图所示:

下面是使用Do操作符写日志的例子:

Observable.Range(1, 5)
          .Do(x=> { Console.WriteLine("{0} was emitted",x); })
          .Where(x => x % 2 == 0)
          .Do(x => { Console.WriteLine("{0} survived the Where()", x); })
          .Select(x => x * 3)
          .SubscribeConsole("final");

上面的例子创建了一个推送 1 至 5 的 Observable,过滤偶数,并给乘以 3。在每个操作符之间都用Do来打印,结果如下:

1 was emitted
2 was emitted
2 survived the Where()
final – OnNext(6)
3 was emitted
4 was emitted
4 survived the Where()
final – OnNext(12)
5 was emitted
final – OnCompleted()

就想SubscribeDo操作符也有一些重载:

IObservable<TSource> Do<TSource>(
    this IObservable<TSource> source,
    Action<TSource> onNext)

IObservable<TSource> Do<TSource>(
    this IObservable<TSource> source,
    Action<TSource> onNext,
    Action onCompleted)

IObservable<TSource> Do<TSource>(
    this IObservable<TSource> source,
    Action<TSource> onNext,
    Action<Exception> onError)

IObservable<TSource>Do<TSource>(
    this IObservable<TSource>source,
    Action<TSource> onNext,
    Action<Exception> onError,
    Action onCompleted)

IObservable<TSource> Do<TSource>(
    this IObservable<TSource> source,
    IObserver<TSource> observer)

更进一步,我们可以扩展一个Log操作符用于打印通知信息:

public static IObservable<T> Log<T>(this IObservable<T> observable, string msg="")
{
    return observable.Do(
        x => Console.WriteLine("{0} - OnNext({1})", msg, x),
        ex =>
        {
            Console.WriteLine("{0} - OnError:", msg);
            Console.WriteLine("\t {0}", ex);
        },
        () => Console.WriteLine("{0} - OnCompleted()", msg));
}

用在上面的例子中如下:

Observable.Range(1, 5)
          .Log("range")
          .Where(x => x%2 == 0)
          .Log("where")
          .Select(x => x*3)
          .SubscribeConsole("final");

结果如下:

range – OnNext(1)
range – OnNext(2)
where – OnNext(2)
final – OnNext(6)
range – OnNext(3)
range – OnNext(4)
where – OnNext(4)
final – OnNext(12)
range – OnNext(5)
range – OnCompleted()
where – OnCompleted()
final – OnCompleted()

组合使用

已经介绍过的所有操作符都可以用来简化复杂事件,尤其是在组合使用时。你是否曾想过可以把鼠标看做 Observable?它是一个能够推送按钮状态和鼠标位置的 Observable。

可以利用此功能创建一个响应式绘图应用程序。程序通过添加鼠标在窗口中的位置来划线,并且在鼠标按钮按下时开始,在鼠标按钮松开时结束。如下图所示:

下面是使用 WPF 来实现的例子。

首先,需要将MouseDownMouseUpMouseMove事件转换为 Observable:

var mouseDowns =
    Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown");

var mouseUp =
    Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp");

var movements =
    Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");

现在,对于每一次移动,需要将鼠标位置添加到绘制折线中。

movements
    .Select(m => m.EventArgs.GetPosition(this))
    .Subscribe(pos => line.Points.Add(pos));

现在创建了一个连续绘制永不停止的程序。所以现在需要添加当鼠标按钮释放后停止的条件,可以使用TakeUntil:

movements
    .Select(m => m.EventArgs.GetPosition(this))
    .TakeUntil(mouseUp)
    .Subscribe(pos => line.Points.Add(pos));

这里使用了mouseUp作为参数,当鼠标按键释放时就会使订阅完成。

还需要添加当鼠标按下才开始绘制的操作,可以使用SkipUntil

movements
    .SkipUntil(mouseDowns)
    .Select(m => m.EventArgs.GetPosition(this))
    .TakeUntil(mouseUp)
    .Subscribe(pos => line.Points.Add(pos));

当鼠标按钮按下时,鼠标的移动就会在窗口绘制线,当鼠标按钮释放时绘制停止。

该过程仅仅发生一次。对于该程序,可能希望重复绘制线条。这是就需要使用Repeat

movements
    .SkipUntil(mouseDowns)
    .Select(m => m.EventArgs.GetPosition(this)) 
    .TakeUntil(mouseUp)
    .Repeat()
    .Subscribe(pos => line.Points.Add(pos));

现在就完成了一个绘制线的程序,但是该绘制总是将点添加到同一个折线上,只绘制一条线的程序没有什么用处。

需要修改的地方是当鼠标按下的时候新建一个折线,并将后面的点添加到新折线上。这是就要用到边际效应了,可以使用Do操作符:

movements
    .SkipUntil(mouseDowns.Do(_ =>
    {
        line = new Polyline() {Stroke = Brushes.Black, StrokeThickness = 3};
            canvas.Children.Add(line);
    }))
    .TakeUntil(mouseUp)
    .Select(m => m.EventArgs.GetPosition(this))
    .Repeat()
    .Subscribe(pos => line.Points.Add(pos));

该示例展示了在 Rx 中组合使用操作的优点。仅需要几行代码就可以构建一个绘图程序,而且可以轻松修改构建的管道。

总结

  • 可以使用Subscribe的扩展方法,传入OnNextOnCompletedOnError方法来创建 Observer 并订阅到 Observable
  • 如果要多次使用 Observer,可以使用Observer.Create来创建并手动订阅
  • 可以使用CancellationToken代替销毁订阅来取消订阅,只需要将其传给Subscribe
  • 可以使用DelaySubscription来延迟订阅,TimeSpan用于相对时间,DateTimeOffset用于绝对时间
  • TakeUntil操作符通过指定时间或其他 Observable 来控制何时停止推送
  • SkipUntil操作符通过指定时间或其他 Observable 来控制何时开始推送
  • Skip操作符可以跳过指定数量的通知,Take操作符可以在接收到指定数量的通知后停止推送
  • SkipWhileTakeWhile可以通过条件来控制开始和停止推送
  • Repeat可以重复订阅一个 Observable 指定次数或无限次
  • 可以用Do来实现边际效应

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注