Airflowのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する

https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 この記事では新しくDAGを作成してデータの転送処理が遅れているかを監視する方法が紹介されました。監視と実行用のDAGを分離することでスッキリにはなりますが、DAGが増えることによって管理の手間が生じます(特にDAGが大量にある場合)。 https://stackoverflow.com/questions/69312630/airflow-triggering-the-on-failure-callback-when-the-dagrun-timeout-is-exceed を参考にして DAGの引数on_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を試してみました。 from datetime import datetime, timedelta from airflow import DAG from airflow.models import TaskInstance from airflow.operators.bash import BashOperator def _on_dag_run_fail(context): print("***DAG failed!! do something***") print(f"The DAG failed because: {context['reason']}") print(context) default_args = { "owner": "mi_empresa" } with DAG( dag_id="failure_callback_example", start_date=datetime(2021, 9, 7), schedule_interval=None, default_args=default_args, catchup=False, on_failure_callback=_on_dag_run_fail, dagrun_timeout=timedelta(seconds=45), ) as dag: delayed = BashOperator( task_id="delayed", bash_command='echo "waiting..";sleep 60; echo "Done!!"', ) will_fail = BashOperator( task_id="will_fail", bash_command="exit 1" ) delayed >> will_fail 実行時間が45秒超えると、_on_dag_run_failが実行され失敗したDAGのコンテキスト情報がプリントされます。誰かの役に立つかもしれないので、試しながら気づいた2つのポイントを共有します...

November 17, 2022 · Me

Telegramで多言語ニュースチャットボットを作った話

先日多言語Webニュースアプリを作りました。 通勤電車で当日のニュースをチェックしながら、外国語を勉強できるので、まあまあ使いやすかったです。 実はこのWebを開発する1年ほど前に、一度Telegram botで多言語ニュースチャットボットを開発していました。今日はそれについて紹介したいと思います。 https://github.com/aibazhang/multitrue-bot 事前準備 Telegram bot Telegramは日本ではあまり人気がないですが、Telegramのアカウントを作るだけでAPIキーを取得できるので、ざくっとボット作りたい場合は使い勝手がけっこういいです。また、良さげのSDKもあります↓。そういう意味ではSlackも同じですが、Telegramは感覚的にLineに近いので手軽さで勝っていると思います。 https://github.com/python-telegram-bot/python-telegram-bot/tree/master/examples ニュースの取得 ニュースの取得方法は先日開発したWebアプリと同じくNewsAPI利用して取得しています。なぜNewsAPIを採用したかについては半年前の記事で詳細を説明したので、割愛します。 実装 https://core.telegram.org/ に参照してボットの初期設定が完了したら、実装に入ります。 フローは以下となります。 /startでニュースボットを起動 国・地域を選ぶ ニュースのジャンルを選ぶ 終了あるいは2.に戻る 全体フロー SDKが提供してくれた下記3つのhandlerを利用しています。 ConversationHandler: 先ほど設計したフローに基づいてhandlerを定義する。 CommandHandler: コマンドによって発火される。今回は「入り口 (entry point)」として使う CallbackQueryHandler: 他のhandlerの返り値によって発火される def main(): updater = Updater( token=json.load(open(KEY_PATH / "keys.json", "r"))["telegram_key"], use_context=True, ) dispatcher = updater.dispatcher country_pattern = "^us|jp|cn|tw|kr|gb$" headlines_pattern = "^us|jp|cn|tw|kr|gb business|entertainment|general|health|science|sports|technology|$" conv_handler = ConversationHandler( entry_points=[CommandHandler("start", start)], states={ "CATEGORY": [CallbackQueryHandler(select_category, pattern=country_pattern)], "HEADLINES": [CallbackQueryHandler(get_news, pattern=headlines_pattern)], "START OVER OR NOT": [ CallbackQueryHandler(start_over, pattern="^start over$"), CallbackQueryHandler(end, pattern="^end$"), ], }, fallbacks=[CommandHandler("start", start)], ) dispatcher....

November 3, 2022 · Me

GitHub Issueだけで自分のルーティングを管理し、そして草を生やす

