Apache Airflowの紹介(概要・環境構築編)

今回は、ApacheOSSとして提供しているAirflowを調査しましたので
Airflowでできることと、環境構築について紹介したいと思います。

Airflowでできること

Airflowではワークフローの作成とスケジュール実行、モニタリングを行うことができます。
ここで言うワークフローとは、複数のタスクを依存関係に従って実行していくものを言います。

ワークフローの作成

AirflowではワークフローはDAGと呼ばれています。
DAGとは有向非巡回グラフのことで、大雑把に説明すると各ノード(タスク)に順番をつけることができ(有向)、一度実行したノードを再実行しない(非巡回)グラフです。
ja.wikipedia.org
DAGはPythonで各タスクを定義することで作成します。

DAGを構成するタスクはOperatorと呼ばれており
Bashコマンドの実行や外部Pythonスクリプトの実行、HTTPリクエストの送信、Slackへの通知など様々なOperatorが用意されています。
Python API Reference — Airflow Documentation

簡単なワークフローであれば
用意されているOperatorを組み合わせるだけで作成することができます。

また、独自に開発したOperatorをプラグインとして追加することも可能です。
Plugins — Airflow Documentation

スケジュール実行

DAGに対して開始日時や実行間隔などが設定でき、定期実行させることが可能です。
タスクでエラーが発生した場合の再実行回数など、エラーハンドリングについても設定可能です。
設定はDAGのパラメータとして記述します。
https://airflow.apache.org/docs/stable/tutorial.html#default-arguments

モニタリング

DAGの一覧、各タスクの実行状況、タスクの依存関係などを確認できる管理コンソールが用意されており、Webブラウザ上で確認することができます。
失敗したタスクを手動で再実行させることも可能です。

f:id:toheih:20200513151757p:plain
DAGに定義された各タスクの実行状況が視覚的に確認できます。

Airflowの環境構築

Airflowのセットアップ

公式サイトでは数コマンドで完了させていますが、
githubにてDockerfileが公開されており、今回はこちらを利用して環境構築を行いました。
GitHub - puckel/docker-airflow: Docker Apache Airflow

以下3つの構築方法(Executor)があるようで
今回はLocalExecutorを利用しました。

  • SequentialExecutor
  • LocalExecutor
  • CeleryExecutor

コンテナの開始後、http://localhost:8080Webブラウザでアクセスし以下画面が表示されればセットアップ完了です。
f:id:toheih:20200513154903p:plain

DAGの作成とデプロイ

コンテナ開始後、dagsフォルダが自動で作成されます。
そこにPythonスクリプトで記述したDAGを配置すると、自動デプロイされ実行可能になります。
今回は公式サイトにあるチュートリアルのDAGを配置します。
airflow.example_dags.tutorial — Airflow Documentation

f:id:toheih:20200513160903p:plain
DAGを配置すると自動でデプロイされます。

DAGの実行

左端に表示されているトグルをOnにすると、
DAGが設定したスケジュール通りに実行されます。
チュートリアルDAGの場合、2日前を基準にして2回実行されます。
なお、右側のボタンをクリックするとスケジュールに関係なく即時実行されます。
f:id:toheih:20200513162203p:plain

DAG名をクリックするとTree View画面が表示され、DAGの実行状況が確認できます。
f:id:toheih:20200513162905p:plain

チュートリアルDAGでは以下タスクを定義していますが、
これらの実行結果を画面から確認することができます。

  • 現在日付を表示
  • Bashのsleepコマンドで5秒間処理を遅延
  • Jinja Templateを利用して現在日付と一週間後の日付、メッセージを表示

各タスクのステータスをクリックするとタスクの詳細画面が表示されます。
ViewLogをクリックして実行ログを確認します。
f:id:toheih:20200513165234p:plain

実行ログに現在の日付が表示されていることが確認できます。
f:id:toheih:20200513165527p:plain


以上、簡単ですがAirflowについて紹介しました。
今後は、Operatorプラグインの開発やREST APIを利用した外部サービス連携などについて紹介できればと思います。