Ruby用バックグラウンドジョブライブラリ「sidekiq」のコードを読んでみる(前半)


自分が担当するRailsアプリケーションでバックグラウンドジョブを動かす必要が出てきた。

Railsでバックグラウンドジョブといえば二大巨頭の ResqueSidekiq があると思っている。どちらかといえば最近採用されているイメージがあるSidekiqに関して、どんなコードになっているか主要な部分をチェックしておきたい。

今回は、Railsにsidekiqを組み込んでジョブを登録する部分まで。Sidekiqを直接使ってジョブを登録する方法と、ActiveJob経由で登録する方法の2つを比べて見る。

まとめ

  • Sidekiqを直接使用する場合、Redisのコネクションプールを用意しておき、それを使用して各Jobの情報をキューにpushしているだけ。ジョブの登録に関してはシンプルだった。
  • ActiveJob経由もバックエンドを抽象化している分、少しコードが複雑になっているが、やっていることはほとんど同じだった。あとコードが綺麗で読みやすい。
  • Redisに登録される情報は以下の通り。
    • 「queues」というSetにキューの名前を登録。
    • 「queue:default」というListにジョブの情報を追加。ジョブの情報はクラス(class)、ジョブの引数(args)、ID(jid)等。
    • ActiveJob経由の場合、クラスは ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper になっている。

詳細

  • 投稿時点で最新の「v5.0.0」が対象
  • Railsは5.0.2を使用
  • キューはlocalhostのRedisを使用する
  • 「Sidekiq」を直接使う方法と、「ActiveJob」経由の2つを確認する。

関連するファイル一覧は以下。

ActiveJobで関連するファイルは以下。

事前準備

Sidekiqにジョブを登録できる環境をRailsでサクッと構築する。事前にlocalhostでRedisを起動させておく(redis-cli -h 127.0.0.1 -p 6379 で接続できること)。

実施したコマンド一覧は以下。vimで変更したファイルは、Githubの このリポジトリ で確認できます。

ここに対してcurlで以下のようなリクエストを投げる。resultを含むJSONが返ってくれば、ジョブの登録に成功している。

Sidekiqのロード

まずライブラリロード時の挙動を確認するため、 lib/sidekiq.rb を見ておく。

  1. HardWorkerクラスでincludeしている Sidekiq::Worker モジュールなど、必要なファイルをロード
  2. Sidekiq モジュールの定義。ざっと見た感じオプションの設定や、Redis関連の機能が詰まっている感じ。
  3. Rails環境だったら、追加のファイルをロードしている。Rails 5系の場合、関係ありそうなのは この部分のコード。ただし、 Sidekiq.configure_server 経由で実行されており、このメソッドが Sidekiq::CLI が定義されていることであるため、今回のようなRailsでジョブを登録するだけの使い方の場合、実行されない。

Sidekiqを直接使用してジョブを登録

次にSidekiqを直接使用してジョブを登録するケースのコードを確認する。

  1. Sidekiq::Worker をincludeしたクラスを作成
  2. 上記クラスに #perform という名前のインスタンスメソッドを定義
  3. このクラスのクラスメソッド #perform_async を呼び出して、ジョブを登録。

2は定義するだけなので、1と3についてコードを読んでいく。

Sidekiq::Worker

はじめにWorkerクラスにSidekiq::Workerをincludeした時の挙動を確認する。
このモジュールには inculuded メソッドが定義されているため、include時にこのメソッドが呼び出される。

  1. include時にクラスメソッドを追加
  2. いくつかのクラス属性を追加
  3. ジョブ登録用のクラスメソッド。あとで読む
  4. このジョブ専用のオプションや次回リトライ実行時間の計算、リトライ失敗時の挙動を定義できるメソッドを追加

4に関しては、以下のような使い方を想定していると思われる。

Sidekiq::Worker.perform_async

ジョブを登録したい時は、 HardWorker.perform_async を呼び出す。その時の挙動を確認する。

  1. Redisのコネクションプールオブジェクトを取り出している。今回のケースは3つ目の Sidekiq.redis_pool から取得するはず。 Sidekiq.redis_pool では @redis がセットされていればそのオブジェクトを、そうでない場合、 Sidekiq::RedisConnection.create の結果(ConnectionPoolのインスタンス)を返す。今回のケースでは初期化していないので、 Sidekiq::RedisConnection.create を呼び出す。公式Wikiにある Using Redis の初期化をしている場合、すでにセットされている。
  2. 取得したRedisコネクションプールオブジェクトから Sidekiq::Client のインスタンスを生成し、さらに #push を呼んでいる。おそらくこの中でRedisにジョブを登録しているはず。

Sidekiq::RedisConnection.create

  1. 今回のケースでは、sizeは5、timeoutは1(秒)でConnectionPoolのインスタンスが生成される。この値は、 Sidekiq.redis= のオプションで変更可能。
  2. Sidekiq.redis_pool.with do |redis| ... end を呼ぶケースを考える。初回呼び出し時は、urlはnil、driverは’ruby’、reconnect_attemptsには1という値がセットされて Redis.new のインスタンスが生成され、以降このコネクションがプールされて使い回される。urlがnilの場合、 Redis.new のデフォルト値である redis://127.0.0.1:6379 が使用される。

