RxJS Schedulerについて

こんにちは、bitbankでフロントエンドを担当しているkouです。
この記事ではRxJSというライブラリのSchedulerという機能について紹介します。

なお、この記事はRxJS@6.2.1に則り書かれています。
また本記事内のサンプルコードは理解しやすいようにRxJSのものを一部変更して使用しています。

Schedulerとは

RxJSにはSchedulerという仕組みがあります。
一般的にユーザーが意識して使うことはほとんどありませんが、時間をコントロールするオペレーターは内部的にこの仕組みを利用しています。
表立つことは少ないですが、RxJSの仕組みを読み解く上で欠かすことはできない重要な仕組みです。

Scheduler Class

まずは全てのSchedulerの元となる、 Scheduler Classの仕様を見てみましょう。
Schedulerは下記のようなインターフェースを持っています。

class Scheduler {
  now(): number;
  schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T): Subscription;
}

Schedulerには2つのメソッドがあります。
ある処理を予約して遅延実行するための schedule() 、そのための基準となる時間を判定するための now() です。
schedule() は下記の3つの引数を取ります。

  1. work: 遅延実行させる関数
  2. delay: 遅延時間、主にsetIntervalに使用される。
  3. state: 処理実行時の状態。workの実行時に引数として渡される。

非常にシンプルですね。

SchedulerとAction

Schedulerが前述の schedule() を実行すると、その結果としてスケジュール実行された Action を生成します。
イメージとしては、下のようなコードになります。(※分かりやすいよう実際のコードを改変しています。)

class Scheduler {
  schedule<T>(work: (this: Action<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new Action(this, work).schedule(state, delay);
  }
}

class Action<T> extends Subscription {
  schedule(state?: T, delay?: number): Subscription;
}

Actionはスケジュール実行する処理の一つ一つを代表しています。そして面白いことにActionはSubscriptionでもあります。
つまり、遅延実行する処理を unsubscribe() によってキャンセルすることが出来ます。
この仕組みにより、RxJSではSubscriberなのかActionなのか区別することなく unsubscribe() でまとめて後片付け処理が出来るようになっています。

また、基本的にSchedulerはシングルトンであり、RxJSの内部でも同じインスタンスが使い回されています。
1つのSchedulerが処理単位でActionを生成し、最終的な実行までをマネージメントしているイメージになります。

Schedulerの種類

上記でSchedulerとActionについて触れましたが、ActionはSchedulerの数だけあります。
つまり、AsyncSchedulerに対してAsyncAction、AsapSchedulerに対してAsapActionのように、Schedulerはそれぞれ専用のActionを所有しています。
RxJSには現在6種類のSchedulerが存在しますが、Actionもまた6種類存在しています。

そして、それぞれSchedulerの役割は異なりますが、全てAsyncSchedulerが元になっています。
RxJSではAsyncSchedulerがSchedulerの中心的な存在といっても過言ではありません。

debounceTimeでの使用例

SchedulerとActionの仕様を踏まえて、今度は実際にSchedulerが使用されているオペレーターの実装を見てみましょう。
ここでは debounceTime を見てみたいと思います。

debounceTimeは、ある時間間隔で連続して値が流れなかった時にのみ値をnextするというオペレーターです。
フォームインプットなどで、キーボードのタイピングが一定時間動かなかった時を判定するのに使用されたりします。

それではdebounceTimeの実装を見てみましょう。
コードは必要な箇所だけ抜粋し、改変しています。

class DebounceTimeSubscriber<T> extends Subscriber<T> {
  private debouncedSubscription: Subscription = null;
  private lastValue: T = null;
  private hasValue: boolean = false;

  constructor(
    destination: Subscriber<T>,
    private dueTime: number,
    private scheduler: SchedulerLike) {
    super(destination);
  }

  protected _next(value: T) {
    this.clearDebounce();
    this.lastValue = value;
    this.hasValue = true;
    this.add(this.debouncedSubscription = this.scheduler.schedule(dispatchNext, this.dueTime, this));
  }

  debouncedNext(): void {
    this.clearDebounce();

    if (this.hasValue) {
      const { lastValue } = this;
      this.lastValue = null;
      this.hasValue = false;
      this.destination.next(lastValue);
    }
  }

