[Rx.NET] 4 – 从异步类型创建 Observable

[Rx.NET] 4 – 从异步类型创建 Observable

相信大多数人都不喜欢排队,特别是排长队,总感觉这是在浪费时间,我们完全可以做些其他事情(例如读书)。我比较喜欢那些留下排队号然后可以去做些其他事情(例如购物)的餐馆,当有位置可用或快轮到你的时候能通过短信或蜂鸣器通知你,这种服务就很人性化。

同样,我们的代码有时也会排队来等待某些事情,即我们常叫的同步方式;同样,我们的代码也可以在任务完成并能获取结果时被通知,即我们通常叫的异步方式。编写异步代码对程序的响应(并及时做出反应)至关重要,这也是响应的关键特征。本篇将介绍 .NET 的异步执行模式及其与 Observable 的关系,还会介绍从异步类型创建 Observable 的方法及执行此操作可能遇到的问题。

使用 Rx 桥接 .NET 异步类型

Rx 可以很好的处理异步类型。Observable 和 Observer 接口允许生产者(Observable)以同步或异步的形式运行在任何地方,而消费者(Observer)可以接收和处理它的消息通知。这样做有利于测试和更加灵活,因为可以轻松创建一个虚拟的 Observable 来模拟应用场景,并且很容易在生产者方做出调整而不影响消费者。这种关系如下图所示:

从上图可以看出,订阅到 Observable 的 Observer 不知道通知从哪里推送并且不知道 Observable 是否用是异步方式来推送的。后面我们会讲到有时在指定线程里处理这些通知是必须的,例如,在 WPF 和 WinForm 程序中,只能在 UI 线程中修改 UI 控件,所以对于那些需要修改 UI 控件的 Observer 来说在 UI 线程里处理通知是必须的。

Rx 提供了用于控制 Observable 管道执行的操作符,这些会在后面的篇幅中进行介绍。下面我们介绍将同步代码通过 Rx 转换为异步代码的例子。

将同步方法转换为异步方法

假设有一个神奇的素数发生器。该发生器能够生成任意多的素数,但是生成素数会花费很长时间。首先,尝试创建一个同步版本,MagicalPrimeNumbersGenerator,代码如下:

class MagicalPrimeGenerator
{
    public IEnumerable<int> Generate(int amount){. . .}
}

Generate 方法需要一个 int 类型参数来指定想要生成的素数个数,返回值为 Enumerable 的素数结果。

现在写一段程序打印结果:

var generator = new MagicalPrimeGenerator();

foreach (var prime in generator.Generate(5))
{
    Console.Write("{0}, ", prime);
}

由于素数生成器生成每个素数都会花费时间(例如 2 秒),所以主线程会阻塞 5 * 2 = 10 秒时间。即使用yield来修饰每个返回值,也会在每个素数间阻塞 2 秒。

另一种方案是将素数的生成整个用异步来实现,接口如下:

Task<IReadOnlyCollection<int>> GenerateAsync(int amount);

现在方法返回值为Task<IReadOnlyCollection<int>>,这意味着该方法会在返回前生成所有所需的素数。

这种改变不会阻塞调用线程,但是站在用户的角度来说会变得更加糟糕,因为在开始处理结果前需要等待很长时间来生成所有素数。例如,前面打印 5 个素数的程序将会在打印第一个数之前等待 10 秒钟。不管怎么说,我们已经实现了程序的首个版本。

在这里使用迭代显然不适合,所以接下来将用推模式实现。

创建素数 Observable

如果即使用异步方式的拉模式都不适合,那么就需要使用推模式了。这样,就不必等待所有计算完成才能查看进度了。既然不能创建 Enumerable 的素数集合,那么就应该创建一个每生成一个素数就推送的 Observable。接口如下:

public IObservable<int> GeneratePrimes(int amount)

该方法仍然接收一个指定素数数量的参数,但是返回结果为IObservable<int>类型。

那么下面就是第一次尝试实现:

public IObservable<int> GeneratePrimes(int amount)
{
    return Observable.Create<int>(o =>
    {
        foreach (var prime in Generate(amount))
        {
            o.OnNext(prime);
        }

        o.OnCompleted();

        return Disposable.Empty;
    });
}

在实现 Observable 时使用了Create操作符。由于Create内的代码仍然是同步的,所以 Observer 订阅时会收到所有素数,并且Subscribe方法在生成完成前不会返回。可以通过下面的代码来查看,这里通过Timestamp操作符来显示每个通知的时间戳。

