[Rx.NET] 10 – 错误处理和恢复

[Rx.NET] 10 – 错误处理和恢复

发生错误;是编程生活的日常。为了向程序的用户提供高质量的服务,必须确保代码能够处理错误,并在发生错误时能够正常恢复。否则,用户会遇到程序崩溃或错误行为(例如,错误的计算或意外的警告),这些错误最终可能会导致用户放弃产品。如果发生错误,可能需要忽略并继续,或者为特定的错误添加特定的处理。如果Observable定期从中央服务器推送更新,并且其中一个更新引发了意外错误(例如,网络断开连接),那么通过重新订阅以获取下一组更新来处理错误可能是最好的解决方案。本章将介绍那些用来保护Observable管道的各种错误处理操作符。

除了处理错误外,还可以提前预防某些错误,例如可能导致内存泄漏的不正确的资源处理以及未关闭的服务器连接。以比Observer可以处理的速率更高的速率推送叫做背压,这会导致错误和消耗大量资源。本章将介绍如何正确控制资源的生命周期,即使出现意外错误,也能处理背压问题。

应对错误

在.NET的世界,错误就是一个异常,并且很多原因都能够引发异常。有些(例如,OutOfMemoryException)甚至不受控制。区分Observable管道内可能引发异常的不同位置(或阶段)非常重要,因为对于不同位置,需要不同的方式来处理。

在响应式管道中,下面四个位置可能会引发错误:

  • 在订阅时的ObservableSubscribe方法中
  • 订阅后,在Observable准备推送值的代码中(例如,Observable试图从一个断开的外部源拉数据)
  • 在操作符代码中(例如,为Select操作符提供的选择器函数引发的异常)
  • ObserverOnNextOnCompletedOnError函数中

对于前面三种情况,Rx指南规定应该通过OnError函数将错误通知给Observer,并且取消订阅,也就是说Observer将不再接收到通知。

在最后一种情况下,Observer应该负责错误处理,这是Observer(和开发者)应对错误的责任。Rx无法保证在这种情况下会发生什么。

提示

再次强调最后一点,如果Observer函数中的代码引发了异常,那么Px包中没有任何东西可以拯救。因此,如果没有使用try-catch处理“风险”代码,那么调用线程将会引发未处理的异常。这将导致进程终止。与程序中任何其他引发异常的代码一样。唯一的选择是确保代码不会引发不必要的异常。

Observable侧的错误

既然已经了解了必须要明确处理Observer代码中引发的异常,那么接下来就看看上面列表中的其它三种情况,即当异常是由Observable或管道的操作符引发时的情况。

《Rx设计指南》确保将这些地方的错误传递给ObserverOnError函数。这使处理错误变得更简单,因为可以在一个地方对错误进行响应—OnError函数,如下图所示,OnError函数接收一个参数(引发的Exception),因此,可以对其进行检查然后做出响应。

下面的示例创建了一个引发OutOfMemoryException异常的Observable对象。天气模拟程序实现了一个天气预报Observable,其运行数据密集型计算,然后推送结果。由于计算会创建大量存储在内存中的数据,所以存在内存不足的风险。如果引发异常,作为最后的选择,Observer可以同时运行垃圾回收(GC)和大对象堆(LOH)压缩来尝试释放内存。

// 创建引发异常的Observable
IObservable<WeatherSimulation> weatherSimulationResults =
    Observable.Throw<WeatherSimulation>(new OutOfMemoryException());

weatherSimulationResults
    .Subscribe(
        _ => { /* OnNext code */ },
        e =>
        {
            if (e is OutOfMemoryException)
            {
                // 通过调用GC来释放内存
                GCSettings.LargeObjectHeapCompactionMode =
                    GCLargeObjectHeapCompactionMode.CompactOnce;
                GC.Collect();
                GC.WaitForPendingFinalizers();
            }
        });

从开发者的角度来看这个示例,可能会认为对错误的响应不太友好。没错。必须检查异常的类型,并且对每种类型都需要手动干预错误的处理,即使只是需要消除异常。

