アラートを出す際にAirflowのContextから誤ったtask idが取得されてしまうバグの対処法

先日投稿した 記事 はAirflow DAGのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を紹介しました。 Contextから誤ったtask idが取得されてしまう contextからdag_runの情報を取得してチャットツールやメールにアラートを出すのは一般的です。Slackにアラートを出す際の例ですが、dag_id, run_id, task_id, reason, log_urlを取得して、webhookでSlackの特定なチャンネルに投稿し、log_urlをクリックするだけですぐローカルあるいはクラウド環境(例えばCloud Composer)で失敗したtaskのログを確認できるので、アラート解消の効率化に繋がります。 ソースは以下となります。 from slack_sdk.webhook import WebhookClient from airflow.models import Variable from textwrap import dedent def notify_error(workflow: str, context: dict) -> None: webhook = WebhookClient(Variable.get("slack_webhook_access_token")) log_url = context.get("task_instance").log_url message = dedent( f""" :x: Task has failed. *Workflow*: {workflow} *DAG*: {context.get('task_instance').dag_id} *Run ID* {context.get('dag_run').run_id} *Task*: {context.get('task_instance').task_id} *Reason*: {context.get('reason')} <{log_url}| *Log URL*> """ ) webhook.send( text="alert", blocks=[ { "type": "section", "text": {"type": "mrkdwn", "text": message}, } ], ) しかし、数回検証してみた結果、実行が失敗したタスクtask_idではなく、誤ったtask_idが取得されてしまう事象がしばしば発生します。Airflowの既知バグで、現時点(2022.11.21)はまだ修正されていないです。 検証環境 Airflow 2.1.4 Airflow 2.2.5 対処法 https://stackoverflow.com/questions/72668764/get-task-id-of-failed-task-from-within-an-airflow-dag-level-failure-callback stackoverflowの回答を参考しながら、解決法を考えてみました。 一旦dag_run.get_task_instances(state="failed")を利用して全てfailedとなったDAGのタスクを取得し、最初に失敗したtaskのtask_idをアラートメッセージに入れることで、上記バグを回避できます。 通知用の関数を少し修正を入れる必要があります。 ...

November 21, 2022 · Me

データ基盤におけるGoogleグループ・IAMによるアクセス制御

会社のTech Blogに記事を書きました データ基盤におけるGoogleグループ・IAMによるアクセス制御

November 18, 2022 · Me

Airflowのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する

https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 この記事では新しくDAGを作成してデータの転送処理が遅れているかを監視する方法が紹介されました。監視と実行用のDAGを分離することでスッキリにはなりますが、DAGが増えることによって管理の手間が生じます(特にDAGが大量にある場合)。 https://stackoverflow.com/questions/69312630/airflow-triggering-the-on-failure-callback-when-the-dagrun-timeout-is-exceed を参考にして DAGの引数on_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を試してみました。 from datetime import datetime, timedelta from airflow import DAG from airflow.models import TaskInstance from airflow.operators.bash import BashOperator def _on_dag_run_fail(context): print("***DAG failed!! do something***") print(f"The DAG failed because: {context['reason']}") print(context) default_args = { "owner": "mi_empresa" } with DAG( dag_id="failure_callback_example", start_date=datetime(2021, 9, 7), schedule_interval=None, default_args=default_args, catchup=False, on_failure_callback=_on_dag_run_fail, dagrun_timeout=timedelta(seconds=45), ) as dag: delayed = BashOperator( task_id="delayed", bash_command='echo "waiting..";sleep 60; echo "Done!!"', ) will_fail = BashOperator( task_id="will_fail", bash_command="exit 1" ) delayed >> will_fail 実行時間が45秒超えると、_on_dag_run_failが実行され失敗したDAGのコンテキスト情報がプリントされます。誰かの役に立つかもしれないので、試しながら気づいた2つのポイントを共有します 1. on_failure_callbackとdagrun_timeoutをdefault_argsに入れちゃったら動作しない default_argsはDAG作成時に実行されるため、DAGのメタデータを入れるのが一般的 with DAG(...)に記載されている引数はDAG実行時に反映されるため、on_failure_callbackとdagrun_timeoutに入れないといけない 2. context.get('reason')を利用すると、エラーの種類を判断可能 task_failure タスク実行失敗によるエラー timed_out 遅延によるエラー なので、エラーによって異なる処理する場面では活用できそうですね。

November 17, 2022 · Me

Telegramで多言語ニュースチャットボットを作った話

