Celeryを使用したスケーリングアウト¶

CeleryExecutor ワーカーの数をスケールアウトする方法の一つです。 この作業のためには、Celeryバックエンド(RabbitMQ、Redis、…)を設定し、executorパラメータをCeleryExecutorにポイントするようにairflow.cfgを変更し、関連するCelery設定を提供する必要があります。

Celery brokerの設定の詳細については、このトピックのexhaustive Celeryのドキュメントを参照してください。

ここでは、あなたの労働者のためのいくつかの不可欠な要件があります:

  • airflow インストールする必要があり、CLIがパスにある必要があります

  • 気流の構成設定はクラスタ全体で同質である必要があります

  • ワーカーで実行される演算子は、そのコンテキストでdependenciesmetを持つ必要があります。 たとえば、HiveOperatorを使用する場合は、hive CLIをそのボックスにインストールするか、MySqlOperatorを使用する場合は、必要なPythonライブラリをPYTHONPATHで使用できる必要があります。

  • ワーカーはDAGS_FOLDERにアクセスできる必要があり、ファイルシステムを独自の方法で同期する必要があります。 一般的な設定は、GitリポジトリにDAGS_FOLDERを格納し、chef、Puppet、Ansible、またはyourenvironmentでマシンを設定するために使用するものを使用してマシン間で同期することです。 すべてのボックスに共通のマウントポイントがある場合は、そこでyourpipelinesファイルを共有することも機能するはずです

ワーカーをキックオフするには、Airflowを設定し、workersubcommandをキックオフする必要があります

airflow worker

あなたの労働者は、彼らがその方向に解雇されるとすぐにタスクを拾い始めるべきです。

Celeryの上に構築されたweb UIである”Celery Flower”を実行して作業者を監視することもできます。 ショートカットコマンドairflow flowerを使用して、Flower webサーバーを起動できます。

システムにはflowerpythonライブラリが既にインストールされている必要があることに注意してください。 お勧めの方法は、airflow celeryバンドルをインストールすることです。

pip install 'apache-airflow'

いくつかの注意点:

  • データベースにバックアップされた結果バックエンドを使用してください

  • 最も長い実行中のタスクのETAを超える可視性タイムアウトを設定してください

  • タスクはリソースを消費できます。 Worker_concurrencyタスクを実行するのに十分なリソースがワーカーにあることを確認します

  • キュー名は256文字に制限されていますが、各ブローカバックエンドには独自の制限がある場合があります

コメントを残す

メールアドレスが公開されることはありません。