Rx团队意识到开发者倾向于对Observable管道中发生的错误进行共同处理。这些处理包括捕获特定的异常类型并执行相应的某些操作,或者消除错误并使用源Observable或另一个Observable恢复执行。

Rx团队在确保了Observable管道代码继续保持声明性和间接性的前提下,创建了特定操作符来简化开发者的工作。

捕获错误

在传统的命令式编程模型中,将可能引发错误的代码用try-catch块包裹,并在每个catch块中处理指定的异常类型,或留空表示处理所有异常类型。catch代码完成后,程序将继续工作。

语义上讲,Rx的Catch操作符执行类似的操作,处理特定的异常类型,但是是通过指定一个后备Observable的方式来继续执行。在发生错误时,Observer将订阅到指定的后备Observable(如下图)。

在下面的示例中,改进了上一个示例中的错误处理方式。现在,添加了Catch操作符来处理OutOfMemoryException,并且优雅的关闭了Observable管道:

IObservable<WeatherSimulation> weatherSimulationResults =
    Observable.Throw<WeatherSimulation>(new OutOfMemoryException());

weatherSimulationResults
    .Catch((OutOfMemoryException ex) =>
    {
        Console.WriteLine("handling OOM exception");

        // 返回一个空的Observable,以便于Observer接收到完成通知,而不是错误通知
        return Observable.Empty<WeatherSimulation>();
    })
    .SubscribeConsole("Catch (source throws)");

输出结果如下:

handling OOM exception
Catch (source throws) – OnCompleted()

Catch接收一个处理特定异常类型的函数。该函数返回一个用于继续执行Observable管道的Observable。在该示例中,返回一个空的Observable,以便与Observable管道完成,但是在实际代码中,返回的可能是一个后备Observable用来代替源Observable来推送值。

IObservable<TSource> Catch<TSource, TException>(
    IObservable<TSource> source,
    Func<TException, IObservable<TSource>> handler) where TException : Exception

如果要为Observable管道中可能引发的所有异常返回同一个Observable,可以使用Catch操作符的重载,该重载仅接受发生错误时使用的Observable

一旦天气模拟Observable发生错误,则下一个错误将正常结束:

weatherSimulationResults
    .Catch(Observable.Empty<WeatherSimulation>())
    .SubscribeConsole("Catch (handling all exception types)");
OnErrorResumeNext—变种Catch

如果发生错误,Catch操作符会连接Observable对象。在前面,介绍了Concat操作符。其在第一个Observable完成时连接第二个Observable。扩展Concat操作符很有意义,这样,切换Observable不仅在完成时进行,而且在发生错误时也会进行。这就是OnErrorResumeNext的作用,如下图所示:

下面的示例展示了如何合并来自两个气象站的天气预报。示例展示了,当第一个Observable(A站)因为错误而终止,第二个Observable(B站)就会被连接上。

IObservable<WeatherReport> weatherStationA =
    Observable.Throw<WeatherReport>(new OutOfMemoryException());

IObservable<WeatherReport> weatherStationB =
    Observable.Return<WeatherReport>(new WeatherReport() { Station = "B",
        Temperature = 20.0 });

weatherStationA
    .OnErrorResumeNext(weatherStationB)
    .SubscribeConsole("OnErrorResumeNext(source throws)");

weatherStationB
    .OnErrorResumeNext(weatherStationB)
    .SubscribeConsole("OnErrorResumeNext(source completed)");

运行程序,只有B站的报告显示出来,结果如下:

OnErrorResumeNext(source throws) – OnNext(Station: B, Temperature: 20)
OnErrorResumeNext(source throws) – OnCompleted()
OnErrorResumeNext(source completed) – OnNext(Station: B, Temperature: 20)
OnErrorResumeNext(source completed) – OnNext(Station: B, Temperature: 20)
OnErrorResumeNext(source completed) – OnCompleted()

