ActiveRecordとdry-operationでバッチジョブをお手軽に管理してみる(3)

前回 ActiveRecordとdry-operationでバッチジョブを管理してみる(2) (2025-08-24) | あーありがち の続き。
くどいけど、コード例は出てくるけど、設計を考える話。今回は ActiveRecord や Dry::Operation を利用した書き味の部分とは別に考えないといけない大事な屋台骨の部分。排他制御、全体の統合、エラー処理について。
排他制御をどうするか
これ最初は ActiveRecord でロックしようとしていたんだけど、
- ActiveRecord では待つことしかできず、「ロックされているのでただちに実行を失敗扱いにして中断する」といった制御ができない
- SQLite ではデータベース全体でロックされてしまう
これが組み合わさると「現在の状況のチェック」みたいな機能が実現できない。インフラを動的に作って完全に並列に動かすような大規模な使い方をしたいわけじゃなくて、
- ローコストに
- 途中からの再開可能
- 時間は掛かってもよい(というかむしろ掛かるからこそ上記の機能が欲しい)
と考えているので、ここは別な方法を考えた。
- お手軽なロックの仕組みとしては flock がある
- 伝統的には PaaS はファイルシステムを持っていない(あっても in-memory)
- 最近はオブジェクトストレージを FUSE で mount してブロックデバイスのように扱うことができる1
ということで
flockでロックを取得する仕組みで排他制御しよう
と思いついた。もちろん並列にインスタンス(コンテナ)を起こしてしまう場合は考慮してません。ローコストに実現したいって言ってるでしょ。
実装のイメージはこういう感じ。
#
# @param [Job] job - job instance
# @yield [Proc] block - 準備してOperationを実行してExecutionに結果を詰める処理
#
def call(job:, &block)
with_mutex do # 全体のロックを取得
if File.exist?(guardfile)
Failure(:busy)
else
File.write(guardfile, "")
end
end
begin
block.call(job: job)
ensure
with_mutex do # 全体のロックを取得
FileUtils.rm(guardfile)
end
end
end
- job instance と Operation block を受け取る
- job class に応じたファイルを自動的に準備、これを lock することで、Jobの定義単位でロックを取得する(これで同じジョブが同時に動くことはないのでいろいろ破綻しない)
- ただこのロック用のファイルを作成するタイミングがロックできていないとダメなので、全体のロック用のファイルも用意(
with_mutexブロックがそれ)- 途中で何が起きるか分からないので、 ensure 節で確実にロック用のファイルを削除
という想定。
全体の束ね方
上の仕組みは event 部分を除く処理を図示すると以下のような感じになる。
描いといてよく分からない…コード読んだ方がマシかも。
- 太線の枠である Job と Operation がユーザーの定義するもの
- 組み立て処理は定型なのでユーザーは書きたくない
最初はこんな ? の枠組みは考えていなくて、動作だけを Job と JobExecution でガリガリ書いていたんだけど、
Job は ActiveRecord なので、ActiveRecord 以上の責務を負わせたくない。複雑な責務を追わせると ActiveRecord としてはテストしにくい2
ということに気づいたので、分離を試みた。もう一つ大事なのは書き味で、
- (上にも書いたが)ユーザーが定義するのは Job と Operation だけなので、このいずれかで処理をスタートして結果を受け取りたい
と考えた。結果
- Dry::Operation を継承した親クラスを作る
- Operation.execute というクラスメソッドを作り、ここをスタート地点とする
- 上図の ? を Organizer と名づけ、Operation から必要なクラス群を(デフォルト引数を使いながら)Organizer に渡し
- Organizer が内部で mutex と(受け取った)Operation (スタート地点からすると自分自身)のインスタンスを生成して call して結果を返す
- 実際の Operation はこうした機能を持つ親クラスを継承して、call の中身だけ実装する
#
# @param [JobBase] job_class - subclass of JobBase
# @param [Organizer] organizer - organizer class
# @param [JobProcessor] job_processor - JobProcessor instance
# @return [JobExecution]
#
def self.execute(
*args,
job_class:,
organizer: Organizer,
job_processor: MutexFileJobProcessor.new,
**kwargs
)
org = organizer.new(job_class: job_class, job_processor: job_processor)
org.call(*args, operation: self, **kwargs)
end
こんな感じ。Mutex の部分はテスト用の設定を差し込めるようにするため inject する形になっている。本来は Organizer しか必要ないものだが、実行のスタート地点がここなので、ここで inject している。将来的には拡張も可能ではある3。
自分自身を相手に預ける辺りはパッと分かりにくい感じはするけど、全体をテストコードでガードして、発想に合わせて内部構造を変えながら設計していった。まぁまぁスマートにできたのではないかと思う。このおかげで
- Operation は Operation だけで動作する(
proceed!の実体を inject されていない場合はただ block を実行する) - ActiveRecord は ActiveRecord だけで動作する(
proceed!に block を与えなくても動く)
ようになった。以下の汚い部分は全部 Organizer が受け持つ。
- skip するかどうか
- result の組み立て
ActiveRecord 側は Job が持っている定義の範囲内かどうかだけを気にする(でないと記録する event が存在しないことになりかねないので)。
自分の中ではこれは割と clean architecture の影響を受けた設計だなと感じている。
- 別々の I/O が各々の責務に集中したまま
- それを束ねる人にその I/O を inject してロジックを書く
エラー処理
責務が分かれてすべてを Organizer が束ねる方針に決まったので、最後にエラー処理4。ここまで含めて Organizer のコアの部分はざっくり言うとこんな感じで5、
def call(*args, operation:, processor:, job:, **kwargs)
processor.call(job: job) do
ex = job.job_execution.create(job: job) # ActiveRecordでexecutionを増やす
# もろもろ起動の処理
begin
result = operation.new.call(...) # 普通に自分で定義したcallメソッドを実行
rescue => e
ActiveSupport::Notifications.instrument("batch_job.exception", e) do
result = Failure(e.full_message(highlight: false))
end
end
if result.success?
ex.complete!
else
ex.error!
end
# もろもろ終了の処理
result.either(->(r) { Success(ex) }, ->(e) { Failure(ex) })
end
end
- Operation は Dry::Operation の subclass であり、もともと Dry::Monads::Result を返す設計
- Operation の実行はすべて begin-rescue で囲み、例外が発生した場合にはそれをエラーという情報に変換して通常の実行時と同じ Failure オブジェクトにする
- エラーがあったことは ActiveSupport::Notifications を利用して伝える
- 監視システムとの接続はこれを subscribe して各自でやる
- Success の場合は complete イベントを記録、例外じゃない Failure もそうでない Failure も error テーブルの方に記録する
これで
バッチジョブシステム自体は例外で停止せずにすべての処理を実行でき、すべてのエラーを記録できるようになった。
これも結構大事で、なんか想定外のことがあった時にも例外のトレースがむなしくログに放流されているだけ、みたいなのが扱いにくいとずっと思っていたので、CI/CD サービスみたいにちゃんと実行と紐付けたエラーの情報を見れるようにしたいと思っていた。実現方法も大事で、
- イベント駆動とか Thread などのバックグラウンド処理を利用していない
- 基本的なエラー情報の伝播に例外を利用していない(Dry::Monads::Resultを利用)
だから素朴に begin-rescue で囲むだけで済む
ようになっている。この辺は Dry::Operation に触れてからイメージとしてはずっと持っていたもので、やっとイメージ通りに実現できるようになってきた。
ただ、開発時には例外は本当に例外で終了してくれた方が扱いやすいかもしれないので、そこは切り替えられるようにしてもよいと思う。
実際の起動と結果の受け取り
これはとてもシンプルになる。
result = SomeOperation.execute(job_class: SomeJob) # Dry::Monads::Result<JobExecution>
# 結果に基づく処理 ..
これも最初は Operation の call の中に job が渡ってきて、そこから create する処理だけを wrap した何かにしていたんだけど、絶対こんなの毎回書きたくないよなーと思って ActiveRecord 部分ができあがってから考え直そうと放置しておいて、ActiveRecord, Operation, 排他制御、Organizer が出揃ったら最終的にこうなった感じ。
最後に
改めて背景と考えたこと、実現できたことを挙げると、
- バッチジョブの成否の記録の扱い、失敗からの再試行の流れを手作業リカバリ込みで作り込むのが面倒で、一直線に実行して失敗は例外のまま放置とかログに垂れ流して放置していたが、さすがに扱いにくいので見直そうと思った
- 途中からの再開はぜひ実現したかったので、成功のログを記録し、そのログをもとに途中までの実行をスキップできるように、アプリケーションから扱いやすいログにしたかった
- ジョブの記法、その実行方法、実行結果の受け取り方はできるだけシンプルにしたかった
- 大きな基盤システムではなく、むしろ手軽に組み込める仕組みで考えたので何らかのサーバプロセスに依存しないようにした
- 結果、クラウドでうっかりコンテナ、インスタンスレベルで並列実行した場合の排他制御はできないが、一つのコンテナで扱っている限りは問題なく制御する方法を見つけた
- 例外で処理が止まって失敗を記録できなくなることは避けたかった
- でも例外が発生した異常終了と同等の事態が起きていることの発見が遅れるようにはしたくない
- 今後も活用しやすいように、できるだけクリーンな設計になるように
こんな感じ。
くり返すけど 最初からこんな設計ができていたわけではなくて、動かしながら設計していった。 これこそが自分の中の TDD のよさなんだよなと改めて感じた。
特に今回の ActiveRecord や Mutex のように I/O にゴリゴリに依存している時に最初からきれいに責務を分離して作るのは自分はあんまり得意じゃなくて、「あれこの機能ってこういう動きなのか、じゃあこういう使い方を想定していたけどダメだね」みたいなことがよく起きる。そういう時に最初からきれいに分離しようとすると、今度は分離はできてるけど必要な要件を満たしていない、みたいなことが起きがちになる。動かしながらテストして詰めていくのが向いているなぁと改めて思ったし、こういうのを AI に作ってもらうのがとても難しいと感じている。YAGNI 難しくないすか、AI コーディング。