先日シェルスクリプトで個人ナレッジマネジメントツールを作った話しを投稿して、予想以上に需要がありました。 ルーティングをGitHub Posterに生成 似たような発想でルーティング管理アプリを使わずに、GitHubのcontributionsのように自分のルーティング(例えば読書、ランニング、LeetCode、外国語の勉強)を管理できると面白くない?と思いながら、GitHub上で検索したらyihong0618さんが開発したGitHubPosterを発見しました。 https://github.com/yihong0618/GitHubPoster GitHub Isssue、Duolingo、Twitter、Kindleなど20個以上のAPIで履歴を取得し、GitHub svg poster(aka. 皆さんが大好きなGitHubの草)を生成します。 実際使ってみよう ローダーは20個以上あり、とりあえずissueで今年年始以来の読書ルーティングの草を生やしてみました。 Issueを書く issueフォーマットは↓に従う必要でがあります。 {整数} {内容} 今年は1月から、日課をこなした日に当該Issueに↓のコメント追加していました。 2 「データ指向アプリケーションデザイン」 環境構築 pip install -U 'github_poster[all]' 実行 GitHubをトークンを取得し、下記コマンドを実行するだけです。 github_poster issue --issue_number ${issue_number} --repo_name ${repo_name} --token ${github_token} また、オプション --special-color1 --special-color2 ---stand-with-ukraine などによって色を指定することも可能です。(ウクライナをサポートする配色もあるようですね) 結果 最終的に生成されたGitHub Poster(.svgファイル)はこんな感じです。単位はtimes(回数)になっているが、hours(時間)が正しいです。 確認してみると、 今年今まで336時間読書していて、週の真ん中あまり本を読んでいないと気づきました。 使った感想 余計なモバイルアプリを使わずに、ルーティング管理できるのはミニマムリスト的には最高 生成されたファイルは.svgなので、自分のサイトや他のところに取り入れるのも簡単そう 新しいローダーの開発に貢献してみたい

November 2, 2022 · Me

Airflowで構築したワークフローを検証する

データ基盤の監視 データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。 https://tech-blog.monotaro.com/entry/2021/08/24/100000 https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。 まず、Data Pipelines Pocket Referenceではデータ基盤検証の原則が紹介されました。 Validate Early, Validate Often 要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。 Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。 Extractの検証 ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcsを利用すると簡単にできます。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html 一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensorよりGCSObjectsWithPrefixExistenceSensorの方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。 check_extract = GCSObjectsWithPrefixExistenceSensor( task_id="check_extract", bucket="{YOUR_BUCKET}", prefix="{YOUR_PREFIX}", ) Loadの検証 ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorを利用できます。 https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/bigquery_check_operator/index.html#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator 下記のタスクをLoad処理の後ろに追加すれば良いです。 check_load = BigQueryCheckOperator( task_id="check_load", sql={YOUR_SQL}, use_legacy_sql=False, params={ "bq_suffix": Variable.get("bq_suffix"), "dataset": setting.bq.dataset, "table": setting.bq.table_name, }, location="asia-northeast1", ) SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えばこの記事が紹介されたクエリでテーブルが空になっているかを確認できます。 SELECT total_rows FROM ${dataset_id}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '${table_name}' ORDER BY last_modified_time DESC LIMIT 1; GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。 皆さんのワークフロー設計にご参考になれば幸いです。

November 1, 2022 · Me

自作PythonラッパーでGoogleグループのメンバーシップをより便利に管理する

Googleグループのメンバーシップを管理する方法 Googleグループを管理するのは Workspaceのadmin SDKを利用する https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups CLI gcloudを利用する https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships Cloud Identity APIを叩く https://cloud.google.com/identity/docs/how-to/setup 3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。 方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう) gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能 Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。 シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。 gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h' 一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。 Pythonラッパー 認証 Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。 https://cloud.google.com/identity/docs/how-to/setup アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。 from httplib2 import Http from oauth2client.client import GoogleCredentials from googleapiclient.discovery import build def create_service(): credentials = GoogleCredentials.get_application_default() service_name = "cloudidentity" api_version = "v1" return build( service_name, api_version, http=credentials....

October 2, 2022 · Me

AirflowのBigQueryInsertJobOperatorのオプションについて

先日「AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法」についての記事を書きましたが、BigQueryInsertJobOperatorにフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。 なぜ気づいたのか 外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、 Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character. 調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlinesといったCSVのオプションを追加すると解決できるらしいです。 https://stackoverflow.com/questions/65782572/how-to-avoid-missing-close-double-quote-character-error-in-google-bigquer https://qiita.com/sawanoboly/items/410b3346518e569da581 ただし、configurationにcsvOptionsを追加しても解決されず(同じエラーが出た)、困っていました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, "csvOptions": { "allowQuotedNewlines": True }, } }, 解決法 CLI bq loadを利用す際にどうなっているかを確認したら、どうやらcsvOptionsを明示的に書く必要がなく--allow_quoted_newlinesだけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperatorで同じことをすると、問題なくloadできました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*....

August 30, 2022 · Me

AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法