借助CatchOnErrorResumeNext操作符,进行串联的Observable可能就是引发异常的源Observable。当发生错误时,Observer将会重新订阅Observable。从概念上讲,意味着想要重试该操作;但是,可能希望限制重试的次数或者显示设置重试次数为无穷大。为了使设置重试次数更方便,可以使用Retry操作符。

发生错误时重新订阅

Retry操作符在发生错误时将Observer重新订阅到Observable。请记住,Rx准则规定当发生错误时,ObserverObservable之间的订阅将断开。如果Observable是“冷”的(意味着Observer之间不共享通知,例如从文件中逐行读取的Observable),Retry操作符会导致重新订阅,然后Observable将重新生成序列,最后将可能会再次导致失败。如果Observable是“热”的,那么重新订阅将会使Observer接收后续的通知。

在下面的示例中,Observable模拟从气象站接收天气预报。由于短暂的错误(例如,网络接收速度慢),可能导致与站点的连接失败,那么重试是最好的选择。当然,错误可能不是暂时的,因此需要限制重试的次数(这里限制为3次),如下图所示:

IObservable<WeatherReport> weatherStationA =
    Observable.Throw<WeatherReport>(new OutOfMemoryException());

weatherStationA
    .Log()    // 为每个通知添加日志信息
    .Retry(3) // 限制重试次数
    .SubscribeConsole("Retry");

输出结果如下(处于可读性考虑,删除了一些输出):

- OnError:
System.OutOfMemoryException: Insufficient memory to continue …
- OnError:
System.OutOfMemoryException: Insufficient memory to continue …
- OnError:
System.OutOfMemoryException: Insufficient memory to continue …
Retry – OnError:
System.OutOfMemoryException: Insufficient memory to continue …

可以看到错误被抛出了四次。由于原始错误和前两次重置,导致打印前三次消息,最后一条消息来自于上次重试,这种情况下,又导致了错误,并且被ObserverOnNext函数接收。

提示

如果将Retry操作符留空(不传递数字),那么将进行无限次重试。

接下来,将介绍如何控制Observable管道中使用的资源的生命周期,以便于可以正确的释放。

控制资源的生命周期

为了建设Observable管道,分配和使用了不同的资源。包括占用不被管理的内存和资源(例如文件的句柄或外部服务的连接)的原语和简单对象。当Observable完成时,无论是推送完成还是取消了订阅,释放已分配的资源都非常重要。发生错误时,处理这些资源的分配还要更加重要;否则,可能无法从错误中恢复(例如,文件可能由于未释放其句柄儿被锁定)。

好的一面是Rx操作符会处理好自己并且清理其所使用的的资源。因此,只需要注意在Observable管道中创建和使用的资源。

显式释放资源

在.NET中,GC以一种不确定的方式释放托管对象。即使不再使用某个对象(没有根引用),该对象也可以在内存中保留很长时间,直到GC运行为止。某些托管对象可能会使用非托管资源,例如连接或文件句柄,这种情况下,不再使用这些资源时尽快处理十分重要。这就使得释放具有确定性。

在.NET中,可以在使用资源的类上实现IDisposable接口,并在Dispose方法中实现释放资源的代码,来实现显示释放资源。在运行时,当不再使用资源(以及其封装对象),可以调用Dispose方法释放资源。当然,GC回收封装对象或资源使用的其它对象的托管内存(垃圾回收本质上是不确定的)。

在C#中,使用实现了IDisposable接口的类最简单且安全的方法时使用using语句:

class DisposableType : IDisposable
{
    public void Dispose() { /* 释放资源 */ }
}

private static void TraditionalUsingStatement()
{
    using (var disposable = new DisposableType())
    {
        // 其他代码
    }
}

当执行到代码块末尾时,即使代码块内引发了异常,也会自动调用Dispose方法。

或者,可以直接调用Dispose方法而不使用using语句。通常,在创建可释放对象的位置和使用该对象的位置不同时,才这么做。

由于希望在Observable管道内使用与显示释放一样的语义,所以Rx提供了Using操作符,其与using语句作用类似。