var generator = new MagicalPrimeGenerator();

var subscription = generator
    .GeneratePrimes(5)
    .Timestamp()
    .SubscribeConsole("primes observable");

Console.WriteLine("Generation is done");
Console.ReadLine();

结果如下:

primes observable – OnNext(2@01/08/2015 12:50:02 +00:00) primes observable – OnNext(3@01/08/2015 12:50:04 +00:00) primes observable – OnNext(5@01/08/2015 12:50:06 +00:00) primes observable – OnNext(7@01/08/2015 12:50:08 +00:00) primes observable – OnNext(11@01/08/2015 12:50:10 +00:00) primes observable – OnCompleted()
Generation is done

Timestamp操作符用System.Reactive.Timestamped<TSource>封装了 Observable 推送的通知,该类型有两个属性,一个是保存了通知内容的Value,另一个是保存了通知生成时间的Timestamp。并且该类重写了ToString方法来返回 Value@Timestamp 格式的字符串。

可以看到 Generation is done 消息在生成所有素数后打印,同时可以看到两个素数间的间隔有 2 秒。让我们修复 Observable 来使其不阻塞,为此需要将内部生成代码运行在 Task 中。

由于 Observable 的代码将会并发运行,那么 Observer 取消订阅的能力就十分重要了。之前,当 Observer 订阅 Observable 时推送是立即开始的(即同步),所以 Observer 没有机会取消订阅。现在 Observer 可以在任何时候取消订阅,为此,需要将每次迭代都进行检查是否取消的CancellationToken附加到返回的Disposable上,代码如下:

public IObservable<int> GeneratePrimes(int amount)
{
    var cts = new CancellationTokenSource();

    return Observable.Create<int>(o =>
    {
        Task.Run(() =>
        {
            foreach (var prime in Generate(amount))
            {
                cts.Token.ThrowIfCancellationRequested();
                o.OnNext(prime);
            } 
        }, cts.Token);

        return new CancellationDisposable(cts);
    });
}

GeneratePrimes版本中,添加了 Task。现在代码在后台运行,并且可以取消订阅。为此,新建了CancellationTokenSource实例,并且用CancellationDisposable封装后返回,当用户释放时,CancellationTokenSource会同时被取消,由于在每次迭代都检查是否取消,所以会停止循环。

这种实现模式具有一定的通用性,所以 Rx 提供了一个Observable.Create的重载来简化该操作,如下所示:

IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> 
  subscribeAsync);

Create的异步版本有一个Func类型的对于每次订阅都执行一遍的方法参数,即subscribeAsync。该方法有两个参数:订阅的 Observer 和用于取消订阅的CancellationToken。其返回值为包含异步代码的 Task。

下面是使用该方法简化GeneratePrimes的代码:

public IObservable<int> GeneratePrimes (int amount)
{
    return Observable.Create<int>((o, ct) =>
    {
        return Task.Run(() =>
        {
            foreach (var prime in Generate(amount))
            {
                ct.ThrowIfCancellationRequested();
                o.OnNext(prime);
            }

            o.OnCompleted();
        });
    });
}

上面的代码和之前的很相似,只是这次不用创建CancellationTokenDisposable返回值。

复习一下前面的知识:
1. Create操作符有一个异步方法参数subscribeAsync
2. subscribeAsync方法在每次订阅时都会被执行,并且该方法必须返回一个表示异步操作的 Task
3. subscribeAsync方法有一个用来表示取消订阅的CancellationToken参数
4. subscribeAsync函数内部是用来推送消息的代码

接下来将通过组合使用 Observable 和async-await来展示 Rx 的强大。

在创建 Observable 时使用 async-await

Create的异步版本可以在subscribeAsync内部代码内使用 async-await。通过组合使用 Observable 和async-await,才能体会到 Rx 真正的价值。可以在获取到有意义的结果时就推送信息,而不是要等到两个或多个独立的异步操作完成后将它们的结果组合为一个整体来使用。

下图展示了 Observable 从两个搜索引擎用 async 调用获取结果并推送的例子:

代码如下:

IObservable<string> Search(string term)
{
    return Observable.Create<string>(async o => 
    {
        var searchEngineA = ...
        var searchEngineB = ...

        var resultsA = await searchEngineA.SearchAsync(term);
        foreach (var result in resultsA)
        {
            o.OnNext(result);
        }

        var resultsB = await searchEngineB.SearchAsync(term);
        foreach (var result in resultsB)
        {
            o.OnNext(result);
        }

        o.OnCompleted();
    });
}