  private clearDebounce(): void {
    const debouncedSubscription = this.debouncedSubscription;

    if (debouncedSubscription !== null) {
      this.remove(debouncedSubscription);
      debouncedSubscription.unsubscribe();
      this.debouncedSubscription = null;
    }
  }
}

function dispatchNext(subscriber: DebounceTimeSubscriber<any>) {
  subscriber.debouncedNext();
}

debounceTimeの実装について

少しコードがありますが、ひとつひとつを順番に見ていきましょう。
DebounceTimeSubscriberdebounceTime の内部実装です。
この記事ではSubscriberについて割愛しますが、debounceTimeを実行するとDebounceTimeSubscriberがnewされるというイメージを持ってください。

DebounceTimeSubscriber#debouncedSubscription

これはスケジュールされたActionのSubscriptionです。
値がnextされると、dueTimeの時間後に処理を実行するActionが生成されます。
これをキャンセル可能にする為、Subscriptionとしてclassのプロパティに保持しています。
dueTimeの時間内に次の値がnextされると、これをunsubscribeしてキャンセルします。

DebounceTimeSubscriber#lastValue

最後にnextされた値です。
dueTimeの時間内に次の値がnextされなかったら、これをdestinationである次のObserverにnextして流します。

DebounceTimeSubscriber#hasValue

dueTime後にnextできる値を持っているかのフラグ管理に使います。

DebounceTimeSubscriber#dueTime

この時間内に次の値がnextされなかった場合、destinationに対して値をnextします。

DebounceTimeSubscriber#_next

これは前のSubscriberから値がnextされた時に呼ばれるメソッドです。

const subject = new Subject()

subject.pipe(
  debounceTime(100),
).subscribe(
  function destinationNext(value) {}
)

上記のような場合、 subject.next() が呼ばれた時に
DebounceTimeSubscriber._next() が呼ばれます。
この処理が呼ばれると、スケジュールされたActionが発行されます。
同時に既にスケジュールされたActionを持っている場合はキャンセルをします。
これらにより、debounceTimeの処理が実現されています。

DebounceTimeSubscriber#debouncedNext

スケジュールされたActionが実行されると呼ばれます。
ActionのSubscriptionの後片付けをして、次のObserverに対して値をnextします。
上記のコード例では function destinationNext が呼ばれます。

DebounceTimeSubscriber#clearDebounce

スケジュールされたActionをunsubscribeしてキャンセルします。

dispatchNext

Actionの実処理として渡される関数になります。
指定時間後に実行されます。

debounceTimeの処理の流れ

それでは上記のメソッドなどが実際にどのような手順で処理が進んでいくのか確認してみましょう。
下のコードを想定して処理を確認します。

const subject = new Subject()

subject.pipe(
  debounceTime(100),
).subscribe(
  function destinationNext(value) { console.log(value); }
)

1度だけsubjectがnextする場合

  1. Subjectから値がnextされる。
  2. DebounceTimeSubscriber#_next が実行され、スケジュールされたActionが発行される。
  3. 100ms(debounceTimeに指定した時間)経過する。
  4. スケジュールされたActionが実行され、Actionの実処理である dispatchNext が実行される。
  5. DebounceTimeSubscriber#debouncedNext が実行され、destinationである次のSubscriberのnextを実行する。
  6. function destinationNext(value) が実行され、consoleにログが出力される。

nextが1回だけ実行された場合は上記のような流れになります。
それでは今度は100msの間に次の値がnextされる場合を考えてみましょう。

100msの間にsubjectが続けてnextする場合

  1. Subjectから値がnextされる。
  2. DebounceTimeSubscriber#_next が実行され、スケジュールされたActionが発行される。
  3. 100ms経過する前に、subjectが次の値をnextする。
  4. DebounceTimeSubscriber#_next が実行され、スケジュールされた前のActionがキャンセルされて、新しいActionが発行される。
  5. 100ms経過する。
  6. 以降は同様。

いかがでしょうか。
debounceTimeの実装がActionの発行とキャンセル処理で実現されているのがイメージ出来ましたでしょうか。

おわりに

今回はdebounceTimeを一例に上げましたが、Schedulerは時間が関係するあらゆる処理で使用されています。
また、様々なオペレーターに対して自分のカスタマイズしたSchedulerを渡して処理を行わせることも出来ます。

複雑な時間管理を行う必要が出てきた場合、Schedulerは強力な力を発揮するでしょう。
今後Schedulerを理解する上で本記事が参考になれば幸いです。

次回はRxJSの時間処理のテストを実現しているTestSchedulerについて記事を書こうと思います。
それでは、また。

Author image
About Kou
expand_less