Apache Airflowの紹介(kintoneとの連携編)

Airflowとkintoneアプリを連携して、期日が近づくとメール送信・Slack通知する処理を作成したので紹介します。

処理の概要

次のようなイメージで処理を作成していきます。

  • kintoneの契約書管理アプリから各レコードの契約満了日を取得。
  • 契約自動更新が無しで直近(一週間以内)で契約満了となるのものについて、契約更新のお知らせメール・Slackメッセージを送信。

kintoneアプリ

デフォルトで用意されている契約書管理アプリを利用します。
f:id:toheih:20201012154335p:plain

  • 連絡先にはメールアドレスを指定することとします。

AirflowのDAG作成

以下のタスクを実行するDAGを作成します。

  • kintoneからREST APIでデータ取得。
    • レコードを一括取得するカーソルを作成。(SimpleHttpOperator)
    • カーソルIDをXComから取得。(PythonOperator)
    • カーソルからデータを取得。(SimpleHttpOperator)
  • 取得結果からメール本文を作成し、メール送信処理用のDAGを実行(PythonOperator)
  • メール送信処理を実行(EmailOperator)

XComとは、タスク間で変数のやり取りをするための仕組みです。
また、メール送信処理については複数回実行可能とするためデータ取得処理のDAGとは別のDAGを作成し、データ取得処理DAGから呼び出すようにします。

kintoneのデータ取得処理

複数レコードを取得する必要があるため、カーソルを利用して取得します。

# REST API実行結果判定処理
def response_check(response):
    result=response.json()
    return response.status_code == requests.codes.ok

# カーソル作成タスク
create_cursor = SimpleHttpOperator(
    task_id='create_cursor',
    http_conn_id='',
    method='POST',
    endpoint='https://(サブドメイン名).cybozu.com/k/v1/records/cursor.json',
    data=json.dumps(
        {
            'app': '(アプリID)',
            'fields' : ['company_name', 'tanto_name', 'tanto_address', 'keiyaku_manryo_date'],
            'query' : 'keiyaku_manryo_date <= THIS_WEEK() and keiyaku_auto_update not in ("有")'
        }
    ),
    headers={'X-Cybozu-API-Token': '(kinoneのAPIトークン)', 'Content-Type' : 'application/json'},
    response_check=response_check,
    log_response=True,
    xcom_push=True,
    dag=dag,
)
  • response_checkで指定した関数によって、タスクを正常終了とするかエラーとするかを決定できます。今回はHTTPステータスコードで判定しています。
  • xcom_push=Trueにすることで、タスクの実行結果をXComに保存することが出来ます。
    • 保存した値はタスクのViewLogから確認することが出来ます。

f:id:toheih:20201015160247p:plain

次に、カーソルからレコードデータを取得します。
取得にはカーソルIDが必要で、カーソル作成APIの取得結果から取得します。

# カーソルID取得処理
def get_id_from_xcom(**context):
    jsonvalue = context['ti'].xcom_pull(task_ids='create_cursor')
    value = json.loads(jsonvalue)
    return value['id']

# カーソルID取得タスク
get_id = PythonOperator(
    task_id='get_id',
    dag=dag,
    provide_context=True,
    xcom_push=True,
    python_callable=get_id_from_xcom,
)

# レコードデータ取得タスク
open_cursor = SimpleHttpOperator(
    task_id='open_cursor',
    http_conn_id='',
    method='GET',
    endpoint='https://(サブドメイン名).cybozu.com/k/v1/records/cursor.json',
    data={"id" : "{{ ti.xcom_pull(task_ids='get_id') }}"},
    headers={'X-Cybozu-API-Token': '(kinoneのAPIトークン)'},
    response_check=response_check,
    log_response=True,
    xcom_push=True,
    dag=dag,
)
  • context['ti'].xcom_pull(task_ids='xxxx')でXComに保存された値を取得することが出来ます。
    • また"{{ ti.xcom_pull(task_ids='xxxx') }}"と記載することで文字列に埋め込むことも出来ます。
  • カーソルから取得したレコードデータは以下の通りXComに保存されます。

f:id:toheih:20201015160553p:plain

メール・メッセージ送信処理

レコードデータからメール・メッセージ本文を作成し、別DAGに渡します。

# 本文作成処理
def create_mail_content_from_xcom(**context):
    jsonvalue = context['ti'].xcom_pull(task_ids='open_cursor')
    value = json.loads(jsonvalue)
    for keiyaku_info in value['records']:
        k = {'mail_content' : '', 'slack_content' : '', 'mail_address' : keiyaku_info['tanto_address']['value'] }
        mail_content = keiyaku_info['company_name']['value'] + '<br/>'
        mail_content += keiyaku_info['tanto_name']['value']+ '様<br/><br/>'
        mail_content += '平素よりサービスをご利用いただき、誠にありがとうございます。<br/>'
        mail_content += 'お客様の契約有効期限が間近となっておりますのでお知らせいたします。<br/>'
        mail_content += 'このメールは ' + keiyaku_info['keiyaku_manryo_date']['value'] +' に契約満了を迎える方へ送信しています。<br/>'
        mail_content += '更新のお手続きをお願い致します。'
        k['mail_content'] = mail_content
        k['slack_content'] = mail_content.replace('<br/>','\n')
        trigger_dag(dag_id='sub_notice',
                    conf=json.dumps(k),
                    execution_date=None,
                    replace_microseconds=False)
        pass
    return

# 本文作成処理実行タスク
create_mail_content = PythonOperator(
    task_id='create_mail_content',
    dag=dag,
    provide_context=True,
    xcom_push=False,
    python_callable=create_mail_content_from_xcom,
)
  • trigger_dagで別DAGを実行することが出来ます。またconfに指定した値は別DAGで参照することが可能です。

別DAGではメール送信処理・Slackメッセージ送信処理を実装します。

# メール送信処理タスク
send_email = EmailOperator(
    task_id='send_email',
    dag=sub_dag,
    to="{{ dag_run.conf['mail_address']}}",
    subject='契約満了のお知らせ',
    html_content="{{ dag_run.conf['mail_content']}}",
    mime_charset='utf-8',
)

# Slackメッセージ送信処理タスク
send_slack = SlackWebhookOperator(
    task_id='send_slack',
    dag=sub_dag,
    http_conn_id='slack_test',
    message="{{ dag_run.conf['slack_content']}}",
)

send_email >> send_slack
  • Slack送信を行う場合はあらかじめConn IdとWebhook URLをAirflow側に登録しておく必要があります。
    • Admin > Connection から登録することが出来ます。
    • Conn IdはSlackWebhookOperatorのhttp_conn_idに指定します。

f:id:toheih:20201015165504p:plain

動作確認

実際にkintoneにデータを登録します。
f:id:toheih:20201015182719p:plain

AirflowのDAGを実行し、エラーなく完了しました。
f:id:toheih:20201015182956p:plain

メールが届くことが確認できました。
f:id:toheih:20201015183059p:plain

Slackのほうにもちゃんとメッセージが送られています。
f:id:toheih:20201016131639p:plain

今回はDAGを手動で実行しましたが、スケジュール実行も可能なので定期的にデータをチェックして通知することも可能です。

以上、Airflowとkintoneの連携について紹介しました。