読者です 読者をやめる 読者になる 読者になる

the sea of fertility

C#er blog - http://ugaya40.net より移転。今度のブログは落ちない

RxFlowについて

Reactive Extensions


ugaya40/RxFlow · GitHub

RxFlowとは?

RxFlowはRx( Reactive Extensions )を用いてフロー制御を行うためのライブラリです。
現在の業務でネットワークアクセスを含む分岐点の多いバッチ処理を書いていて面倒になって作りました。
もともとはライブラリと言えるくらい大きくなる予定だったんですが、整理されていくうちにだんだんライブラリとして独立させる意味あるのか?ってサイズになってきました。でも今後拡充・・のつもりでライブラリとして公開しておきます。

なぜ Reactive Extension を使いたいのか?

ネットワークアクセスを含むリトライをこんなにシンプルに書ける環境他にあります?

IObservable<string> GetApiResultText(Func<WebRequest> webRequestFactory)
{
    return Observable.Defer(() => webRequestFactory().DownloadStringAsObservable())
        .Retry(3, TimeSpan.FromSeconds(3));
}

このメソッドはリクエストを生成するファクトリを放り込むと非同期でネットワークアクセスを行い、結果を文字列として取得し、エラーの際は3秒おきに計3回までリトライするものです。
意図即コード、僕にとってはこれがRxの一番の魅力です。
昔OptionとEitherを定義してC#で勝手if文無しバッチを書いた時もやっぱりRxの柔軟な機能が欲しくなってRxに寄せるはめになったんですよね。

また継承の多態を駆使したコードと組み合わせるとメソッドチェインによるフローの一覧性ってものすごく強力です。

そしたらもうこれはRxです、Rx。というわけでRxです。

RxFlow

シンプルなフロー制御機構です。RxのConnectableObservableと異なる形でシーケンスを分配します。

  • 複数のシーケンスから同一のシーケンスに分岐
  • 一つのシーケンスから複数のシーケンスへ分岐
  • 複数のシーケンスから複数のシーケンスへ分岐
  • 分岐から分岐へ
  • 分岐の使いまわし

のような事が簡単にできるようになります。
Rx-Mainにしか依存していないので、Rx-Mainが使える環境では使えます。ちゃんとNugetもあります。

RxFlowの機能

シンプルなもので、現在は3つの機能しかありません。

Branch

Branchクラス、これは分岐そのものを示します。CreateBranchのTでこの分岐に流れてくる値の型を指定し分岐を実現します。
作成したBranchは複数のシーケンスから参照することも、BranchからさらにBranchへ分岐することも可能です。

Junction

シーケンスをスイッチします。この例だと1~10までのシーケンスを生成し、2で割れるものはBranchAへ、3で割れるものはBranchBへ分岐させています。
分岐してbranchに流れていった値は後続に流れません。

static void Sample1()
{
    var branchA = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchA :" + i))
        .Subscribe());

    var branchB = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchB :" + i))
        .Subscribe());

    Observable.Range(1, 10)
        .Junction(i => i % 2 == 0, branchA)
        .Junction(i => i % 3 == 0, branchB)
        .Subscribe();
}

/*output
* branchA :2
* branchB :3
* branchA :4
* branchA :6 // Not processed in branchB
* branchA :8
* branchB :9
* branchA :10
*/

Distribution

Junctionの後続に値が流れる版です。2で割れるのでBranchAに流れた値が後続にも流れているため、3で割れる値が流れるBranchBにも流れて、結果6がそれぞれで出力されています。

static void Sample2()
{
    var branchA = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchA :" + i))
        .Subscribe());

    var branchB = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchB :" + i))
        .Subscribe());

    Observable.Range(1, 10)
        .Distribution(i => i % 2 == 0, branchA)
        .Distribution(i => i % 3 == 0, branchB)
        .Subscribe();
}

/*output
* branchA :2
* branchB :3
* branchA :4
* branchA :6 //processed in branchA
* branchB :6 //processed in branchB
* branchA :8
* branchB :9
* branchA :10
*/

複数シーケンスから同一Branchへの分岐例

