Airflowで構築したワークフローを検証する

データ基盤の監視 データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。 https://tech-blog.monotaro.com/entry/2021/08/24/100000 https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。 まず、Data Pipelines Pocket Referenceではデータ基盤検証の原則が紹介されました。 Validate Early, Validate Often 要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。 Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。 Extractの検証 ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcsを利用すると簡単にできます。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html 一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensorよりGCSObjectsWithPrefixExistenceSensorの方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。 check_extract = GCSObjectsWithPrefixExistenceSensor( task_id="check_extract", bucket="{YOUR_BUCKET}", prefix="{YOUR_PREFIX}", ) Loadの検証 ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorを利用できます。 https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/bigquery_check_operator/index.html#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator 下記のタスクをLoad処理の後ろに追加すれば良いです。 check_load = BigQueryCheckOperator( task_id="check_load", sql={YOUR_SQL}, use_legacy_sql=False, params={ "bq_suffix": Variable.get("bq_suffix"), "dataset": setting.bq.dataset, "table": setting.bq.table_name, }, location="asia-northeast1", ) SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えばこの記事が紹介されたクエリでテーブルが空になっているかを確認できます。 SELECT total_rows FROM ${dataset_id}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '${table_name}' ORDER BY last_modified_time DESC LIMIT 1; GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。 皆さんのワークフロー設計にご参考になれば幸いです。

November 1, 2022 · Me

自作PythonラッパーでGoogleグループのメンバーシップをより便利に管理する

Googleグループのメンバーシップを管理する方法 Googleグループを管理するのは Workspaceのadmin SDKを利用する https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups CLI gcloudを利用する https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships Cloud Identity APIを叩く https://cloud.google.com/identity/docs/how-to/setup 3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。 方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう) gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能 Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。 シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。 gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h' 一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。 Pythonラッパー 認証 Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。 https://cloud.google.com/identity/docs/how-to/setup アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。 from httplib2 import Http from oauth2client.client import GoogleCredentials from googleapiclient.discovery import build def create_service(): credentials = GoogleCredentials.get_application_default() service_name = "cloudidentity" api_version = "v1" return build( service_name, api_version, http=credentials....

October 2, 2022 · Me

AirflowのBigQueryInsertJobOperatorのオプションについて

先日「AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法」についての記事を書きましたが、BigQueryInsertJobOperatorにフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。 なぜ気づいたのか 外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、 Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character. 調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlinesといったCSVのオプションを追加すると解決できるらしいです。 https://stackoverflow.com/questions/65782572/how-to-avoid-missing-close-double-quote-character-error-in-google-bigquer https://qiita.com/sawanoboly/items/410b3346518e569da581 ただし、configurationにcsvOptionsを追加しても解決されず(同じエラーが出た)、困っていました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, "csvOptions": { "allowQuotedNewlines": True }, } }, 解決法 CLI bq loadを利用す際にどうなっているかを確認したら、どうやらcsvOptionsを明示的に書く必要がなく--allow_quoted_newlinesだけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperatorで同じことをすると、問題なくloadできました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*....

August 30, 2022 · Me

AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法