先日 多言語Webニュースアプリ を作りました。 通勤電車で当日のニュースをチェックしながら、外国語を勉強できるので、まあまあ使いやすかったです。 実はこのWebを開発する1年ほど前に、一度Telegram botで多言語ニュースチャットボットを開発していました。今日はそれについて紹介したいと思います。 https://github.com/aibazhang/multitrue-bot 事前準備 Telegram bot Telegramは日本ではあまり人気がないですが、Telegramのアカウントを作るだけでAPIキーを取得できるので、ざくっとボット作りたい場合は使い勝手がけっこういいです。また、良さげのSDKもあります↓。そういう意味ではSlackも同じですが、Telegramは感覚的にLineに近いので手軽さで勝っていると思います。 https://github.com/python-telegram-bot/python-telegram-bot/tree/master/examples ニュースの取得 ニュースの取得方法は先日開発したWebアプリと同じく NewsAPI 利用して取得しています。なぜNewsAPIを採用したかについては 半年前の記事 で詳細を説明したので、割愛します。 実装 https://core.telegram.org/ に参照してボットの初期設定が完了したら、実装に入ります。 フローは以下となります。 /startでニュースボットを起動 国・地域を選ぶ ニュースのジャンルを選ぶ 終了あるいは2.に戻る 全体フロー SDKが提供してくれた下記3つのhandlerを利用しています。 ConversationHandler: 先ほど設計したフローに基づいてhandlerを定義する。 CommandHandler: コマンドによって発火される。今回は「入り口 (entry point)」として使う CallbackQueryHandler: 他のhandlerの返り値によって発火される def main(): updater = Updater( token=json.load(open(KEY_PATH / "keys.json", "r"))["telegram_key"], use_context=True, ) dispatcher = updater.dispatcher country_pattern = "^us|jp|cn|tw|kr|gb$" headlines_pattern = "^us|jp|cn|tw|kr|gb business|entertainment|general|health|science|sports|technology|$" conv_handler = ConversationHandler( entry_points=[CommandHandler("start", start)], states={ "CATEGORY": [CallbackQueryHandler(select_category, pattern=country_pattern)], "HEADLINES": [CallbackQueryHandler(get_news, pattern=headlines_pattern)], "START OVER OR NOT": [ CallbackQueryHandler(start_over, pattern="^start over$"), CallbackQueryHandler(end, pattern="^end$"), ], }, fallbacks=[CommandHandler("start", start)], ) dispatcher.add_handler(conv_handler) updater.start_polling() updater.idle() メニュー 続いてはcallback関数を見ていきましょう。start関数はwelcomeメッセージをユーザに送って、ユーザに国・地域を選んでもらいます。その後、select_category関数でニュースのジャンルを選んでもらいます。 引数contextとupdate context.user_data: ボットを利用してユーザの情報を取得する context.bot: ボット自体を操作する(今回はメッセージを送るだけ)のに使う update: メッセージを更新するのに使う def start(update, context): user = update.message.from_user logger.info("User {} started the conversation.".format(user)) for i, v in vars(user).items(): context.user_data[i] = v welcome_message = ( "Hello, {}\n" "This is JC News bot🗞️🤖\n\n" "You can get Top News Headlines for a Country and a Category from here. \n\n".format(user.first_name) ) print(context) keyborad = [ [ InlineKeyboardButton("🇺🇸", callback_data="us"), InlineKeyboardButton("🇯🇵", callback_data="jp"), InlineKeyboardButton("🇹🇼", callback_data="tw"), ], [ InlineKeyboardButton("🇰🇷", callback_data="kr"), InlineKeyboardButton("🇬🇧", callback_data="gb"), InlineKeyboardButton("🇨🇳", callback_data="cn"), ], ] reply_markup = InlineKeyboardMarkup(keyborad) context.bot.send_message(chat_id=update.effective_chat.id, text=welcome_message) update.message.reply_text("Please Choose a Country🤖", reply_markup=reply_markup) return "CATEGORY" def select_category(update, context): logger.info("User data from context {}".format(context.user_data)) logger.info("Chat data from context {}".format(context.chat_data)) logger.info("Bot data from context {}".format(context.bot_data)) query = update.callback_query query.answer() country = query.data keyborad = [ [ InlineKeyboardButton("👩🏼‍💻Technology", callback_data=country + " technology"), InlineKeyboardButton("🧑‍💼Business", callback_data=country + " business"), ], [ InlineKeyboardButton("👨🏻‍🎤Entertainment", callback_data=country + " entertainment"), InlineKeyboardButton("👩🏻‍⚕️Health", callback_data=country + " health"), ], [ InlineKeyboardButton("👨🏿‍🔬Science", callback_data=country + " science"), InlineKeyboardButton("🏋🏼‍♂️Sports", callback_data=country + " sports"), ], [InlineKeyboardButton("🌎General", callback_data=country + " general")], ] reply_markup = InlineKeyboardMarkup(keyborad) query.edit_message_text(text="Please Choose a Category🤖", reply_markup=reply_markup) return "HEADLINES" ニュースを取得 最後は最もコアとなるニュースを取得するロジックです。countryとcategoryによってNewAPIから最新ホットラインニュースを取得しフォーマットした後、ユーザにメッセージを送ります。 具体的な詳細は説明しませんが、NewsAPICollectorというクラスでNewAPIをラッピングしています。 ...

November 3, 2022 · Me

GitHub Issueだけで自分のルーティングを管理し、そして草を生やす

