仮想通貨取引所とRxJS

こんにちは、ビットバンクでフロントエンドを開発しているDaikiです☆彡。
好きなRxJSオペレーターはcombineLatestです。
ビットバンクテックブログ始まって以来のフロントエンド関連の記事ですが、仮想通貨取引所bitbank.ccにおけるRxJSの活用事例を紹介しようと思います。

前提としてbitbank.ccのフロントエンド開発の概要などを知りたい場合、弊社松本の過去登壇資料が参考になるかと思います。
bitbankフロントエンド開発について -SlideShare

概要

仮想通貨の板取引を行うWebアプリケーションという都合上、リアルタイムのデータを扱う画面が多い弊社のフロントエンド開発ですが、リアクティブプログラミングライブラリRxJS、またRxJSと密にインテグレートされたWebアプリケーションフレームワークAngularが無くてはならないものになっています。
今回は、bitbank.ccで実際にRxJSを使って実装されている箇所を例に挙げ、RxJSの活用例をいくつか紹介していきたいと思います。

*注: これから記事内に記載するコードはTypeScriptで記述しています。また、あくまでイメージであり実行可能なものではございません。

WebSocketのハンドリング

bitbank.ccでは次のリアルタイムデータをWebSocketから取得しています。

  • depth(リアルタイム板情報)
  • transaction(歩み値情報)
  • ticker(リアルタイム価格情報)

WebアプリケーションでWebSocketによる通信を実装する際にはSocket.ioを使うのが一般的かと思います。
Socket.ioではコネクションを張ったあとsocket.on('eventName', (data) => { /* 受信したデータを処理 */}) のようにcallbackを使った形で、サーバーから配信されるデータのハンドリングを行います。
AngularのServiceとDIを使うとアプリケーション全体でシングルトンなサービスを作ることができるので、次のように、WebScoket逐次取得したデータをRxJSのSubjectを利用しストリームデータとしてとしてアプリケーション全体で共有するサービスを簡単に実装することができます。

export class WebSocketHandleService {
  socket: SocketIOClient.Socket;
  wsSubject: Subject<EventType> = new Subject<EventType>();

  constructor() {
    this.init();
  }

  // 他のサービスに提供するストリーム
  subscribeWs(): Observable<EventType> {
    return this.wsSubject.asObservable();
  }

  private init() {
    // コネクションの確立
    // ルームへのJoinなど
    // ...略

    this.socket.on('<EventName>', (wsData) => {
      const data = wsData.message.data;
      // コールバックで受け取ったデータをSubjectに流す
      this.wsSubject.next(data);
    });
  }
}

また、bitbank.ccではWebSocketの他にPubNubを使ったリアルタイムデータ配信も行っており、これらのデータ取得でも、Angular向けに提供されているライブラリを利用して、RxJSのObservable型でストリームデータを扱っています。
異なるデータソースでも、RxJSのObservable型で統一して扱えるようにしておくことで、各ドメインのModel層、やViewで扱いやくシンプルな実装を行うことができます。

複数データソースのマージ

上記でも紹介したtransaction(歩み値情報)のリアルタイム取得に関して、RxJSのオペレーターを使った複数データソースの結合方法を紹介したいと思います。
WebScoketのエンドポイントから取得できるデータは前回配信時からの差分情報のみであるため、ページ初回ロード時、ブラウザリロード時には、REST APIからのバルク取得も合わせて使用しています。
以下の例では、ページ初回ロード時にREST APIからバルク取得したデータとWebSocketで取得したリアルタイムデータの結合を行うサンプルコードです。このような処理も、RxJSのオペレータを使うことで簡単に実装することが可能です。

  // REST APIからTransactionをバルクで取得する
  getBulkTransactions$(): Observable<Transaction[]> {
    // ...省略
  }

  // WebSocketからTransactionをリアルタイムで取得する
  getWsTransactions$(): Observable<Transaction> {
    // ...省略
  }

  getTransactions$(): Observable<Transaction[]> {
    // Subjectを作成する
    // ここでは値をSubjectにキャシュするためにReplaySubjectを使用
    this.transactionSubject = new ReplaySubject<Transaction[]>(1);

    // REST APIからバルクで取得したデータを起点としてストリームを作成
    const transactions$ = this.getBulkTransactions$().pipe(
      mergeMap(bulk: Transaction[]) => {

        return getWsTransactions$().pipe(
          // REST APIからバルクで取得したデータと、リアルタイムなストリームをscanオペレーターで合成
          scan(
            (current: Transaction[], realtime: Transaction) => current.unshift(realtime), bulk,
          ),
          // 強制的に初期データを流す
          startWith(firstTransactions),
        );
      }),
    );

    transactions$.subscribe((transactions) => this.transactionSubject.next(transactions));

    return this.transactionSubject.asObservable();
  }

イベントストリームによるデータソースのスイッチ

最後にもう一つ例を紹介したいと思います。
bitbank.ccでは複数の通貨ペアを扱っており、ユーザーがアクティブな通貨ペアを変更した際には、取得するデータストリームを切り替えるといった処理を行ってる箇所が複数あります。
以下の例は、アクティブな通貨ペアを変更したというイベントを起点に、取得する取引履歴データを切り替えるスイッチ処理です。また、一つ前に紹介した例と同様にバルク取得とリアルタイム取得のストリームを合成も行っています。

  // 通貨ペアが選択されると変化するSubject
  pairChange$ = new BehaviorSubject<string>(null);

  // 通貨ペアを指定してREST APIからHistoryをリアルタイムで取得
  getBulkHistoryByName$(name: string): Observable<History[]> {
    // ...省略
  }

  // 通貨ペアを指定してPubNubからHistoryをリアルタイムで取得
  getPubNubHistoryByName$(name: string): Observable<History[]> {
    // ...省略
  }

  getHistory$(): Observable<History[]> {
    return this.pairChange$.pipe(
      switchMap((name: string) => {
        return merge(this.getPubNubHistoryByName$(name), getBulkHistoryByName$(name)).pipe(
          scan<History[]>((current, new) => {
            // タイムスタンプを条件にマージを行う処理
            return mergeHistory(current, new);
          }, []),
        );
      }),
    );
  }

最後に

これまで紹介した通り、仮想通貨取引所などのリアルタイムなストリーム情報を頻繁に扱うクライアントアプリケーションにおいて、リアクティブプログラミングライブラリを使ったストリームデータの管理は非常に相性が良いものだと感じています。
また、Angularでは公式のhttpクライアントモジュールがデフォルトでObservableを返すほか、多くの非同期処理がRxJSのObservableが利用されていることもあり、Angular + RxJSの組み合わせは特にリアルタイムなデータストリームを扱うアプリケーションと相性が良いと言えるでしょう。
ぜひ皆さんも、RxJSとAngularで仮想通貨取引所のフロントエンドを作ってみてはいかがでしょうか☆彡。

Author image
About Daiki
expand_less