GCS外部パーティションテーブルをBigQueryに取り込む BigQueryでデータレイクを構築する際に、GCS、Google Drive Data、Bigtableから外部テーブルを取り組む場面は少なくないと思います。下記のような外部テーブルがパーティションで分けられている場合(Hive partitioned dataと呼ばれている)は少しややこしくなりますが、 gs://myBucket/myTable/dt=2019-10-31/lang=en/foo gs://myBucket/myTable/dt=2018-10-31/lang=fr/bar 公式ドキュメントは取り組み方をわかりやすくまとめてくれています。bqというcliでコマンド2行で解決できます。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#partition_schema_detection_modes テーブル定義を作成 bq mkdef \ --autodetect \ --source_format=CSV \ --hive_partitioning_mode=AUTO \ --hive_partitioning_source_uri_prefix=gs://your-bucket/your-table/ \ "gs://you-bucket/you-table/*.csv" > /tmp/MyTableDefFile テーブルを作成 bq mk --external_table_definition=/tmp/MyTableDefFile \ my_dataset.my_table Airflowで取り込むとどうなる? まず、簡単に背景を補足すると現在筆者が構築しているデータ基盤はCloud Composer (Airflow)でワークフローを管理しており、AirflowでGCSから外部テーブルを取り込むといった需要があります。パーティションではない普通のテーブルであれば、BigQueryCreateExternalTableOperator利用すると一発で解決できます。 しかし、パーティションのある場合はAirflowから事前用意された関数がなく(2022年7月時点)、他の方法を考えないといけないです。 GCP公式ドキュメントもConosole, bq, API, Java 4つの方法しか書いておらず、PythonのSDKはまだ実装されていないようです。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#creating_an_external_table_for_hive_partitioned_data BigQueryInsertJobOperatorを利用 いろいろ調査した結果、筆者と同じことに困っている人がいたようです。 https://github.com/apache/airflow/issues/13626 その下に Have you tried to use BigQueryInsertJobOperator? for example, see: #13598 「BigQueryInsertJobOperatorを使ってみたらどう?」というコメントがありました。 さらにリンク先のPR(なぜかクローズされた)に行くと、BigQueryInsertJobOperatorの例をまとめてもらっています。どうやら専用のSDKがない場合BigQueryInsertJobOperatorを使うのは普通らしいです。引数もやや雑で、そのままBigQueryのREST APIに叩くのと近い感じです。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator https://github.com/apache/airflow/pull/13598/files しかしながら、上記のPRは外部パーティションテーブルを取り組む例がなかったため、BigQuery APIのドキュメントを調べて、hivePartitioningOptionsというオプションを見つけました。これを追加すると、もしかしてうまくいくのではないかと思い、早速検証してみました。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions データを用意 https://people.math.sc.edu/Burkardt/datasets/csv/csv.html から適当にcsvファイルをダウロードします。 biostats1.csv "Name", "Sex", "Age", "Height (in)", "Weight (lbs)" "Alex", "M", 41, 74, 170 "Bert", "M", 42, 68, 166 "Carl", "M", 32, 70, 155 "Dave", "M", 39, 72, 167 "Elly", "F", 30, 66, 124 "Fran", "F", 33, 66, 115 "Gwen", "F", 26, 64, 121 "Hank", "M", 30, 71, 158 "Ivan", "M", 53, 72, 175 "Jake", "M", 32, 69, 143 "Kate", "F", 47, 69, 139 "Luke", "M", 34, 72, 163 "Myra", "F", 23, 62, 98 "Neil", "M", 36, 75, 160 "Omar", "M", 38, 70, 145 "Page", "F", 31, 67, 135 "Quin", "M", 29, 71, 176 "Ruth", "F", 28, 65, 131 biostats1....

July 21, 2022 · Me

git switch, git restore, git grep, git blameを使ってみよう

https://git-scm.com/ 毎日息を吸うようにgitを使っているけど、gitのドキュメント見たことないなと思って実際に読んでみました。あまり使われていないが、便利そうなgitコマンドを見つけたので、共有します。 git switch git restore git grep git blame git log -p <filename> git commit --amend --no-edit git switch ブランチを切り替えるコマンドです。git checkoutでできるのに、なぜわざわざgit switchを使うのかって思っていましたが、この記事を読んで意図を理解しました。 https://www.banterly.net/2021/07/31/new-in-git-switch-and-restore/ 簡単に説明するとgit checkoutは git checkout <branch_name> ブランチ・コミットを切り替える git checkout -- <file_name> ファイルの変更を廃棄する(直近のcommitに戻す) 2つの機能を持っているので、少し紛らわしいですね。数年前初めてgitを勉強した頃、けっこうハマっていた記憶があります。 version 2.23にブランチ・コミットを切り替える機能のみ持っている機能git switchがリリースされました。 # git checkout your_branchと同じ git switch your_branch ただ、新しくブランチを切る際のオプションは-bではなく、-cになっています。 # git checkout -b your_branchと同じ git switch -c new_branch git restore git restoreは、ファイルの変更を廃棄して、直近のcommitに戻すというgit checkoutの残り半分の機能を持っています。 # git checkout -- file.txtと同じ git restore -- file.txt git checkoutをやめてgit switchとgit restoreを使ったほうがわかりやすいと思います。特に初心者の方はぜひ試してみてください。...

July 10, 2022 · Me