上面的代码,Search方法用Observable.Create的异步版本创建了 Observable。用 lambda 表达式来写的subscribeAsync使用了 async-await并且返回一个 Task。代码很简单,依次调用两个搜索引擎,然后将结果推送出去。

将 Task 转换为 Observable

前面使用Observable.Create异步订阅方法的的例子有待改进。不难发现例子中存在着重复代码,对于每个搜索引擎,都调用了查询方法然后遍历结果。

为改进代码,可以使用 Task 与 Observable 之间转换的优势;可以将任务看做只有一个数据项的 Observable(不返回时没有数据项)。为此,只需要使用 Task 上的扩展方法ToObservable

每个搜索引擎的搜索方法返回值都为Task<IEnumerable <string>>。所以将其转换为 Observable 后结果为IObservable<IEnumerable <string>>,仍然不是最后需要的IObservable<string>类型,因此我们需要多做一步。

下面是转换的代码:

IObservable<string> Search (string term)
{
    var searchEngineA = new SearchEngineA();
    var searchEngineB = new SearchEngineB();

    IObservable<IEnumerable<string>> resultsA =
        searchEngineA.SearchAsync(term).ToObservable();

    IObservable<IEnumerable<string>> resultsB =
        searchEngineB.SearchAsync(term).ToObservable();

    return resultsA
        .Concat(resultsB)
        .SelectMany(results => results);
}

下面解释一下该方法,该方法首先调用了搜索方法并将其转换为 Observable,这里比较简单。

现在已经有了两个 Observable,那么就可以用它们做很多事情。由于想要保持顺序执行,即当推送完第一个搜索引擎的结果后再推送第二个搜索引擎的结果,所以需要使用Concat操作符:

IObservable<TSource> Concat<TSource>(
    this IObservable<TSource> first,
    IObservable<TSource> second)

Concat接收两个或更多(根据重载的不同) Observable 然后返回一个 Observable,该 Observable 将输入 Observable 串联起来,根据输入顺序推送它们的结果。推送完第一个 Observable 的全部结果,然后推送第二个 Observable 的全部结果,以此类推。如下图所示:

串联 Observable 后,得到了IObservable<IEnumerable <string>>类型的结果,这里需要展开这些 Enumerable 类型的结果,并且依次推送它们的数据项,最终得到IObservable<string>类型的 Observable。这时就需要用到SelectMany操作符。用到的接口如下:

IObservable<TResult> SelectMany<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IEnumerable<TResult>> selector)

SelectMany将 Observable 的每个元素映射为可枚举序列,然后将可枚举序列连接为一个 Observable。SelectMany需要一个Func<TSource, IEnumerable<TResult>>类型的选择器,该选择器被 Observable 的每一项调用,然后为该项返回一个集合,集合内的数据项最终会在结果 Observable 中被推送。如下图所示:

返回我们的例子,串联了从搜索引擎创建的 Observable,并且每个 Observable 都携带了一个搜索结果的集合。当使用SelectMany(results => results)后,其返回 Observable 的就变成了推送搜索结果的 Observable。

SelectMany除了可以展开 Observable 内的集合。在将异步代码作为 Observable 管道的一部分来运行时也有很重要的作用。

将异步代码作为管道的一部分来运行

运行异步代码不仅对 Observable 有利,并且对组成管道的内部操作符的运行也很有帮助。如下图所示:

这样,就不用在生成推送数据时阻塞管道了,并且能使 Observable 在推送数据的同时在后台继续运行。

正如上图所示,数据项从管道的Operator 1开始,然后经过运行异步操作的Operator 2,最后由Operator 3来处理那些异步处理结束后的结果。

下面的例子假设想要用Where操作符来通过异步服务过滤并获取那些是素数的结果:

var svc = new PrimeCheckService();
var subscription = Observable.Range(1, 10)
    .Where(async x => await svc.IsPrimeAsync(x)) //这里不会编译通过
    .SubscribeConsole("AsyncWhere");

Where操作符从参数方法中获取一个 Boolean 值用于判断该项是否能够通过并推送到后面的 Observable。但是IsPrimeAsync方法返回一个Task<bool>类型值,所以需要等待方法完成。不幸的是,Where操作符(及其他大多数操作符)都不支持 Task,这就是为什么代码不能通过编译的原因。不要失望,我们可以解决这个问题。

你应该还记得,Task 可以转换为 IObservable,所以 Task 能够转换为 IObservable,这就能使Where操作符正常工作了。

