前回に引き続きAirflowについて紹介します。
今回はプラグインについてです。
プラグインとは
Airflowで使用するOperatorやHook、Sensorを独自に作成することができる機能です。
※Hookとは外部リソースとのやり取りをするプログラムで、Sensorはあるアクションが発生した際に実行されるOperatorです。
今回は独自のOperator(カスタムOperator)を作成してみます。
作成方法
事前準備
前回用意したdocker-compose.xmlにvolumesの設定を追加します。
プラグイン用のフォルダを設定します。
volumes: - ./dags:/usr/local/airflow/dags - ./plugins:/usr/local/airflow/plugins
フォルダ構成
docker-compose up をするとpluginsフォルダが作成されます。
pluginsフォルダ配下に以下のファイル、フォルダを作成します。
plugins ├─sample_plugin.py・・・・・・・プラグインクラス └─operators └─ sample_operator.py ・・・カスタムOperatorクラス
プラグインクラスの作成
sample_plugin.pyはプラグインクラスになります。
プラグインとして追加するOperatorの名前を定義します。
以下のように定義します。
from airflow.plugins_manager import AirflowPlugin from operators.sample_operator import SampleOperator class SamplePlugin(AirflowPlugin): name = "sample_plugin" operators = [SampleOperator]
プラグインクラスでは、airflow.plugins_manager.AirflowPluginクラスを継承する必要があります。
またSampleOperatorはこれから作成するカスタムOperatorクラスです。
カスタムOperatorクラスの作成
sample_operator.pyはカスタムOperatorクラスになります。
このクラスで具体的に実行する処理を定義します。
今回は公式サイトにあるサンプル処理(渡された引数に「Hello」をつけてログ出力する処理)を使用します。
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class SampleOperator(BaseOperator): @apply_defaults def __init__( self, name: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message
カスタムOperatorクラスではairflow.models.baseoperator.BaseOperatorクラスを継承する必要があります。
動作確認
作成したOperatorを実行するDAGを作成します。
Operatorの引数には「toheih」を渡します。
from datetime import timedelta from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.sample_plugin import SampleOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(0), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 0, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'hello', default_args=default_args, description='hello DAG', schedule_interval=timedelta(days=1), ) hello = SampleOperator( task_id='sample-task', name='toheih', dag=dag, ) hello
Aiflowにデプロイして動作させてみます。
※プラグインを有効にするにはAirflowを再起動する必要があります。
エラーなく動作完了しました。
以下の通り「Hello toheih」と表示されることが確認できました。
以上、簡単ですがAirflowのプラグイン機能について紹介しました。