GCS外部パーティションテーブルをBigQueryに取り込む BigQueryでデータレイクを構築する際に、GCS、Google Drive Data、Bigtableから外部テーブルを取り組む場面は少なくないと思います。下記のような外部テーブルがパーティションで分けられている場合(Hive partitioned dataと呼ばれている)は少しややこしくなりますが、 gs://myBucket/myTable/dt=2019-10-31/lang=en/foo gs://myBucket/myTable/dt=2018-10-31/lang=fr/bar 公式ドキュメントは取り組み方をわかりやすくまとめてくれています。bqというcliでコマンド2行で解決できます。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#partition_schema_detection_modes テーブル定義を作成 bq mkdef \ --autodetect \ --source_format=CSV \ --hive_partitioning_mode=AUTO \ --hive_partitioning_source_uri_prefix=gs://your-bucket/your-table/ \ "gs://you-bucket/you-table/*.csv" > /tmp/MyTableDefFile テーブルを作成 bq mk --external_table_definition=/tmp/MyTableDefFile \ my_dataset.my_table Airflowで取り込むとどうなる? まず、簡単に背景を補足すると現在筆者が構築しているデータ基盤はCloud Composer (Airflow)でワークフローを管理しており、AirflowでGCSから外部テーブルを取り込むといった需要があります。パーティションではない普通のテーブルであれば、BigQueryCreateExternalTableOperator利用すると一発で解決できます。 しかし、パーティションのある場合はAirflowから事前用意された関数がなく(2022年7月時点)、他の方法を考えないといけないです。 GCP公式ドキュメントもConosole, bq, API, Java 4つの方法しか書いておらず、PythonのSDKはまだ実装されていないようです。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#creating_an_external_table_for_hive_partitioned_data BigQueryInsertJobOperatorを利用 いろいろ調査した結果、筆者と同じことに困っている人がいたようです。 https://github.com/apache/airflow/issues/13626 その下に Have you tried to use BigQueryInsertJobOperator? for example, see: #13598 「BigQueryInsertJobOperatorを使ってみたらどう?」というコメントがありました。 さらにリンク先のPR(なぜかクローズされた)に行くと、BigQueryInsertJobOperatorの例をまとめてもらっています。どうやら専用のSDKがない場合BigQueryInsertJobOperatorを使うのは普通らしいです。引数もやや雑で、そのままBigQueryのREST APIに叩くのと近い感じです。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator https://github.com/apache/airflow/pull/13598/files しかしながら、上記のPRは外部パーティションテーブルを取り組む例がなかったため、BigQuery APIのドキュメントを調べて、hivePartitioningOptionsというオプションを見つけました。これを追加すると、もしかしてうまくいくのではないかと思い、早速検証してみました。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions データを用意 https://people.math.sc.edu/Burkardt/datasets/csv/csv.html から適当にcsvファイルをダウロードします。 biostats1.csv "Name", "Sex", "Age", "Height (in)", "Weight (lbs)" "Alex", "M", 41, 74, 170 "Bert", "M", 42, 68, 166 "Carl", "M", 32, 70, 155 "Dave", "M", 39, 72, 167 "Elly", "F", 30, 66, 124 "Fran", "F", 33, 66, 115 "Gwen", "F", 26, 64, 121 "Hank", "M", 30, 71, 158 "Ivan", "M", 53, 72, 175 "Jake", "M", 32, 69, 143 "Kate", "F", 47, 69, 139 "Luke", "M", 34, 72, 163 "Myra", "F", 23, 62, 98 "Neil", "M", 36, 75, 160 "Omar", "M", 38, 70, 145 "Page", "F", 31, 67, 135 "Quin", "M", 29, 71, 176 "Ruth", "F", 28, 65, 131 biostats1....

July 21, 2022 · Me

git switch, git restore, git grep, git blameを使ってみよう

https://git-scm.com/ 毎日息を吸うようにgitを使っているけど、gitのドキュメント見たことないなと思って実際に読んでみました。あまり使われていないが、便利そうなgitコマンドを見つけたので、共有します。 git switch git restore git grep git blame git log -p <filename> git commit --amend --no-edit git switch ブランチを切り替えるコマンドです。git checkoutでできるのに、なぜわざわざgit switchを使うのかって思っていましたが、この記事を読んで意図を理解しました。 https://www.banterly.net/2021/07/31/new-in-git-switch-and-restore/ 簡単に説明するとgit checkoutは git checkout <branch_name> ブランチ・コミットを切り替える git checkout -- <file_name> ファイルの変更を廃棄する(直近のcommitに戻す) 2つの機能を持っているので、少し紛らわしいですね。数年前初めてgitを勉強した頃、けっこうハマっていた記憶があります。 version 2.23にブランチ・コミットを切り替える機能のみ持っている機能git switchがリリースされました。 # git checkout your_branchと同じ git switch your_branch ただ、新しくブランチを切る際のオプションは-bではなく、-cになっています。 # git checkout -b your_branchと同じ git switch -c new_branch git restore git restoreは、ファイルの変更を廃棄して、直近のcommitに戻すというgit checkoutの残り半分の機能を持っています。 # git checkout -- file.txtと同じ git restore -- file.txt git checkoutをやめてgit switchとgit restoreを使ったほうがわかりやすいと思います。特に初心者の方はぜひ試してみてください。...

July 10, 2022 · Me