在下面的简单示例中,假设有一个来自温度传感器的Observable,并且试图跟踪代码中出现的问题。这里希望将通知写入日志文件,以便于后面进行分析。当处理文件时,需要在完成时关闭文件;否则,其他人将无法使用该文件,并且没有刷新的数据也会消失。确保文件句柄释放的方式如下:

string logFilePath = ...
IObservable<SensorData> sensorData = ...

var sensorDataWithLogging =
    Observable.Using(() => new StreamWriter(logFilePath), // 创建资源的工厂函数
        writer =>  // 创建使用资源的Observable的工厂函数
        {
            return sensorData.Do(x => writer.WriteLine(x.Data));
        });

sensorDataWithLogging.SubscribeConsole("sensor");

Using操作符的用法与using语句相似,先创建资源,然后在块内使用资源。主要区别在内部块(第二个参数)需要返回使用资源的Observable,如下图所示。

Using操作符接收两个参数:第一个是资源工厂(创建可释放对象的函数),第二个是Observable工厂(接收可释放对象并返回使用它的Observable对象的函数)。

Using操作符返回一个Observable,其封装了每次Observer订阅时调用资源工厂然后调用Observable工厂的过程。Using操作符会在Observable终止时释放资源,无论其终止的原因是什么。

下面是一个证明该点的例子:

Subject<int> subject = new Subject<int>();
var observable =
    Observable.Using(
        () => Disposable.Create(()=>{ Console.WriteLine("DISPOSED"); }),
        _ => subject);

Console.WriteLine("Disposed when completed");
observable.SubscribeConsole();
subject.OnCompleted();

Console.WriteLine("Disposed when error occurs");
subject = new Subject<int>();
observable.SubscribeConsole();
subject.OnError(new Exception("error"));

Console.WriteLine("Disposed when subscription disposed");
subject = new Subject<int>();
var subscription =
    observable.SubscribeConsole();
subscription.Dispose();

在资源工厂中,创建一个可释放对象,该对象在释放后会打印一条消息。将Subject作为Observable工厂的返回对象。然后,测试当Subject推送OnCompletedOnError通知时发生什么,以及当取消订阅时会发生什么。

在所有测试中,资源都被释放了。可以看到,在每个测试用例之间,都创建了一个新的Subject,因为已完成的Subject不能再使用,并且会自动将其完成通知推送给所有订阅它的新Observer

输出结果如下:

Disposed when completed
- OnCompleted()
DISPOSED
\
Disposed when error occurs
- OnError:
System.Exception: error
DISPOSED
\
Disposed when subscription disposed
DISPOSED

这证明,对于Observable或订阅的任何终止,资源都会被释放。

Using操作符还存在一个异步版本,该版本中资源工厂和Observable工厂返回Task

IObservable<TResult> Using<TResult, TResource>(
    Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
    Func<TResource, CancellationToken, Task<IObservable<TResult>>>
        observableFactoryAsync)

由于工厂是异步的,所以它们都接收取消令牌,如果工厂在运行时订阅被取消,令牌将报告取消。除此之外,异步版的工作方式与之前相同。

当需要释放资源时,使用Using操作符很有用。但是,在某些情况下,清理操作不会通过可释放对象进行。在C#中,当无论操作成功还是失败都需要运行一段代码时,可以使用try-finally语句。Rx也提供了相似的语义。

显式完成

Finally操作符与C#中的finally块相似。在操作最后,无论成功还是失败,都会执行一段代码。如下图所示:

finally块中的代码通常用于清理一些不一定需要释放的资源,并且运行与关闭有关的逻辑事务。Finally操作符为Observable做同样的事情:在Observable终止时(无论成功还是发生错误)运行一段需要执行的代码。

假设有一个展示操作进度的窗口(例如,加载文件或运行长时间的计算),并且无论操作成功还是失败都希望关闭该窗口。下面是实现代码:

IObservable<int> progress =...

progress
    .Finally(() =>{/* 关闭窗口 */})
    .Subscribe(x =>{/* 更新界面 */});

无论Observable是如何终止的,关闭窗口的那段代码都会被执行。