先日シェルスクリプトで 個人ナレッジマネジメントツール を作った話しを投稿して、予想以上に需要がありました。 ルーティングをGitHub Posterに生成 似たような発想でルーティング管理アプリを使わずに、GitHubのcontributionsのように自分のルーティング(例えば読書、ランニング、LeetCode、外国語の勉強)を管理できると面白くない?と思いながら、GitHub上で検索したらyihong0618さんが開発したGitHubPosterを発見しました。 https://github.com/yihong0618/GitHubPoster GitHub Isssue、Duolingo、Twitter、Kindleなど20個以上のAPIで履歴を取得し、GitHub svg poster(aka. 皆さんが大好きなGitHubの草)を生成します。 実際使ってみよう ローダーは20個以上あり、とりあえずissueで今年年始以来の読書ルーティングの草を生やしてみました。 Issueを書く issueフォーマットは↓に従う必要でがあります。 {整数} {内容} 今年は1月から、日課をこなした日に当該Issueに↓のコメント追加していました。 2 「データ指向アプリケーションデザイン」 環境構築 pip install -U 'github_poster[all]' 実行 GitHubをトークンを取得し、下記コマンドを実行するだけです。 github_poster issue --issue_number ${issue_number} --repo_name ${repo_name} --token ${github_token} また、オプション --special-color1 --special-color2 ---stand-with-ukraine などによって色を指定することも可能です。(ウクライナをサポートする配色もあるようですね) 結果 最終的に生成されたGitHub Poster(.svgファイル)はこんな感じです。単位はtimes(回数)になっているが、hours(時間)が正しいです。 確認してみると、 今年今まで336時間読書していて、週の真ん中あまり本を読んでいないと気づきました。 使った感想 余計なモバイルアプリを使わずに、ルーティング管理できるのはミニマムリスト的には最高 生成されたファイルは.svgなので、自分のサイトや他のところに取り入れるのも簡単そう 新しいローダーの開発に貢献してみたい

November 2, 2022 · Me

Airflowで構築したワークフローを検証する

データ基盤の監視 データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。 https://tech-blog.monotaro.com/entry/2021/08/24/100000 https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。 まず、 Data Pipelines Pocket Reference ではデータ基盤検証の原則が紹介されました。 Validate Early, Validate Often 要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。 Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。 Extractの検証 ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcsを利用すると簡単にできます。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html 一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensorよりGCSObjectsWithPrefixExistenceSensorの方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。 check_extract = GCSObjectsWithPrefixExistenceSensor( task_id="check_extract", bucket="{YOUR_BUCKET}", prefix="{YOUR_PREFIX}", ) Loadの検証 ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorを利用できます。 https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/bigquery_check_operator/index.html#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator 下記のタスクをLoad処理の後ろに追加すれば良いです。 check_load = BigQueryCheckOperator( task_id="check_load", sql={YOUR_SQL}, use_legacy_sql=False, params={ "bq_suffix": Variable.get("bq_suffix"), "dataset": setting.bq.dataset, "table": setting.bq.table_name, }, location="asia-northeast1", ) SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えば この記事 が紹介されたクエリでテーブルが空になっているかを確認できます。 SELECT total_rows FROM ${dataset_id}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '${table_name}' ORDER BY last_modified_time DESC LIMIT 1; GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。 皆さんのワークフロー設計にご参考になれば幸いです。

November 1, 2022 · Me

自作PythonラッパーでGoogleグループのメンバーシップをより便利に管理する

Googleグループのメンバーシップを管理する方法 Googleグループを管理するのは Workspaceのadmin SDKを利用する https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups CLI gcloudを利用する https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships Cloud Identity APIを叩く https://cloud.google.com/identity/docs/how-to/setup 3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。 方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう) gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能 Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。 シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。 gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h' 一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。 Pythonラッパー 認証 Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。 https://cloud.google.com/identity/docs/how-to/setup アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。 from httplib2 import Http from oauth2client.client import GoogleCredentials from googleapiclient.discovery import build def create_service(): credentials = GoogleCredentials.get_application_default() service_name = "cloudidentity" api_version = "v1" return build( service_name, api_version, http=credentials.authorize(Http()), cache_discovery=False, ) Group IDとMembership ID 実際にメンバーシップを取得するAPIを叩いてみたらわかると思います。memberships_idとgroup_idによってメンバーシップのnameが決められます。後ほどグループからメンバーを削除する際に、ここのnameが必須です。 ...

October 2, 2022 · Me

AirflowのBigQueryInsertJobOperatorのオプションについて

先日「 AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法 」についての記事を書きましたが、BigQueryInsertJobOperatorにフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。 なぜ気づいたのか 外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、 Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character. 調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlinesといったCSVのオプションを追加すると解決できるらしいです。 https://stackoverflow.com/questions/65782572/how-to-avoid-missing-close-double-quote-character-error-in-google-bigquer https://qiita.com/sawanoboly/items/410b3346518e569da581 ただし、configurationにcsvOptionsを追加しても解決されず(同じエラーが出た)、困っていました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, "csvOptions": { "allowQuotedNewlines": True }, } }, 解決法 CLI bq loadを利用す際にどうなっているかを確認したら、どうやらcsvOptionsを明示的に書く必要がなく--allow_quoted_newlinesだけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperatorで同じことをすると、問題なくloadできました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, # csvOptionsの記述は不要 "allowQuotedNewlines": True } },

August 30, 2022 · Me