Cloud FunctionsにおけるSlack APIの3秒レスポンス問題の対処法

Slack APIはUXのため著名な3秒レスポンスルールを設けています。初期設定のようなものではなく、自ら伸ばすことはできません。(厳しい) https://api.slack.com/interactivity/slash-commands If you need to respond outside of the 3 second window provided by the request responses above, you still have plenty of options for keeping the workflow alive. Slack Appsを開発したことのある人にとって最初は少し戸惑うでしょうか。(筆者はそうでした。) 最近はCloud Functionsとslack boltで社内の承認アプリを開発していて、試行錯誤した経験を共有したいと思います。 経緯 実装する予定のアプリの最初のステップとして、ショートカットでCloud Functionsを発火させてmodalを開きます。slack boltのドキュメントを読んで、以下の実装をおこないました。 https://slack.dev/bolt-python/concepts (一部のコード) @app.shortcut("/hogehoge") def open_application_modal(ack, body: dict, client: WebClient): ack() result = process() client.views_open( trigger_id=body["trigger_id"], view={ "type": "modal", # View identifier "callback_id": "application_form_view", "title": {"type": "plain_text", "text": "APP Title"}, "submit": {"type": "plain_text", "text": "Submit"}, "blocks": create_blocks(result), }, ) # ....

June 12, 2022 · Me

Cloud Functions 2nd Genにおけるsecret keyの管理

GCPのFaaS(Functions-as-a-Service)Cloud Functionsは最近第二世代がリリースされ、(まだbeta期間中ですが)インフラ周りがけっこう強化されました。Cloud StorageとBigQueryにある大規模データを処理するために、HTTPトリガーのアップタイムが60分までに、イベントトリガーも10分まで伸びました。 https://cloud.google.com/functions/docs/2nd-gen/overview 筆者はこの数週間ちょうどCloud Functionsを利用して承認システムを開発しています。Cloud Functions 2nd Genにおけるsecret keyの管理について少し困っていたので、調べたものを共有します。 何が問題か secret keyなどを明文でCloud Functionsの環境変数に保存するは流石に危ないので、他の方法が必要です。 we recommend that you review the best practices for secret management. Note that there is no Cloud Functions-specific integration with Cloud KMS. Cloud Functions公式ドキュメントによると、secret keyを管理するベストプラクティスはSecret Managementを利用することです。また、Cloud Functions特有のCloud KMSは存在しません。 https://cloud.google.com/functions/docs/configuring/env-var#managing_secrets 1st genの場合はデプロイする際に-set-secretsというオプションでSecret Management上保存しているキーを追加すれば良いですが、2nd genで同じようなことをやると、-set-secretsはまだ実装されていない(2022年5月)ので、怒られます。 ERROR: (gcloud.beta.functions.deploy) --set-secrets is not yet supported in Cloud Functions V2. Cloud KMSを使う Secret Managementはまだ存在しない時代(と言っても3年前)に書かれた記事を読んで、参考になりました。 考え方としては、 Cloud KMSで認証情報(文字列)を暗号化する 暗号化した認証情報をCloud Functionsの環境変数に登録する Cloud Functionsの関数内で暗号化された環境変数を復号化する https://www.apps-gcp.com/kms-with-functions/...

June 1, 2022 · Me

TerraformでCloud Storageのバケット名を変更する際の罠

最近よくデータ基盤構築の仕事をしているため、TerraformでGCPをいじったりしています。今回はCloud Storageのバケット名を変更する際に罠にはまりやすいところを紹介します。もちろん、Cloud Storageだけでなく、他のサービスやAWSなどにも活用できそうな箇所があるではないかと思います。 環境 Terraform v1.1.2 on darwin_arm64 provider registry.terraform.io/hashicorp/google v4.15.0 他の環境は正しく動作するかは未検証です。 経緯 下記のバケットを作って検証環境に反映した後、PRを出しました。 resource "google_storage_bucket" "my_bucket" { name = "my-bucket" storage_class = var.gcs_storage_class.coldline project = var.project_id location = var.gcs_location force_destroy = false uniform_bucket_level_access = true retention_policy { is_locked = true retention_period = 30 } lifecycle_rule { condition { age = 30 } action { type = "Delete" } } } チームメンバーのレビュー受けて、バケット名はmy-bucketよりmy-bucket-hogeの方が良いと言われたので、my-bucketをmy-bucket-hoge変更しterraform applyしたらエラーになりました。もちろんのことですが、最初はforce_destroy = falseに設定していて、バケット内オブジェクトが入っているので、強制変更はできません。 force_destroy - (Optional, Default: false) When deleting a bucket, this boolean option will delete all contained objects....