下面的代码示例展示了执行Finally子句时的不同情况:

Console.WriteLine("Successful complete");
Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    .Finally(() => Console.WriteLine("Finally Code"))
    .SubscribeConsole();

Console.WriteLine("Error termination");
Observable.Throw<Exception>(new Exception("error"))
    .Finally(() => Console.WriteLine("Finally Code"))
    .SubscribeConsole();

Console.WriteLine("Unsubscribing");
Subject<int> subject = new Subject<int>();
var subscription =
    subject.AsObservable()
           .Finally(() => Console.WriteLine("Finally Code"))
           .SubscribeConsole();
subscription.Dispose();

输出结果如下:

Successful complete
- OnCompleted()
Finally Code
\
Error termination
- OnError:
System.Exception: error
Finally Code
\
Unsubscribing
Finally Code

Observable执行最后一步工作且无法使用可释放对象时(例如,关闭连接或向外部服务发送消息),Finally操作符很有用。

接下来,将要展示如何降低不再使用但是没从内存中删除的Observer的风险,这种情况叫悬空Observer

悬空Observer

悬空ObserverObserverObservable保持住(引用)的结果,即使Observer的逻辑寿命已经结束。如果Observer是显示来自聊天Observable对象的聊天消息窗口,则即使用户已经关闭窗口,Observable对象仍可能引用该窗口对象。

Observer订阅了一个Observable,但是从未通过Dispose取消订阅,就会出现悬空Observer。定义一个对象来订阅Observer,并由其作为Observer的所有者负责整个生命周期。

悬空Observer会导致内存泄漏,因为Observer是占用内存的对象。悬空Observer也会导致不必要的(和意外的)行为,因为Observer仍然会处理通知。例如,前面提到的聊天窗口,即使已经关闭,仍然会将聊天信息添加到其内部集合中。下图展示了一个悬空Observer

提醒一下,当Observer订阅了一个Observable对象时,将会获得一个可以用来取消订阅的对象。例如:

IObservable<int> observable = ...
IDisposable subscription = observable.Subscribe(x =>{/*the observer code*/});

不幸的是,很多开发者丢弃了该对象,没有对其进行维护。开发者忘记取消订阅以至于忘记保存取消订阅的对象,这将导致悬空Observer

如果Observer保持着对其他对象的引用,那么就会创建一系列未回收的对象。当通过Subscribe操作符的OnNextOnErrorOnCompleted函数隐式创建Observer时就会发生这种特殊情况。如果函数使用对象的属性或方法,那么将隐式创建从Observer到创建订阅的对象的引用。

明确的说,如果程序确实需要Observer在其所订阅的Observable对象的生命周期内保持活动状态,那么使Observer处于悬空状态是预期行为。但是,很多情况下,Observer应在其所有者(或创建者)有效期内保持活动状态,这种情况下,保存取消订阅的对象并在需要的时候取消订阅就至关重要。

提示

关于用于取消订阅的对象的一种误解是错误的假设当GC回收用于取消订阅的对象时会调用其Dispose方法。Rx的可释放对象没有实现终结器,如果GC回收了可释放对象,将会释放其内存,但是不会取消订阅。所以不能依靠GC来取消订阅,

在某些情况下,无法确定订阅的有效期何时结束,而是想要动态的保持订阅直到没有对Observer的引用(Observable除外)时,再进行释放。这种情况的一个示例是,在使用Windows Phone之类的平台时,其应用程序的页面保留在后台。(后台是指允许用户按下返回键并返回到上一页。)Windows Phone应用程序还可以在需要阻止用户返回时清除后台(例如,当用户注销并返回登录页面,之前访问的所有页面将不再相关)。

假设页面(或者ViewModel)订阅了一个Observable。由于页面生命周期的不确定性,该页面不知道其是否仍需要在后台。无法确切知道何时取消订阅。对于这种情况,就需要一个弱Observer

创建弱Observer

