Cloud Workflowsで長時間の処理の実行と成否を分かりやすく

何が欲しかったのか

  • long-runなジョブの実行をクラウド上で行う
  • 途中で失敗する可能性がそれなりに高い上記のジョブの成否をシュッと知りたい
    • スケジューラには成否の情報はない
    • 例えばスクレイピングなんかはサイトのHTML構造の変化でけっこう簡単に失敗する

真面目に考えると結果の UI とか欲しくなってくるが、待てよ、なんか似たようなものを知っているような…? これって CI/CD を汎用にしたようなものじゃない?

Workflow Engineてこと?

  • イメージとしては CI/CD のような何かだが、目的は delivery じゃなくてさまざま
    • 処理の途中や全体の成否の様子を後から振り返ることができると嬉しい
  • ブラウザでジョブの失敗とか確認したいが、そのためだけに Rails と RDBMS のインスタンスを稼働させておきたいかというとそんなことはない
  • どうせなら途中で失敗した際には失敗したところからやり直せるとなおよい

どこでどう思いついたのかはまったく思い出せないんだが、「これって workflow engine じゃね?」と思いつくことができた。そこまでくれば話は早い。今回は Cloud Run で long-run job を処理しようとしているので work flow engine は自動的に

Workflows | Google Cloud

に決まる。ということであれこれ見てみる。

比較対象の Cloud Composer は Apache Airflow というものを利用したクラウドサービスらしい。別に Python で定義したいと思わないし、複雑なフロー制御も欲しいとも思っていないし、何より安いので、Cloud Workflows はおあつらえ向きっぽい。

改めて今回やりたいこと

  • 主に Cloud Run Jobs を用いた長時間のタスクを
  • 定期実行したい
  • これは何段階かに分割もできる(ステップ)
    • 途中で失敗した場合、システムを修正したのち途中から再開できるとなおよい

Cloud Workflowsの対応状況

項目対応
Cloud Run Jobs長時間タスクコネクタで可能
定期実行したいCloud Schedulerから実行可能
何段階かに分割step定義可能
途中から再開不可

Google Cloud のさまざまな機能を Workflows から呼び出せるので連携して以下のように動かすことはできるが、途中再開はできないと。なるほど。

失敗はどう知るのか

もう一度確認すると

サービス オーケストレーション用のワークフローまたは Cloud Composer を選択する  |  Google Cloud

Cloud Workflows は

  • HTTPリクエスト
  • Googleプロダクトのコネクタ

のいずれかを利用してアプリを起動するということになっているんだけど、なんとなく

HTTPステータスコードで成否を判定してそう

に見える。では Cloud Run Jobs のように HTTP 関係ないものはどうなるんだろう? と思ったので試した。

Cloud Build で

steps:
  - name: ubuntu
    entrypoint: bash
    args:
      - -c
      - 'exit 1'

単に sh script で exit 1 を返して失敗する(Cloud Build では普通に失敗として判定される)アプリを準備。仮にこの trigger の名前を fail とする。これを

main:
  steps:
    - main:
      call: googleapis.cloudbuild.v1.projects.triggers.run
      args:
        triggerId: fail
        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}

こんな感じで呼び出してあげると、Cloud Workflows 上でも失敗になる。どうも

コネクタは Google Cloud API を HTTP で叩いて返事をもらっていて、API が exit status を HTTP STATUS で返しているっぽい?

と予想が立つ。てゆーか書いてあった。

Workflows を使用してジョブを呼び出す  |  Cloud Run のドキュメント  |  Google Cloud

Cloud Run ジョブは HTTP リクエストをリッスンまたは処理しません。ワークフローから Cloud Run ジョブを実行するには、Cloud Run Admin API コネクタを使用します。

コンテナ ランタイムの契約  |  Cloud Run のドキュメント  |  Google Cloud

ジョブの実行中に動作するコンテナは、完了時に終了しなければならない

Cloud Run ジョブの場合、ジョブが正常に完了したときに、コンテナは終了コード 0 で終了する必要があります。ジョブが失敗した場合は、ゼロ以外の終了コードで終了する必要があります。

ジョブはリクエストを処理しないため、コンテナでポートのリッスンや、ウェブサーバーの起動は行わないでください。

ということは普通の Unix コマンドのように Cloud Run Job を作ればよい、ということになる。

ということで

Cloud Workflows と Cloud Run Jobs を組み合わせて使うことで長時間タスクの状況を把握しやすくすることはできそう。

今後の課題

途中再開

これについては Cloud Workflows ではなくて Cloud Run 側にドキュメントがある。

ジョブの再試行とチェックポイントのベスト プラクティス  |  Cloud Run のドキュメント  |  Google Cloud

チェックポイントを設定する

ジョブにチェックポイントを設定することで、障害発生後にタスクが再開したときに、タスクを最初からではなく、中断したところから再開できます。これにより、ジョブが高速化されるだけでなく、不要なコストを最小限に抑えることができます。

結果の一部を定期的に書き込み、Cloud Storage やデータベースなどの永続ストレージ ロケーションに対する進行状況を確認します。タスクを開始するときに、起動時の結果の一部を確認します。結果の一部が見つかった場合は、その続きから処理を開始します。

ジョブがチェックポインティングに適していない場合は、ジョブを小さなチャンクに分割して、より多くのタスクを実行することを検討してください。

あーまぁそうですよね。自分がこの辺りで

PubSub Functionのfan-outの限界 (2020-03-02) | あーありがち

やってたことっぽい。この時は

  • Cloud Functions
  • Cloud PubSub
  • Cloud Firestore

でやってたんだけど、道具が変わっただけでやりたいことは一緒っぽいな。もしかしたら

geekq/workflow: Ruby finite-state-machine-inspired API for modeling workflow

こういうの使うとよいのかも。これは #load_workflow_state, #persist_workflow_state を追加するだけで永続化周りを扱えるので、Firestore やら何なら Cloud Storage FUSE 辺りでも十分使いものになるんじゃないかという気がする。(そもそも何重も同時に呼び出されることは想定してない)

監視

Workflows 上で失敗になった場合にそれをモニタリングで拾えるのか? それとも Cloud Run や Cloud Logging から拾うようにするのか?はまだ調べられていないのでこれから。

More