下面就是这个神奇的解决方式:
1. 每个数字都需要运行IsPrimeAsync方法
2. 将返回的 Task 结果转换为 Observable
3. 合并创建的 Observable,同时保存每个 Observable 的源(被检查的数字)
4. 仅仅允许推送 true(即数字为素数) 结果的 Observable 到后面的管道

听起来有点复杂,但是使用SelectMany会简化该过程。在前面的例子中,SelectMany操作符通过selector选择器函数展开集合。下图展示了SelectMany的另一个重载,虽然乍一看有点复杂,但是在这里很有用。

SelectMany操作符重载与之前的功能相同,但是在这里不是展开 Observable 的集合,而是将自己映射到 Task,使用源数据和 Task 的返回结果调用结果选择器,并将结果合并到 Observable。

IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TTaskResult>> taskSelector,
    Func<TSource, TTaskResult, TResult> resultSelector)

使用该重载,之前的例子可以像下面这样写:

subscription =
    Observable.Range(1, 10)
        .SelectMany((number) => svc.IsPrimeAsync(number),
                    (number, isPrime) => new {number, isPrime})
        .Where(x => x.isPrime)
        .Select(x => x.number)
        .SubscribeConsole("primes")

结果如下:

primes – OnNext(1)
primes – OnNext(2)
primes – OnNext(3)
primes – OnNext(5)
primes – OnNext(7)
primes – OnCompleted()

这个优雅的实现方式需要花点时间来理解。让我们一步步的回顾一下。

在例程中,创建了一个推送 1 至 10 的简单 Observable。SelectMany接收到的每项数据都调用异步方法IsPrimeAsync。该方法返回一个跟其他查询无关的 Task,但是我们需要的是 Task 的返回值,所以SelectMany在后台等待 Task 的结果。当结果准备好后,SelectMany的转换函数就会被调用。在这里,转换函数接收用于检查的数字和检查结果这两个参数,然后在函数内将这两个值组合在一个对象里。该对象会在返回的 Observable 里推送,然后由WhereSelect操作符接收。

下图为整个过程:

关于上图有一点需要指出,我们将每个IsPrimeAsync方法返回的 Task 描述为了一个 Observable,该 Observable 仅推送一个数据项,然后就完成了,不再推送任何数据。

坦诚的说:不是每个人都喜欢这样的写法。幸运的是该过程可以用查询表达式来写,如下:

IObservable<int> primes =
    from number in Observable.Range(1, 10)
    from isPrime in svc.IsPrimeAsync(number)
    where isPrime
    select number;

primes.SubscribeConsole("primes");

该查询与之前的相同,并会转换为相同的方法链。理解内部原理十分重要,这会使你更好的控制它们,下面我们介绍更复杂的方法。此外,并非所有东西都可以转换为查询表达式,所以了解如何使用SelectMany操作符十分有用。

现在我们开始讨论另一点,当IsPrimeAsync方法对于检查不同的数字花费不同的时间时,你能够预测打印的结果吗?例如,如果检查 3 的时间比检查 4 的时间要长,你能预测结果吗?

控制结果顺序

SelectMany按照数据推送的顺序映射到 Task,但是在返回的 Observable 中数据是按照 Task 完成顺序推送的,这就造成了结果顺序与源数据顺序的不一致。

例如,运行前面的素数检查时检查数字 3 的时间比其他数字长几秒钟。

//配置服务,当检查数字 3 时延迟返回
var svc = new VariableTimePrimeCheckService(numberToDelay: 3);

IObservable<int> primes =
    from number in Observable.Range(1, 10)
    from isPrime in svc.IsPrimeAsync(number)
    where isPrime
    select number;

primes.SubscribeConsole("primes - unordered");

输出结果如下:

primes – unordered – OnNext(1)
primes – unordered – OnNext(2)
primes – unordered – OnNext(7)
primes – unordered – OnNext(5)
primes – unordered – OnNext(3)
primes – unordered – OnCompleted()

数字 3 在最后输出。

可能不总是希望结果的顺序与源数据的顺序相同,但是当有这种需求时,SelectMany操作符没有提供该功能。反而,之前使用过的Concat的高级用法可以实现。

Concat有一个重载作用于 Task 类型的 Observable,并且按照 Task 在 Observable 中的顺序推送其结果。第一个 Task 的结果先推送,然后是第二个 Task 的结果,以此类推,无论第一个 Task 在第二个 Task 完成之后多长时间完成都不会改变顺序。这正是我们需要的,接口如下:

static IObservable<TSource> Concat<TSource>(this IObservable<Task<TSource>> sources)

接下来就是创建用于填充Concat操作符参数的 Task 类型的 Observable。Select操作符就能胜任这个工作,它能将数据项映射为其他的形式,现在可以用该操作符返回IsPrimeAsync(number)的结果:

IObservable<Task<bool>> observable =
    Observable.Range(1, 10)
              .Select(number => svc.IsPrimeAsync(number));

但是有一个问题,Observable 的类型为IObservable<Task<bool>>,也就是丢失了原始数据。

为保留原始数据,将 Task 和原始数据封装为一个新的 Task 来推送。最终代码如下:

IObservable<int> primes =
    Observable.Range(1, 10)
             .Select(async (number) => new 
             { 
                 number, 
                 IsPrime = await svc.IsPrimeAsync(number) 
             })
             .Concat()
             .Where(x => x.IsPrime)
             .Select(x => x.number);

结果如下:

primes – OnNext(1)
primes – OnNext(2)
primes – OnNext(3)
primes – OnNext(5)
primes – OnNext(7)
primes – OnCompleted()

lambda 表达式使用了 async-await 模式,在内部,创建了一个有两个属性(原始数字和用IsPrime异步检查的结果)的匿名类型。所以,lambda 表达式的返回结果为内部创建的 Task 类型的匿名类型。

现在Concat操作符作用于推送新匿名类型的 Observable 上。在 Visual Studio IntelliSense 中新匿名类型为'a,如下图所示:

可以看到Concat操作符作用在IObservable<Task<'a>>上,而返回的结果是IObservable<'a>。看起来已经解决了问题,但是如果有一个任务永远不会完成会怎么样?该系统会发生什么?

在内部,Concat运算符必须保存那些 Task 已经完成但是还没有轮到其推送的结果。例如,源 Observable 推送了 5 个 Task,并且后面 4 个先完成了,但是第一个 Task 需要花费 1 个小时才能完成,那么后面的 4 个结果就需要保存在内存里直到第一个 Task 完成。

如果其中一个 Task 永远不会完成(例如陷入死循环或死锁),Concat就会造成内存泄漏。所以在使用异步执行时最好不要依赖于结果顺序。

现在,已经可在 Observable 管道内添加异步执行了。当不需要按顺序获取结果时,可以使用SelectMany;当需要按顺序获取结果时,可以使用Concat

创建周期性 Observable

初始使用 Rx 来开发时常见的一个需求是创建一个周期性(例如每 2 秒)推送结果的 Observable。在命令式编程中,可以使用 Timer。

Rx 提供了两个操作符用来创建周期性和定时性 Observable。

根据时间间隔推送

Interval操作符用于创建按照时间间隔定时产生值的 Observable:

static IObservable<long> Interval(TimeSpan period)

Interval操作符创建一个定时推送下一个 long 类型数据(从 0 开始)的 Observable,时间间隔由period参数设定。下图是间隔 1 秒的弹珠图:

间隔 1 秒意味着第一个值在 1 秒后推送,然后第二个值在推送第一个值后 1 秒推送,以此类推。

下面代码展示了怎么使用Interval操作符创建一个每分钟从网络服务上获取信息并显示在 ListBox 中的 WPF 窗口。

public partial class MessagesWindow : Window
{
    private IDisposable _subscription;

    public MessagesWindow()
    {
        InitializeComponent();

        var updatesWebService = new UpdatesWebService();

        _subscription = Observable
            .Interval(TimeSpan.FromMinutes(1))
            .SelectMany(_ => updatesWebService.GetUpdatesAsync())
            .SelectMany(updates => updates)
            .ObserveOnDispatcher()
            .Subscribe(/*an observer the update the ListBox*/);
    }
}

正如上面的代码,在 Observable 上定期调用Web服务十分简单优雅。这里忽略ObserveOnDispatcher操作符,后面会进行讲解。至此,我们都有意回避了一个重点,Interval产生的值来自哪里,来自哪个线程?在后面的篇幅我们会讲解 Rx 的并发模式,同时会讲解Interval和其他与时间有关的操作符之间的联系。现在,只要知道默认情况下Interval运行在与 Observer 订阅时所在线程不同的线程中就可以了。在 WPF 和其他 GUI 框架中,修改 UI 控件的代码只能运行在 UI 线程里,ObserveOnDispatcher操作符可以保证 Observer 的代码运行在 UI 线程(通过 WPF 的 Dispatcher)。

