https://buildersbox.corp-sansan.com/entry/2022/08/18/110000

この記事では新しくDAGを作成してデータの転送処理が遅れているかを監視する方法が紹介されました。監視と実行用のDAGを分離することでスッキリにはなりますが、DAGが増えることによって管理の手間が生じます(特にDAGが大量にある場合)。

https://stackoverflow.com/questions/69312630/airflow-triggering-the-on-failure-callback-when-the-dagrun-timeout-is-exceed

を参考にして DAGの引数on_failure_callbackdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を試してみました。

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)

default_args = {
    "owner": "mi_empresa"
}

with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,
    dagrun_timeout=timedelta(seconds=45),
) as dag:

    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )
    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1"
    )
delayed >> will_fail

実行時間が45秒超えると、_on_dag_run_failが実行され失敗したDAGのコンテキスト情報がプリントされます。誰かの役に立つかもしれないので、試しながら気づいた2つのポイントを共有します

1. on_failure_callbackdagrun_timeoutをdefault_argsに入れちゃったら動作しない

  • default_argsはDAG作成時に実行されるため、DAGのメタデータを入れるのが一般的
  • with DAG(...)に記載されている引数はDAG実行時に反映されるため、on_failure_callbackdagrun_timeoutに入れないといけない

2. context.get('reason')を利用すると、エラーの種類を判断可能

  • task_failure タスク実行失敗によるエラー
  • timed_out 遅延によるエラー

なので、エラーによって異なる処理する場面では活用できそうですね。