悬空Observer的问题类似于悬空事件的问题。在传统.NET事件中,事件处理程序向事件的注册会创建一个从事件源到包含该事件处理程序的对象的引用(除非事件处理程序是静态的)。因此,除非使用-=操作符从事件中注销,否则只要事件源处于活动状态,包含事件处理程序的对象就会保持活动状态。

为消除该风险,常见的处理方式是将事件持有的引用改为弱引用。WeakReference类表示一个仍允许GC回收引用的对象的引用。使用WeakReference对象的代码可以查询其引用的对象是否仍然存在。

下面的示例展示了,只要存在对对象的强引用,WeakReference就会表明该对象是活动的。如果没有了强引用,那么WeakReference就表明该对象不再存在。

object obj = new object();
WeakReference weak = new WeakReference(obj);

GC.Collect();
Console.WriteLine("IsAlive: {0} obj!=null is {1}", weak.IsAlive,obj!=null);

obj = null;
GC.Collect();
Console.WriteLine("IsAlive: {0}", weak.IsAlive);

输出结果如下:

IsAlive: True obj!=null is True
IsAlive: False

可以使用WeakReference弱化Observer的订阅,因此,如果使Observer保存活动的唯一情况是Observable,那么就不会阻止GC回收Observer。这种模式被称为弱Observer

下图展示了要实现的目标。这里的想法是创建一个代理对象,该对象持有对Observer的弱引用,并将来自源Observable的调用委派给Observer。为了使代理从Observable接收通知,必须实现IObserver<T>接口。

对于WeakObserverProxyObservable收到的每条通知,都会检查Observer是否仍然存在且没有被GC回收。如果存在,就会把通知传递给Observer。如果Observer已经被回收,那么WeakObserverProxy就会取消订阅。

下面是OnNext方法看起来的样子:

IObserver<T> observer;
if (_weakObserver.TryGetTarget(out observer))
{
    observer.OnNext(value);
}
else
{
    _subscriptionToSource.Dispose();
}

OnErrorOnCompleted方法也是如此,所以像下面这样重构代码:

void NotifyObserver(Action<IObserver<T>> action)
{
    IObserver<T> observer;
    if (_weakObserver.TryGetTarget(out observer))
    {
        action(observer);
    }
    else
    {
        _subscriptionToSource.Dispose();
    }
}

public void OnNext(T value)
{
    NotifyObserver(observer=>observer.OnNext(value));
}

除了在内部可能会回收Observer外,用户还可以随时取消订阅。WeakObserverProxy对象保存了订阅对象,并通过AsDisposable方法公开它。公开的方法想客户代码返回用于取消订阅的对象。

下面是WeakObserverProxy的完整代码:

class WeakObserverProxy<T>:IObserver<T>
{
    private IDisposable _subscriptionToSource;
    private WeakReference<IObserver<T>> _weakObserver;

    public WeakObserverProxy(IObserver<T> observer)
    {
        _weakObserver = new WeakReference<IObserver<T>>(observer);
    }

    internal void SetSubscription(IDisposable subscriptionToSource)
    {
        _subscriptionToSource = subscriptionToSource;
    }

    void NotifyObserver(Action<IObserver<T>> action)
    {
        IObserver<T> observer;
        if (_weakObserver.TryGetTarget(out observer))
        {
            action(observer);
        }
        else
        {
            _subscriptionToSource.Dispose();
        }
    }

    public void OnNext(T value)
    {
        NotifyObserver(observer=>observer.OnNext(value));
    }

    public void OnError(Exception error)
    {
        NotifyObserver(observer => observer.OnError(error));
    }

    public void OnCompleted()
    {
        NotifyObserver(observer => observer.OnCompleted());
    }

    public IDisposable AsDisposable()
    {
        return _subscriptionToSource;
    }
}

为了更方便,创建了AsWeakObservable扩展方法,该方法会封装想要弱订阅的所有Observable

现在,当Observer订阅时,将创建一个WeakObserverProxy,并将Observer和对源Observable的订阅传给它。最后,内部的订阅传递给调用者:

public static IObservable<T> AsWeakObservable<T>(this IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var weakObserverProxy = new WeakObserverProxy<T>(o);
        var subscription = source.Subscribe(weakObserverProxy);
        weakObserverProxy.SetSubscription(subscription);
        return weakObserverProxy.AsDisposable();;
    });
}

下面是一个测试弱Observer的示例。创建了一个Observable,其每秒推送一个通知(就像一个传感器报告它测量的值),并且弱化了Observer。该程序保持订阅2秒钟,以使Observer保持活动状态。然后,删除对订阅对象的引用(将其设置为null)并强制执行GC。之后,即使没有显式的调用Dispose方法,也不会接收到任何通知:

var subscription =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .AsWeakObservable()
              .SubscribeConsole("Interval");

Console.WriteLine("Collecting");
GC.Collect();
Thread.Sleep(2000); // 2 seconds

GC.KeepAlive(subscription);
Console.WriteLine("Done sleeping");
Console.WriteLine("Collecting");

subscription = null;
GC.Collect();
Thread.Sleep(2000); // 2 seconds
Console.WriteLine("Done sleeping");

输出结果如下:

Collecting
Interval – OnNext(0)
Interval – OnNext(1)
Done sleeping
Collecting
Done sleeping

从输出中可以看到,当订阅被强引用时,通知会持续推送显示。当订阅不被强引用时,通知停止了推送显示。

不应该总是使用弱Observer(就像弱事件一样),因为大多数情况下,订阅是可以被控制的。但是,如果发现无法确定Observer的生命周期(例如,使用Windows Store应用程序的后台程序),那么弱Observer就十分有用,其可以使编程更简单,并提高程序资源的利用率。

需要注意的是,WeakObserverProxy对象在回收其引用的Observer之后可能会存活很长时间。这是因为,当Observable推送通知时,WeakObserverProxy可以检查是否仍需要它,如果不需要,其才可以取消订阅。

接下来,将会遇到另一种情况,即即使编写的代码没有出错,程序中的资源消耗也会增加。当Observer在一段时间内接收到通知数量很多时,就可能会发生这种情况。这称之为背压。

处理背压

Observable提供了其推送对象源的一个抽象,但是Observable接口中无法提供有关这些通知推送速率的任何线索。

不同速率的Observable

关于Observer的处理速度,可能有三种结果,如下图所示:

  1. Observer的处理速度与Observable的推送速度相同。
  2. Observable的速度快于Observer。这将导致过载。
  3. Observer的速度快于Observable。这时,Observer可以处理更多由Observable推送的通知。

可以将这些情况与客户端从网站获取请求进行比较。托管网站的Web服务器可以处理有限的请求。当请求数量过多时,就可能收到一条错误信息,表明该网站不可用,如下图所示:

对于情况1和3,Observer的速度与Observable的速度相同或者更快,那么就不会出现任何问题,并且系统运行良好。但是,当Observable的速度大于Observer处理通知的能力时,除非使用某种方式降低推送通知的速度,否则就会处于过载状态,最终导致系统崩溃。

如前所述,这种过载叫做背压,这很容易理解,如下面的示例所示。

在下面的示例中,使用Zip操作符将每秒推送一个通知的Observable与另一个每2秒推送一个通知的Observable组合在一起。这些Observable可能是从两个传感器或两个远程服务器推送通知的,但是无论如何,结果将是慢的Observable通知将被Zip操作符缓冲:

var fast = Observable.Interval(TimeSpan.FromSeconds(1));
var slow = Observable.Interval(TimeSpan.FromSeconds(2));

var zipped = slow.Zip(fast, (x, y) => x + y);

Zip操作符根据元素的推送顺序进行组合,因此其必须存储快Observable的元素,直到慢Observable推送相应的通知为止。10秒后,快Observable将推送10个元素,而慢Observable仅推送5个元素,因此Zip操作符在内存中缓存了5个元素。如果运行一整天(总共86400秒),那么内存中将缓存43200个元素。下图展示了该问题。

现在,已经了解了什么是背压,那么接下来就是怎么解决背压。

缓解背压

