[Rx.NET] 附录 – Rx中的Disposable库

[Rx.NET] 附录 – Rx中的Disposable库

System.Reactive.Core包中包括一个也许有用的附加功能:Rx Disposables库。它是实现了IDisposable接口,并为代码复用提供了通用实现,可以帮助完成大部分释放工作的一组类型。本附录中列出的所有类型都位于System.Reactive.Disposables命名空间中。

下表列出了Disposable及其用途:

类型/效用 描述
Disposable.Create 用于创建Disposable对象的静态方法,当Dispose时执行给定代码
Disposable.Empty 创建一个空的Disposable
ContextDisposable 在给定的SynchronizationContext上运行Dispose的调用
ScheduledDisposable 在给定的IScheduler上运行Dispose的调用
SerialDisposable 内部有一个可替换的Disposable,并且在替换后Dispose前一个Disposable
MultipleAssignmentDisposable 内部有一个可替换的Disposable,并且在替换后不Dispose前一个Disposable
RefCountDisposable 当被引用的所有DisposableDispose后,Dispose内部的Disposable
CompositeDisposable 将多个Disposabel合并为一个可以同时DisposeDisposable
CancellationDisposable Dispose时取消给定的CancellationTokenSource
BooleanDisposable Dispose时设置布尔标志。可以使用IsDisposed属性查看BooleanDisposable是否已经Dispose

Disposable.Create

创建Disposabel最灵活的方式是使用Disposable.Create静态工厂方法。唯一需要做的就是传递一个在调用Dispose时希望执行的操作。下面的示例中,创建了一个可以改变屏幕状态(从忙到不忙)的Disposable。屏幕上显示下载后的新闻信息。当屏幕处于繁忙状态时,UI可以显示一个进度条,向用户展示后台正在进行某些操作。

private async Task RefreshNewsAsync()
{
    IsBusy = true;
    NewsItems = Enumerable.Empty<string>();
    using (Disposable.Create(() => IsBusy = false))
    {
        NewsItems = await DownloadNewsItems();
    }
}

IsBusy属性绑定到屏幕上的忙碌指示器,因此,将其设置为true时,将显示忙碌指示器,而将其设置为false时,将隐藏忙碌指示器。使用Disposable的好处是,即使代码引发异常,using语句也可以确保Dispose方法得以执行,因此可以确保屏幕不会卡在忙碌状态。

Disposable.Empty

Disposable.Empty静态属性返回一个Dispose方法为空的Disposable。可以用于初始化IDisposable变量或成员,因此就不必编写代码来进行null检查(很有可能会忘记)。还可以用于Disposable的返回,例如,当创建自己的Rx操作符时,该操作符必须通过其Subscribe方法返回Disposable,但是这里并不需要任何特殊处理。下面是使用Disposable.EmptyObservable.Return操作符的简化版本:

public static IObservable<T> Return<T>(T value)
{
    return Observable.Create<T>(o =>
    {
        o.OnNext(value);
        o.OnCompleted();
        return Disposable.Empty;
    });
}

ContextDisposable

ContextDisposable类封装了一个Disposable,并在指定的SynchronizationContext上执行其Dispose方法。当操作绑定到特定的上下文时(例如,更改UI元素时),在SynchronizationContext上执行Dispose方法十分重要。下面的代码创建了一个StartBusy方法来创建Disposabel,该Disposabel可以在SynchronizationContext上关闭UI的忙碌指示器:

public IDisposable StartBusy()
{
    IsBusy = true;
    return new ContextDisposable(
        SynchronizationContext.Current,
        Disposable.Create(() => IsBusy = false));
}

ScheduledDisposable

ScheduledDisposable类的工作方式与ContextDisposable的相似,但是不是指定一个SynchronizationContext,而是指定一个希望在其上调度Dispose方法的IScheduler。例如,当在Observable上调用SubscribeOn操作符时,从订阅中返回的Disposable将被使用传入的ISchedulerScheduledDisposable封装。参见下一节的示例。

SerialDisposable

SerialDisposable类可以用来封装可以替换的Disposable。一旦更换内部的Disposable,之前的Disposable就会被自动调用Dispose方法。除此之外,SerialDisposable还会记录其是否已经被Dispose,如果已经被Dispose,那么再更换内部Disposabel,更换后的Disposabel也会被Dispose。在SubscribeOn操作符的简化版本中展示了一个示例。因为无法准确预测Scheduler将在何时调度,所以需要创建一个SerialDisposable并将其内部Disposable放到调度操作内:

public static IObservable<TSource> MySubscribeOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        var d = new SerialDisposable();

        d.Disposable = scheduler.Schedule(() =>
        {
            d.Disposable = new ScheduledDisposable(scheduler,
                source.SubscribeSafe(observer));
        });

        return d;
    });
}

在调度任务执行了对源Observable的订阅后,SerialDisposable内部的Disposabel被设置为了使用ScheduledDisposable封装的订阅,因此将在指定的Scheduler上执行Dispose。如果已经被Dispose,被分配的Disposable也将被Dispose

RefCountDisposable

RefCountDisposable类封装了一个Disposable,并且仅在所有引用它的Disposable都被Dispose后才会被Dispose。可以通过调用RefCountDisposable类的GetDisposable方法创建对其的引用。

下面是一个示例,展示了仅当Dispose了两个引用Disposable之后才会Dispose内部的Disposable

var inner = Disposable.Create(
    () => Console.WriteLine("Disposing inner-disposable"));