May 27, 2022 · Me

ワードクラウドを自作ニュースアプリに追加してみた

はじめに この間Node.jsで多言語Webニュースアプリ作ってみました https://www.multitrue.news ニュースのタイトルと概要だけではつまらないので、単語の出現頻度によって直近一週間のニュースからワードクラウド作って、一目で世の中の出来事を確認できたら面白そうじゃないかと思いながら、ワードクラウドをニュースアプリに追加してみました。 詳細 ソースはこちらから確認できます。 https://github.com/aibazhang/multitrue 下準備 日本語と中国語などは英語と異なり、単語と単語の間スペースがないので、形態素解析が必要です。簡略化するために、今回は英語のワードクラウドのみを作ることにしました。人称代名詞や助動詞のようなStop Wordsをワードクラウドに出しても意味がないので、NLTKの英語Stop Words辞書を利用します。 https://gist.github.com/sebleier/554280 また、単語の出現頻度を集計するヘルプ関数を作成します。いい感じライブラリもありますが、できるだけdependencyを減らしたいので、自分で実装することにしました。ただ、全ての単語を最終的に大文字に変換します。 src/utils/countWordsFrequency.js const stopwords = require('./stopwords-en.json'); // English stopwords via https://gist.github.com/sebleier/554280 const countWordsFrequency = (sentences) => { const result = {}; // remove punctuation and split by space const terms = sentences.toLowerCase().match(/[a-zA-Z]+/g); terms.forEach((e) => { if (!stopwords.stopwords.includes(e)) { const name = e.toUpperCase(); if (result[name]) { result[name] += 1; } else { result[name] = 1; } } }); return Object....

May 14, 2022 · Me

Herokuにデプロイする際のTips

2022年11月28日をもってHerokuフリープランが終了しました https://blog.heroku.com/new-low-cost-plans 個人開発する際にコスト面と手軽さを考えるとAWSにデプロイするのはあまりふさわしくないでしょう。Herokuだともっと手軽くデプロイできるし無料枠もあるので、個人で開発したおもちゃをデプロイするのはちょうどいい感じだと思います。この間筆者が開発した多言語WebニュースアプリもHerokuにデプロイしています。 本記事では、Herokuにデプロイする際のTipsをいくつかを紹介したいと思います。少しでもお役に立てれば幸いです。 Tip 1: GitHub Actionsで自動デプロイ https://devcenter.heroku.com/articles/git コマンドラインでいちいち打つのは面倒くさいので、自動デプロイしたほうが断然便利ですね。もちろんHerokuのGUIからも自動デプロイできますが、せっかくソースコードもGitHubで管理しているので、GitHub Actions を使ったほう便利だと思います。 heroku-deployというアクションを利用すると、Herokuのメールアドレスとheroku_api_keyさえわかれば、GitHub Actionsから自動デプロイできます。ちなみに、heroku_api_keyはこちらから取得できます。 GitHub Actionsの設定についてですが、他の記事を見たかぎりon: [push, pull_request]に設定している方もいました。本番にデプロイするという意味合いもあるので、個人的にはrelease(リリース作成するたびに、パイプラインが発火される)に設定したほうがいいではないかと思います。ミスってpushしちゃったのも防げます。 on: release: types: published name: Deploy to PROD jobs: build: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - uses: akhileshns/heroku-deploy@v3.12.12 with: heroku_api_key: ${{secrets.HEROKU_API_KEY}} heroku_app_name: 'your_app_name' heroku_email: ${{secrets.HEROKU_EMAIL}} Tip 2: 無料枠を使いこなす 現時点(2022年5月)Herokuのプランはこうなっています。 https://devcenter.heroku.com/articles/free-dyno-hours Personal accounts are given a base of 550 free dyno hours each month. In addition to these base hours, accounts which verify with a credit card will receive an additional 450 hours added to the monthly free dyno quota....

May 4, 2022 · Me