static void Sample3()
{
    var branchA = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchA :" + i))
        .Subscribe());

    var branchB = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchB :" + i))
        .Subscribe());

    Observable.Range(1, 5)
        .Distribution(i => i % 2 == 0, branchA)
        .Distribution(i => i % 3 == 0, branchB)
        .Subscribe();

    Observable.Range(6, 5)
        .Distribution(i => i % 2 == 0, branchA)
        .Distribution(i => i % 3 == 0, branchB)
        .Subscribe();
}

/*output
* branchA :2
* branchB :3
* branchA :4
* branchA :6
* branchB :6
* branchA :8
* branchB :9
* branchA :10
*/

BranchからBranchへ分岐する例

static void Sample4()
{
    var branchA = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchA :" + i))
        .Subscribe());

    var branchB = Branch.CreateBranch<int>(input =>
        input.Do(i => Console.WriteLine("branchB :" + i))
        .Junction(i => i % 2 == 0, branchA)
        .Subscribe());

    Observable.Range(1, 10)
        .Junction(i => i % 3 == 0, branchB)
        .Subscribe();
}

/*output
branchB :3
branchB :6
branchA :6 //branchB to branchA
branchB :9
*/

応用例イメージ

今から僕がやる予定の仕事、それはiOSAndroid用のレシート検証バッチです。僕はそれをRxFlowとRxでやるんですよ!
実務のはもうちょっと複雑だったりいろいろ言えない要件がありますが単純化すると

  1. DBからレシート情報何千件かまとめてひっぱってくる
  2. 以下一件づつ
  3. Google/Appleのサーバに投げてレシート検証(その際一つのレシートごとに3秒ごとに3回までアクセスし、それでもだめならサーバ落ちてると判断してバッチ自体一度終了)
  4. サーバからの戻りを確認。ローカルでも検証
  5. 次のデータへ

なわけです。それがこんな感じで書けます。
(Rxを知っていれば)わかりやすい、シンプル、えらい。

static int Main(string[] args)
{
    int returnCode = 0;

    //▼APIチェック失敗時用分岐シーケンス
    var verifyApiFailureSequence = Branch.CreateBranch<Platform>(platform =>
        platform.Do(p => p.OnFailuredApiCheck()) //APIチェック失敗時処理
        .Subscribe());

    //▼ローカルチェック失敗時用分岐シーケンス
    var verifyInLocalFailureSequence = Branch.CreateBranch<Platform>(platform =>
        platform.Do(p => p.OnFailuredLocalCheck()) //ローカルチェック失敗時処理
        .Subscribe());

    //▼サーバーエラー時用処理
    //(3秒ごと3度のAPIアクセスでダメなようじゃサーバー落ちてるしょって事で一度バッチ自体終了)
    var severErrorSequence = new Func<WebException,IObservable<string>>(exception =>
    {
        Logger.Logging(exception);//ロギング
        returnCode = 1;
        return Observable.Empty<string>();//後続に値を流さない。メインシーケンスは実質ここで待機している他の値の処理をせずに終了する
    });

    //メインシーケンス。処理の主体。
    //GetPurchasedHistories()はDBからレシートを数千件くらい引っ張ってくるイメージ。
    //ToObservable()したので後方のシーケンスで一件一件処理。
    GetPurchasedHistories().ToObservable().Select(Platform.Create) //結果をGoogle/Androidいずれかのプラットフォームオブジェクトに変えて
        .SelectMany(platform => GetApiResultText(platform.GetApiRequest).Catch(severErrorSequence)) //APIアクセスと場合によっちゃ全処理終了
        .Junction(platform => !platform.CheckApiReponse, verifyApiFailureSequence) //API検証の結果ダメならそれ用シーケンスへ
        .Junction(platform => !platform.CheckInLocal, verifyInLocalFailureSequence) //ローカル検証の結果ダメならそれ用シーケンスへ
        .Do(platform => platform.SucessVerified)//成功したものだけ成功時処理。DBに検証結果書き戻しなど
        .Wait(); //結果を待つ

    return returnCode;
}

static IObservable<string> GetApiResultText(Func<WebRequest> webRequestFactory)
{
    //非同期で3秒まで3回のリトライでAPIアクセス
    return Observable.Defer(() => webRequestFactory().DownloadStringAsObservable())
        .Retry(3, TimeSpan.FromSeconds(3)); //
}

というわけでした。