今回は、ApacheがOSSとして提供しているAirflowを調査しましたので
Airflowでできることと、環境構築について紹介したいと思います。
Airflowでできること
Airflowではワークフローの作成とスケジュール実行、モニタリングを行うことができます。
ここで言うワークフローとは、複数のタスクを依存関係に従って実行していくものを言います。
ワークフローの作成
AirflowではワークフローはDAGと呼ばれています。
DAGとは有向非巡回グラフのことで、大雑把に説明すると各ノード(タスク)に順番をつけることができ(有向)、一度実行したノードを再実行しない(非巡回)グラフです。
ja.wikipedia.org
DAGはPythonで各タスクを定義することで作成します。
DAGを構成するタスクはOperatorと呼ばれており
Bashコマンドの実行や外部Pythonスクリプトの実行、HTTPリクエストの送信、Slackへの通知など様々なOperatorが用意されています。
Python API Reference — Airflow Documentation
簡単なワークフローであれば
用意されているOperatorを組み合わせるだけで作成することができます。
また、独自に開発したOperatorをプラグインとして追加することも可能です。
Plugins — Airflow Documentation
スケジュール実行
DAGに対して開始日時や実行間隔などが設定でき、定期実行させることが可能です。
タスクでエラーが発生した場合の再実行回数など、エラーハンドリングについても設定可能です。
設定はDAGのパラメータとして記述します。
https://airflow.apache.org/docs/stable/tutorial.html#default-arguments
モニタリング
DAGの一覧、各タスクの実行状況、タスクの依存関係などを確認できる管理コンソールが用意されており、Webブラウザ上で確認することができます。
失敗したタスクを手動で再実行させることも可能です。
Airflowの環境構築
Airflowのセットアップ
公式サイトでは数コマンドで完了させていますが、
githubにてDockerfileが公開されており、今回はこちらを利用して環境構築を行いました。
GitHub - puckel/docker-airflow: Docker Apache Airflow
以下3つの構築方法(Executor)があるようで
今回はLocalExecutorを利用しました。
- SequentialExecutor
- LocalExecutor
- CeleryExecutor
コンテナの開始後、http://localhost:8080 にWebブラウザでアクセスし以下画面が表示されればセットアップ完了です。
DAGの作成とデプロイ
コンテナ開始後、dagsフォルダが自動で作成されます。
そこにPythonスクリプトで記述したDAGを配置すると、自動デプロイされ実行可能になります。
今回は公式サイトにあるチュートリアルのDAGを配置します。
airflow.example_dags.tutorial — Airflow Documentation
DAGの実行
左端に表示されているトグルをOnにすると、
DAGが設定したスケジュール通りに実行されます。
チュートリアルDAGの場合、2日前を基準にして2回実行されます。
なお、右側のボタンをクリックするとスケジュールに関係なく即時実行されます。
DAG名をクリックするとTree View画面が表示され、DAGの実行状況が確認できます。
チュートリアルDAGでは以下タスクを定義していますが、
これらの実行結果を画面から確認することができます。
- 現在日付を表示
- Bashのsleepコマンドで5秒間処理を遅延
- Jinja Templateを利用して現在日付と一週間後の日付、メッセージを表示
各タスクのステータスをクリックするとタスクの詳細画面が表示されます。
ViewLogをクリックして実行ログを確認します。
実行ログに現在の日付が表示されていることが確認できます。
以上、簡単ですがAirflowについて紹介しました。
今後は、Operatorプラグインの開発やREST APIを利用した外部サービス連携などについて紹介できればと思います。