アラートを出す際にAirflowのContextから誤ったtask idが取得されてしまうバグの対処法
先日投稿した記事はAirflow DAGのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を紹介しました。 Contextから誤ったtask idが取得されてしまう contextからdag_runの情報を取得してチャットツールやメールにアラートを出すのは一般的です。Slackにアラートを出す際の例ですが、dag_id, run_id, task_id, reason, log_urlを取得して、webhookでSlackの特定なチャンネルに投稿し、log_urlをクリックするだけですぐローカルあるいはクラウド環境(例えばCloud Composer)で失敗したtaskのログを確認できるので、アラート解消の効率化に繋がります。 ソースは以下となります。 from slack_sdk.webhook import WebhookClient from airflow.models import Variable from textwrap import dedent def notify_error(workflow: str, context: dict) -> None: webhook = WebhookClient(Variable.get("slack_webhook_access_token")) log_url = context.get("task_instance").log_url message = dedent( f""" :x: Task has failed. *Workflow*: {workflow} *DAG*: {context.get('task_instance').dag_id} *Run ID* {context.get('dag_run').run_id} *Task*: {context.get('task_instance').task_id} *Reason*: {context.get('reason')} <{log_url}| *Log URL*> """ ) webhook.send( text="alert", blocks=[ { "type": "section", "text": {"type": "mrkdwn", "text": message}, } ], ) しかし、数回検証してみた結果、実行が失敗したタスクtask_idではなく、誤ったtask_idが取得されてしまう事象がしばしば発生します。Airflowの既知バグで、現時点(2022....