https://buildersbox.corp-sansan.com/entry/2022/08/18/110000
この記事では新しくDAGを作成してデータの転送処理が遅れているかを監視する方法が紹介されました。監視と実行用のDAGを分離することでスッキリにはなりますが、DAGが増えることによって管理の手間が生じます(特にDAGが大量にある場合)。
を参考にして
DAGの引数on_failure_callback
とdagrun_timeout
を組み合わせることでDAGの遅延を監視する方法を試してみました。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
def _on_dag_run_fail(context):
print("***DAG failed!! do something***")
print(f"The DAG failed because: {context['reason']}")
print(context)
default_args = {
"owner": "mi_empresa"
}
with DAG(
dag_id="failure_callback_example",
start_date=datetime(2021, 9, 7),
schedule_interval=None,
default_args=default_args,
catchup=False,
on_failure_callback=_on_dag_run_fail,
dagrun_timeout=timedelta(seconds=45),
) as dag:
delayed = BashOperator(
task_id="delayed",
bash_command='echo "waiting..";sleep 60; echo "Done!!"',
)
will_fail = BashOperator(
task_id="will_fail",
bash_command="exit 1"
)
delayed >> will_fail
実行時間が45秒超えると、_on_dag_run_fail
が実行され失敗したDAGのコンテキスト情報がプリントされます。誰かの役に立つかもしれないので、試しながら気づいた2つのポイントを共有します
1. on_failure_callback
とdagrun_timeout
をdefault_argsに入れちゃったら動作しない
default_args
はDAG作成時に実行されるため、DAGのメタデータを入れるのが一般的with DAG(...)
に記載されている引数はDAG実行時に反映されるため、on_failure_callback
とdagrun_timeout
に入れないといけない
2. context.get('reason')
を利用すると、エラーの種類を判断可能
task_failure
タスク実行失敗によるエラーtimed_out
遅延によるエラー
なので、エラーによって異なる処理する場面では活用できそうですね。