System.Reactiveの中身をほんのちょっと読んで見る。(その3)
動作を追う。
メソッドチェーン時(Subscribe前)
サブスクライブが走るまでは、単にIObservable
Subscribe時
Rxはここが重要ですよね。とりあえずsubject.Select().Subscribe(do something)みたいな感じを想定します。
また再掲まみれになりますが。。。
SelectorクラスのSubscribeが呼ばれると、Producerクラスのここが反応します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public IDisposable Subscribe(IObserver<TTarget> observer) { if (observer == null) { throw new ArgumentNullException(nameof(observer)); } return SubscribeRaw(observer, enableSafeguard: true); } public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard) { ISafeObserver<TTarget> safeObserver = null; if (enableSafeguard) { observer = safeObserver = SafeObserver<TTarget>.Wrap(observer); } //後続のオブザーバーからSinkオブジェクトが生成される。 //引数のobserverはIObserver<TTarget> //戻ってくるsinkはIObserver<TSource> //要するに次にOnNextが送られるべきIObserver<TTarget>を内包するIObserver<TSource>が出来上がる。 var sink = CreateSink(observer); //後続のオブザーバにsinkオブジェクトの参照をもたせる safeObserver?.SetResource(sink); if (CurrentThreadScheduler.IsScheduleRequired) { //こっちはとりあえず無視 CurrentThreadScheduler.Instance.ScheduleAction( (@this: this, sink), tuple => tuple.@this.Run(tuple.sink)); } else { //重要 Run(sink); } return sink; } |
Run(sink)は
1 2 |
protected override void Run(_ sink) => sink.Run(_source); |
となっており、sinkオブジェクトのRunを呼ぶのでした。Runは以下です。
1 2 3 4 5 |
public virtual void Run(IObservable<TSource> source) { SetUpstream(source.SubscribeSafe(this)); } |
またRunを呼んだときのsinkオブジェクトは、IObserver
Dispose時
購読停止するためSubscribe()で帰ってきたIDisposableなオブジェクトをdisposeしたときのことを考えます。リソースがどういうふうに開放されていくかは気になりますよね。Subscribeの方さへわかれば、まぁこちらは比較的カンタンで、末端のsinkオブジェクトがsubscribe時には返却されていることがわかります。そこで、Sink
1 2 3 4 5 6 7 8 9 10 11 |
public void Dispose() { if (Interlocked.Exchange(ref _observer, NopObserver<TTarget>.Instance) != NopObserver<TTarget>.Instance) Dispose(true); } protected virtual void Dispose(bool disposing) { Disposable.TryDispose(ref _upstream); } |
となります。Disposable.TryDisposeは以下のようになっていて、_upstreamのDisposeが連鎖的に呼ばれるようになっています。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
internal static bool TryDispose(ref IDisposable fieldRef) { var old = Interlocked.Exchange(ref fieldRef, BooleanDisposable.True); if (old == BooleanDisposable.True) { return false; } old?.Dispose(); return true; } |
とりあえずこれくらいで満足しておくこととします。
感想
だいたいの流れはわかりました。
しかしSink<TSource, TTarget>クラスのGetForwarder関数は結局一体なにものなのか、とか、safeObserver?.SetResource(sink)は一体なんの意味が…とか、まぁ気になることはこの時点でもいくつかあるのですが、まぁ初回の探索なのでまぁいいでしょう、という感じにしておきます。
だいたいのクラスの役割とか動作はわかったけど、これ一体どういう思想でこういう設計してるのかなーみたいなところがいまいち察することができないのでココらへんは精進したいですね。。。
あと非同期を考慮しているからInterlockedが頻発して大変。
最近のコメント