Sidekiq::Client

  1. 引数で渡されたHashからRedisにpushするためのHashに変形している。もともとは ‘class’ と ‘args’ という2つのキーしかセットされていないHashだが、いくつかのキーと値が追加され、’class’ の値はStringに変換されている。
  2. 今回のケースはMiddlewareをセットしていないため、引数で渡したitemがそのまま返るだけだが、公式Wiki によるとMiddlewareを活用すると共通の処理を入れ込むことができそう。
  3. Redisに値をセットしている。まず引数で渡されたpayloadにenqueued_atをセット。そして ‘queues’ というSetに ‘default’ がpushされる。そして、queue:default というList型にpayloadをJSONにした文字列をセットしている。

ここまででSidekiqを直接使用して、Redisにジョブを登録する部分を確認することができた。以下にredisのqueue:defaultのうちHardWorkerでpushされたものを抜粋するが、想定通りのデータになっていると思う。

ActiveJob経由でSidekiqにジョブを登録

ここまででかなりお腹いっぱい感があるが、あと少しだけActiveJob経由の登録も見てみる。

  1. ApplicationJob を継承したクラスを作成
  2. 上記クラスに #perform という名前のインスタンスメソッドを定義
  3. このクラスのクラスメソッド #perform_later を呼び出して、ジョブを登録。

1と3についてコードを読んでいく。

ApplicationJob

ApplicationJob はActiveJob::Baseを継承しただけのクラス。

ActiveJob::Base

ActiveJob::Base はactivejobのGem内に定義されている。

  1. 機能別のModuleをinclude。今回関係あるのは、ActiveJob::Core(初期化周り)、ActiveJob::QueueAdapter(バックエンドの設定)、ActiveJob::Enqueuing(ジョブの登録)の3つ。
  2. :active_job で登録されたフックスクリプトを ActiveJob::Base をレシーバーに実行している。実行されるスクリプトは lib/active_job/railtie.rb で定義されている。

ActiveJob::Railtie

このファイルは config/application.rb の中でロードされており、ロードした後で config.queue_adapter = :sidekiq を実施している。そして、先ほどの ActiveJob::Base の最後でここで登録したスクリプトが実行されている。

  1. ActiveJob::Base.loggerにRailsのloggerをセット
  2. config.queue_adapterがセットされていなければデフォルトで :async を使用
  3. ここで ActionJob::Base.queue_adapter = :sidekiq を実行
  4. ここはよくわからなかった…。

3で実行される処理は ActiveJob::QueueAdapter 、および ActiveJob::QueueAdapters に書かれている。

ActiveJob::QueueAdapter, ActiveJob::QueueAdapters

  1. ActiveJob::Baseでのinclude時にクラス属性「_queue_adapter」を定義している。ActiveSupportの .class_attribute を使って定義しているため、サブクラスでも親クラスの属性が参照可能で、かつ独自に上書きもできるようになっている。
  2. ここが先ほどの続き。 パラメーター :sidekiq に対応するQueueAdapterのクラスを探して、newでインスタンス化したものを_queue_adapterにセットしている。以降、サブクラスでは「.queue_adapter」メソッドを呼ぶとここでセットされたインスタンスを参照可能。さらにサブクラスで独自に上書きもできる。
  3. name.to_s.camelize << ADAPTER の結果として 'SidekiqAdapter' が返る。この文字列に対応する定数なので ActiveJob::QueueAdapters::Sidekiq クラスが返る。

以上から SoftWorkJob クラスで .queue_adapter メソッドを呼ぶと SidekiqAdapter のインスタンスが返るようになっていることが確認できる。

ActiveJobの登録

ジョブの登録は SoftWorkJob.perform_later を実行する。このメソッドは ActiveJob::Enqueuing に定義されている。

  1. おおまかな流れとしては「ジョブインスタンスの生成」->「#enqueueメソッドの呼びだし」の2つ
  2. インスタンス生成は ActiveJob::Core に定義されている。SoftWorkJob.perform_later で指定した引数は全て @arguments に保存される。また、この段階でpush先のキューや優先度をクラスの設定値から設定する。
  3. Jobの情報をRedisに格納可能な形のHashにする。Jobに渡された引数の他、ジョブのクラスやID、キューの名前なども含まれる。
  4. 先ほど見た通り self.class.queue_adapter.enqueue はActiveJob::QueueAdapters::SidekiqAdapterのインスタンスを返す。このインスタンスの #enqueue メソッドにSoftWorkJobのインスタンスを引数として実行する。

ActiveJob::QueueAdapters::SidekiqAdapter#enqueue

  1. ここでSidekiqを直接使用したケースとほぼ同じコードが出てくる。ただし、渡しているデータは少し異なっていて、一番の違いはclassにJob自身のクラスではなく、 ActiveJob::QueueAdapters::SidekiqAdapter を指定している点である。そして、本来のJobのクラスはwrappedに指定し、ジョブの引数等が含まれたHashはargsに含めている。

Sidekiqに登録するクラスが全てのジョブで共通になってしまうため、Sidekiqの機能である retry_in などのカスタマイズは実行できないと思われる。

最後に、redis-cliで登録されたデータを見てみるが、想定通りになっている。