Apache Airflowの紹介(プラグイン編)

前回に引き続きAirflowについて紹介します。
今回はプラグインについてです。

プラグインとは

Airflowで使用するOperatorやHook、Sensorを独自に作成することができる機能です。
※Hookとは外部リソースとのやり取りをするプログラムで、Sensorはあるアクションが発生した際に実行されるOperatorです。
今回は独自のOperator(カスタムOperator)を作成してみます。

作成方法

事前準備

前回用意したdocker-compose.xmlにvolumesの設定を追加します。
プラグイン用のフォルダを設定します。

 volumes:
     - ./dags:/usr/local/airflow/dags
     - ./plugins:/usr/local/airflow/plugins

フォルダ構成

docker-compose up をするとpluginsフォルダが作成されます。
pluginsフォルダ配下に以下のファイル、フォルダを作成します。

plugins
├─sample_plugin.py・・・・・・・プラグインクラス
└─operators
     └─ sample_operator.py ・・・カスタムOperatorクラス

プラグインクラスの作成

sample_plugin.pyはプラグインクラスになります。
プラグインとして追加するOperatorの名前を定義します。
以下のように定義します。

from airflow.plugins_manager import AirflowPlugin
from operators.sample_operator import SampleOperator

class SamplePlugin(AirflowPlugin):
    name = "sample_plugin"
    operators = [SampleOperator]

プラグインクラスでは、airflow.plugins_manager.AirflowPluginクラスを継承する必要があります。
またSampleOperatorはこれから作成するカスタムOperatorクラスです。

カスタムOperatorクラスの作成

sample_operator.pyはカスタムOperatorクラスになります。
このクラスで具体的に実行する処理を定義します。
今回は公式サイトにあるサンプル処理(渡された引数に「Hello」をつけてログ出力する処理)を使用します。

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class SampleOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message

カスタムOperatorクラスではairflow.models.baseoperator.BaseOperatorクラスを継承する必要があります。

動作確認

作成したOperatorを実行するDAGを作成します。
Operatorの引数には「toheih」を渡します。

from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago

from airflow.operators.sample_plugin import SampleOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'hello',
    default_args=default_args,
    description='hello DAG',
    schedule_interval=timedelta(days=1),
)

hello = SampleOperator(
    task_id='sample-task',
    name='toheih',
    dag=dag,
)

hello

Aiflowにデプロイして動作させてみます。
プラグインを有効にするにはAirflowを再起動する必要があります。
f:id:toheih:20200611132834p:plain

エラーなく動作完了しました。
f:id:toheih:20200611132827p:plain

以下の通り「Hello toheih」と表示されることが確認できました。
f:id:toheih:20200611132831p:plain


以上、簡単ですがAirflowのプラグイン機能について紹介しました。