var refCountDisposable = new RefCountDisposable(inner);
var d1=refCountDisposable.GetDisposable();
var d2=refCountDisposable.GetDisposable();

refCountDisposable.Dispose();
Console.WriteLine("Disposing 1st");
d1.Dispose();
Console.WriteLine("Disposing 2nd");
d2.Dispose();

输出结果如下:

Disposing 1st
Disposing 2nd
Disposing inner-disposable

MultipleAssignmentDisposable

MultipleAssignmentDisposable类包含一个可以被随时替换的Disposabel,但是与SerialDisposable不同,替换Disposable不会自动调用上一个DisposabelDispose。但是MultipleAssignmentDisposable会记住Disposable是否已经被Dispose。如果已经被Dispose,然后替换了Disposable,那么MultipleAssignmentDisposable会自动对新的Disposable进行Dispose

CompositeDisposable

CompositeDisposable类可以将多个Disposable组成一个,以便当CompositeDisposableDispose时可以将内部的所有Disposabel都被Dispose

var compositeDisposable = new CompositeDisposable(
    Disposable.Create(() => Console.WriteLine("1st disposed")),
    Disposable.Create(() => Console.WriteLine("2nd disposed")));

compositeDisposable.Dispose();

也可以使用Add方法:

var compositeDisposable = new CompositeDisposable();
compositeDisposable.Add(Disposable.Create(
    () => Console.WriteLine("1st disposed")));
compositeDisposable.Add(Disposable.Create(
    () => Console.WriteLine("2nd disposed")));

compositeDisposable.Dispose();

通常,当在类的内部订阅了多个Observable时,希望将所有的订阅组合在一起,然后可以一次性Dispose。为了使Observable保持流畅,创建了下面的扩展方法:

static CompositeDisposable AddToCompositeDisposable(this IDisposable @this,
    CompositeDisposable compositeDisposable)
{
    if (compositeDisposable == null)
        throw new ArgumentNullException(nameof(compositeDisposable));
    compositeDisposable.Add(@this);
    return compositeDisposable;
}

可以这样使用:

IObservable<string> observable = ...

observable.Where(x => x.Length % 2 == 0)
    .Select(x => x.ToUpper())
    .Subscribe(x => Console.WriteLine(x))
    .AddToCompositeDisposable(compositeDisposable);

observable.Where(x => x.Length % 2 == 1)
    .Select(x => x.ToLower())
    .Subscribe(x => Console.WriteLine(x))
    .AddToCompositeDisposable(compositeDisposable);

SingleAssignmentDisposable

SingleAssignmentDisposable类仅允许对内部的Disposabel进行一次配置。如果对已经配置好的再次尝试配置,就会引发InvalidOperationException异常。

CancellationDisposable

CancellationDisposable类是IDisposable世界和CancellationTokenSource世界之间的适配器。调用CancellationDisposableDispose时,将取消内部的CancellationTokenSource。例如,在Rx的TaskPoolScheduler中使用,以便于从Schedule返回的Disposabel可以与发送到TaskSchedulerCancellationToken进行绑定。下面是一个简单版本:

IDisposable Schedule<TState>(TState state,
    Func<IScheduler, TState, IDisposable> action)
{
    var d = new SerialDisposable();
    var cancelable = new CancellationDisposable();
    d.Disposable = cancelable;
    Task.Run(() =>
    {
        d.Disposable = action(this, state);
    }, cancelable.Token);
    return d;
}

如果用户Dispose了从方法返回的Disposabel,则由CancellationDisposable内部的CancellationTokenSource创建的CancellationToken将传给TaskScheduler,以阻止其运行Task

BooleanDisposable

BooleanDisposable类包含一个布尔标志,可以用来检查其是否已经Dispose。例如:

var booleanDisposable = new BooleanDisposable();

Console.WriteLine("Before dispose, booleanDisposable.IsDisposed = {0}",
    booleanDisposable.IsDisposed);

booleanDisposable.Dispose();
Console.WriteLine("After dispose, booleanDisposable.IsDisposed = {0}",
    booleanDisposable.IsDisposed);

输出结果如下:

Before dispose, booleanDisposable.IsDisposed = False
After dispose, booleanDisposable.IsDisposed = True

总结

Rx包不仅提供特定于Rx的类型和工具,还提供了可以简化创建Disposabel的库。

  • 如果要在Dispose后执行一段代码,可以使用Disposable.Create静态方法。
  • 如果要创建空Disposable,可以使用Disposable.Empty静态属性。
  • 如果Disposable需要在特定SynchronizationContext上进行Dispose,可以使用ContextDisposable进行封装。
  • 如果Disposable需要在特定IScheduler上进行Dispose,可以使用ScheduledDisposable进行封装。
  • SerialDisposable类可以在替换其内部Disposable时对前一个Disposable进行Dispose
  • MultipleAssignmentDisposable类可以替换其内部Dispose,而不会Dispose前一个Disposabel
  • 如果想要在所有引用的Disposabel都被Dispose后才对一个Disposable进行Dispose,可以使用RefCountDisposable类。
  • 如果想将多个Disposable组合为一个Disposable,并且同时Dispose所有Disposable,可以使用CompositeDisposable类。
  • 可以使用SingleAssignmentDisposable来确保内部Disposabel只会被配置一次。
  • 可以使用CancellationDisposable来在Dispose后取消一个CancellationTokenSource
  • BooleanDisposable可以查询其是否已经被Dispose

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注