重要的一点是使用Interval操作符时两次推送间的间隔是相同的,包括第一次推送。Timer操作符具有更大的灵活性。

创建 Observable 定时器

有时可能想要创建一个定期推送值的 Observable,但是对于第一次推送值的时间可能需求不同,也许想要立即推送第一个值,也许想要延迟一段时间再推送。这时可以使用Timer操作符:

static IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)

下图展示了一个例子,该例子在订阅后 2 秒推送第一个值,然后每隔 1 秒推送下一个值:

可以看出Interval就是dueTimeperiod相同的特殊Timer

Timer还有可以用绝对或相对时间来计划什么时候推送数据的重载。

用 Timer 定时推送

Timer还可以用来定时推送单个值(long 类型的 0)。

static IObservable<long> Timer(TimeSpan dueTime)
static IObservable<long> Timer(DateTimeOffset dueTime)

正如上面的接口,Timer有两个重载方法来接收定时时间用于推送单值。这两个重载的区别是由提供的时间类型来区分的:

  • 相对时间 — 由TimeSpan定义,表示从订阅时刻开始的时间。例如,5 秒钟
Observable.Timer(TimeSpan.FromSeconds(5))
  • 绝对时间 — 由DateTimeOffset定义,表示具体时间,与订阅时间无关。例如,7月4日或今天零点
Observable.Timer(DateTimeOffset.Parse("00:00:00"))

将预先配置指定时间推送的 Observable 与其他 Observable 结合使用很有用。下面是使用Switch操作符在 5 秒后切换 Observable 的例子:

IObservable<string> firstObservable =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Select(x => "value" + x);
IObservable<string> secondObservable =
    Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(x => "second" + x)
        .Take(5);

Observable<IObservable<string>> immediateObservable = Observable.Return(firstObservable);

//Scheduling the second observable emission
IObservable<IObservable<string>> scheduledObservable =
    Observable
        .Timer(TimeSpan.FromSeconds(5))
        .Select(x => secondObservable);

immediateObservable
    .Merge(scheduledObservable)
    .Switch()
    .Timestamp()
    .SubscribeConsole("timer switch");

结果如下:

timer switch – OnNext(first0@10/08/2015 20:30:52 +00:00)
timer switch – OnNext(first1@10/08/2015 20:30:53 +00:00)
timer switch – OnNext(first2@10/08/2015 20:30:54 +00:00)
timer switch – OnNext(first3@10/08/2015 20:30:55 +00:00)
timer switch – OnNext(first4@10/08/2015 20:30:56 +00:00)
timer switch – OnNext(second0@10/08/2015 20:30:58 +00:00)
timer switch – OnNext(second1@10/08/2015 20:31:00 +00:00)
timer switch – OnNext(second2@10/08/2015 20:31:02 +00:00)
timer switch – OnNext(second3@10/08/2015 20:31:04 +00:00)
timer switch – OnNext(second4@10/08/2015 20:31:06 +00:00)
timer switch – OnCompleted()

可以看到 5 秒钟之后,第二个 Observable 开始推送值(每 2 秒),因此从第一个 Observable 切换到了第二个 Observable。Switch将在后面的篇幅讲解。

想要了解 Rx 中更多有关异步代码执行和处理的信息:如何在管道中间切换到另一个线程并在稍后切换回原线程(如在 UI 中),并且这些转换是如何影响管道的构建和结果的获取的?这些话题将在后面的篇幅中进行介绍,但是现在你至少可以轻松创建 Observable 并在程序中使用异步了。

总结

  • Rx 的 Observable 对推送数据的源的线程进行了抽象,所以 Observer 不需要关心通知的来源
  • 可以通过Observable.CreateObservable.Defer的重载方法创建执行异步操作的 Observable。这些重载方法可以接收使用 async-await 方式的异步方法。
  • Rx 可以提供一个CancellationToken用来取消任务或取消订阅,该CancellationToken可以附加到订阅时返回的Disposable
  • 可以用ToObservable转换像 Task 这样的异步类型为 Observable
  • 要将异步代码作为管道中操作符的一部分运行,可以使用能够等待异步代码(可以表示为 Task 或其他 Observable)的SelectMany操作符,然后继续后面的管道或推送。
  • Concat操作符可以使异步代码执行后的结果顺序与源数据推送时的顺序相同
  • 可以使用Inteval创建定时推送数据的 Observable,如果想要控制第一次推送的时间可以使用Timer

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注