假设有人向你高速扔球,你需要抓住球并将它们放在架子上。有三种可能的方式来解决问题:

  1. 忽略一些球,丢弃它们(有损方法)。
  2. 将一些球暂时放入盒子中,然后再拿出来(无损方法)。
  3. 通知投球者停止投球,直到可以接球为止(受控的无损方法)。

一些Rx操作符采用有损方法,一些采用无损方法,但是没有操作符采用受控无损方法。

提示

Reactive Streams(www.reactive-streams.org)试图为Observable提供一种受控的无损方法。如Reactive Streams网站上所诉,该计划为具有无阻塞背压(受控无损)的异步流提供了标准。该标准扩展了Rx模型,以使Observer可以将可承受的负载告诉Observable。撰写本文时,Rx.NET还不支持Reactive Streams。

有损方法

假设有两个推送通知的传感器。一个的推送速度是另一个的两倍,并且需要组合这些通知。这里需要考虑慢速传感器推送的通知是否仍然有用。如果传感器推送心率,那么就得考虑一个小时前测量的心率是否还有用。丢弃它仅使用最新的值是否更好?这时,怎么合理的丢弃通知,有下面几个可以选择的选项:

  • 如果需要合并Observable,并且仅需要合并每个Observable推送的最新通知就够了,可以使用CombineLatest操作符。
  • 如果Observable的速率有时很高,并且在很短时间内推送的上一个通知无关紧要,可以使用Throttle操作符。
  • 如果需要稳定的使用通知,且无论每个时间段推送多少个通知,可以使用Scan操作符。

如果需要将来自心率检测器的通知与来自速度计的通知进行组合,并且心率检测器的速率可能比速度计更快,那么可以使用CombineLatest操作符来缓解背压:

heartRates.CombineLatest(speeds, (h, s) => String.Format("Heart:{0} Speed:{1}", h, s))

在所有有损方法的选项中,都会丢弃一些通知,从而降低了资源消耗,如果将响应能力和可用性放在首位,这是理想的选择。当需要优先使用每个推送的通知时,就需要采用无损方法。

无损方法

假设Observable正在推送需要在屏幕上显示的文本消息。每次更改屏幕时,都需要刷新屏幕,这很耗时间。当消息的频率很高时,刷新屏幕会导致UI无响应,并使用户不满意。更好的解决办法是一次刷新多条消息,而不是每条都刷新。这时,不能因为消息推送的太快而丢弃消息。因此,需要一种无损的方法来处理背压。Rx支持的无损方法是通过缓冲,存储通知,然后转换为批量操作进行处理,

在前面介绍的Buffer操作符可以按时间或数量指定缓冲方式。应该谨慎处理;否则,程序将会增加内存消耗,并可能导致程序崩溃。

总结

在本章中,介绍了用于优化Rx代码的方法。介绍了如何优雅的处理发生的错误,以及如何控制代码使用的资源。

  • Catch操作符可以对Observable管道中引发的特定类型异常做出响应。其设置了一个后备Observable,当引发异常时Observer就会订阅它。
  • OnErrorResumeNext操作符在无论成功完成还是出错终止时都将连接转到另一个Observable
  • Retry操作符会在发生错误时重新订阅Observable
  • Using操作符可以在Observable终止时明确的释放对象。这样就可以正确的清除Observable管道内使用的资源。
  • Finally操作符可以在Observable终止时运行特定的代码(如清除或打日志)。这样就可以在Observable处理结束时运行清除代码。
  • ObservableObserver的强引用,可能导致Observer的生命周期比其预期的要更长(悬空Observer)。
  • WeakObservers将用于保存Observer的引用改为WeakReference,从而解决了由于Observable持有Observer而无法回收Observer的问题。
  • 当消费者比生产者慢时,就会导致背压。
  • 背压会导致系统性能(包括内存和吞吐量)下降。
  • CombineLatestThrottleScan操作符可以以有损方法处理背压;丢弃一些通知,以降低资源消耗。
  • Buffer操作符通过将通知保存到可以稍后进行整体处理的批量操作中来处理背压。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注