Celeryを使用したスケーリングアウト¶
CeleryExecutor
ワーカーの数をスケールアウトする方法の一つです。 この作業のためには、Celeryバックエンド(RabbitMQ、Redis、…)を設定し、executorパラメータをCeleryExecutor
にポイントするようにairflow.cfg
を変更し、関連するCelery設定を提供する必要があります。
Celery brokerの設定の詳細については、このトピックのexhaustive Celeryのドキュメントを参照してください。
ここでは、あなたの労働者のためのいくつかの不可欠な要件があります:
-
airflow
インストールする必要があり、CLIがパスにある必要があります -
気流の構成設定はクラスタ全体で同質である必要があります
-
ワーカーで実行される演算子は、そのコンテキストでdependenciesmetを持つ必要があります。 たとえば、
HiveOperator
を使用する場合は、hive CLIをそのボックスにインストールするか、MySqlOperator
を使用する場合は、必要なPythonライブラリをPYTHONPATH
で使用できる必要があります。 -
ワーカーは
DAGS_FOLDER
にアクセスできる必要があり、ファイルシステムを独自の方法で同期する必要があります。 一般的な設定は、GitリポジトリにDAGS_FOLDERを格納し、chef、Puppet、Ansible、またはyourenvironmentでマシンを設定するために使用するものを使用してマシン間で同期することです。 すべてのボックスに共通のマウントポイントがある場合は、そこでyourpipelinesファイルを共有することも機能するはずです
ワーカーをキックオフするには、Airflowを設定し、workersubcommandをキックオフする必要があります
airflow worker
あなたの労働者は、彼らがその方向に解雇されるとすぐにタスクを拾い始めるべきです。
Celeryの上に構築されたweb UIである”Celery Flower”を実行して作業者を監視することもできます。 ショートカットコマンドairflow flower
を使用して、Flower webサーバーを起動できます。
システムにはflower
pythonライブラリが既にインストールされている必要があることに注意してください。 お勧めの方法は、airflow celeryバンドルをインストールすることです。
pip install 'apache-airflow'
いくつかの注意点:
-
データベースにバックアップされた結果バックエンドを使用してください
-
最も長い実行中のタスクのETAを超える可視性タイムアウトを設定してください
-
タスクはリソースを消費できます。 Worker_concurrencyタスクを実行するのに十分なリソースがワーカーにあることを確認します
-
キュー名は256文字に制限されていますが、各ブローカバックエンドには独自の制限がある場合があります