dbtでBigQuery上に構築したデータ基盤のメタデータ管理

データ基盤におけるETL/ELT開発のT(Transform)を担うツール dbt は最近注目を浴びています。dbtでデータモデリングする方法既に多く紹介されたので、この記事では手を動かしながらdbtでBigQuery上に構築したデータ基盤のメタデータを管理する方法を紹介します。 環境構築 dbt公式はHomebrewを推していますが、ローカル環境が汚染されるのをなるべく避けたいので、Dockerで環境構築します。 dbtのプロジェクトとプロファイルの設定ファイルを用意しておかないと、公式のドキュメントそのまま実行したらコケます。しかし設定ファイルの生成は環境を構築する必要があるので無限ループになっています。 https://docs.getdbt.com/docs/get-started/docker-install そのため、公式のサンプルプロジェクトをforkし、事前にローカル環境で生成した設定ファイルを追加しました。 https://github.com/aibazhang/dbt-metadata-management profiles.ymlを編集 {YOUR_DATASET_NAME}と{YOUR_PROJECT_ID}を置き換えます。 複数のデータセットのメタデータも作成可能ですが、一旦任意のデータセット名を指定する必要があります。dbtの問題点でもありますが、後ほど説明します。 Dockerイメージをプル docker pull ghcr.io/dbt-labs/dbt-bigquery:1.2.0 コンテナを立ち上げる git clone https://github.com/aibazhang/dbt-metadata-management cd dbt-metadata-management gcloud認証 認証済みの場合、このステップは不要です。 gcloud auth login --no-launch-browser gcloud auth application-default login --no-launch-browser コンテナを立ち上げる docker run --rm \ --network=host \ --platform linux/amd64 \ --mount type=bind,source=`PWD`,target=/usr/app \ --mount type=bind,source=`PWD`/profiles.yml,target=/root/.dbt/profiles.yml \ --mount type=bind,source=$HOME/.config/gcloud/application_default_credentials.json,target=/root/.config/gcloud/application_default_credentials.json \ ghcr.io/dbt-labs/dbt-bigquery:1.2.0 \ ls データモデルのリストが表示されたら、環境構築が無事終了です。 ドキュメントを生成する docs generate 以下のコマンドを実行すれば、models/配下のクエリとメタデータ(yamlファイル)を参照して、target/配下にメタデータのドキュメントが生成されます。 docker run --rm \ --network=host \ --platform linux/amd64 \ --mount type=bind,source=`PWD`,target=/usr/app \ --mount type=bind,source=`PWD`/profiles.yml,target=/root/.dbt/profiles.yml \ --mount type=bind,source=$HOME/.config/gcloud/application_default_credentials.json,target=/root/.config/gcloud/application_default_credentials.json \ ghcr.io/dbt-labs/dbt-bigquery:1.2.0 \ docs generate さらにdbt docs serveを実行すると、localhostからメタデータのドキュメントを確認できます↓。schema dbt_testとdbt_test_2はBigQueryのデータセットに該当します。 ...

December 11, 2022 · Me

データ基盤におけるGoogleグループ・IAMによるアクセス制御

会社のTech Blogに記事を書きました データ基盤におけるGoogleグループ・IAMによるアクセス制御

November 18, 2022 · Me

Airflowで構築したワークフローを検証する

データ基盤の監視 データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。 https://tech-blog.monotaro.com/entry/2021/08/24/100000 https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。 まず、 Data Pipelines Pocket Reference ではデータ基盤検証の原則が紹介されました。 Validate Early, Validate Often 要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。 Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。 Extractの検証 ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcsを利用すると簡単にできます。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html 一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensorよりGCSObjectsWithPrefixExistenceSensorの方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。 check_extract = GCSObjectsWithPrefixExistenceSensor( task_id="check_extract", bucket="{YOUR_BUCKET}", prefix="{YOUR_PREFIX}", ) Loadの検証 ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorを利用できます。 https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/bigquery_check_operator/index.html#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator 下記のタスクをLoad処理の後ろに追加すれば良いです。 check_load = BigQueryCheckOperator( task_id="check_load", sql={YOUR_SQL}, use_legacy_sql=False, params={ "bq_suffix": Variable.get("bq_suffix"), "dataset": setting.bq.dataset, "table": setting.bq.table_name, }, location="asia-northeast1", ) SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えば この記事 が紹介されたクエリでテーブルが空になっているかを確認できます。 SELECT total_rows FROM ${dataset_id}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '${table_name}' ORDER BY last_modified_time DESC LIMIT 1; GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。 皆さんのワークフロー設計にご参考になれば幸いです。

November 1, 2022 · Me

自作PythonラッパーでGoogleグループのメンバーシップをより便利に管理する

Googleグループのメンバーシップを管理する方法 Googleグループを管理するのは Workspaceのadmin SDKを利用する https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups CLI gcloudを利用する https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships Cloud Identity APIを叩く https://cloud.google.com/identity/docs/how-to/setup 3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。 方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう) gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能 Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。 シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。 gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h' 一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。 Pythonラッパー 認証 Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。 https://cloud.google.com/identity/docs/how-to/setup アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。 from httplib2 import Http from oauth2client.client import GoogleCredentials from googleapiclient.discovery import build def create_service(): credentials = GoogleCredentials.get_application_default() service_name = "cloudidentity" api_version = "v1" return build( service_name, api_version, http=credentials.authorize(Http()), cache_discovery=False, ) Group IDとMembership ID 実際にメンバーシップを取得するAPIを叩いてみたらわかると思います。memberships_idとgroup_idによってメンバーシップのnameが決められます。後ほどグループからメンバーを削除する際に、ここのnameが必須です。 ...

October 2, 2022 · Me

AirflowのBigQueryInsertJobOperatorのオプションについて

先日「 AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法 」についての記事を書きましたが、BigQueryInsertJobOperatorにフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。 なぜ気づいたのか 外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、 Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character. 調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlinesといったCSVのオプションを追加すると解決できるらしいです。 https://stackoverflow.com/questions/65782572/how-to-avoid-missing-close-double-quote-character-error-in-google-bigquer https://qiita.com/sawanoboly/items/410b3346518e569da581 ただし、configurationにcsvOptionsを追加しても解決されず(同じエラーが出た)、困っていました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, "csvOptions": { "allowQuotedNewlines": True }, } }, 解決法 CLI bq loadを利用す際にどうなっているかを確認したら、どうやらcsvOptionsを明示的に書く必要がなく--allow_quoted_newlinesだけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperatorで同じことをすると、問題なくloadできました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, # csvOptionsの記述は不要 "allowQuotedNewlines": True } },

August 30, 2022 · Me

Cloud ComposerのDAGでデータ基盤の転送パイプラインを監視

会社のTech Blogに記事を書きました Cloud ComposerのDAGでデータ基盤の転送パイプラインを監視

August 18, 2022 · Me

AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法

GCS外部パーティションテーブルをBigQueryに取り込む BigQueryでデータレイクを構築する際に、GCS、Google Drive Data、Bigtableから外部テーブルを取り組む場面は少なくないと思います。下記のような外部テーブルがパーティションで分けられている場合(Hive partitioned dataと呼ばれている)は少しややこしくなりますが、 gs://myBucket/myTable/dt=2019-10-31/lang=en/foo gs://myBucket/myTable/dt=2018-10-31/lang=fr/bar 公式ドキュメントは取り組み方をわかりやすくまとめてくれています。bqというcliでコマンド2行で解決できます。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#partition_schema_detection_modes テーブル定義を作成 bq mkdef \ --autodetect \ --source_format=CSV \ --hive_partitioning_mode=AUTO \ --hive_partitioning_source_uri_prefix=gs://your-bucket/your-table/ \ "gs://you-bucket/you-table/*.csv" > /tmp/MyTableDefFile テーブルを作成 bq mk --external_table_definition=/tmp/MyTableDefFile \ my_dataset.my_table Airflowで取り込むとどうなる? まず、簡単に背景を補足すると現在筆者が構築しているデータ基盤はCloud Composer (Airflow)でワークフローを管理しており、AirflowでGCSから外部テーブルを取り込むといった需要があります。パーティションではない普通のテーブルであれば、BigQueryCreateExternalTableOperator利用すると一発で解決できます。 しかし、パーティションのある場合はAirflowから事前用意された関数がなく(2022年7月時点)、他の方法を考えないといけないです。 GCP公式ドキュメントもConosole, bq, API, Java 4つの方法しか書いておらず、PythonのSDKはまだ実装されていないようです。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#creating_an_external_table_for_hive_partitioned_data BigQueryInsertJobOperatorを利用 いろいろ調査した結果、筆者と同じことに困っている人がいたようです。 https://github.com/apache/airflow/issues/13626 その下に Have you tried to use BigQueryInsertJobOperator? for example, see: #13598 「BigQueryInsertJobOperatorを使ってみたらどう?」というコメントがありました。 さらにリンク先のPR(なぜかクローズされた)に行くと、BigQueryInsertJobOperatorの例をまとめてもらっています。どうやら専用のSDKがない場合BigQueryInsertJobOperatorを使うのは普通らしいです。引数もやや雑で、そのままBigQueryのREST APIに叩くのと近い感じです。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator https://github.com/apache/airflow/pull/13598/files しかしながら、上記のPRは外部パーティションテーブルを取り組む例がなかったため、BigQuery APIのドキュメントを調べて、hivePartitioningOptionsというオプションを見つけました。これを追加すると、もしかしてうまくいくのではないかと思い、早速検証してみました。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions データを用意 https://people.math.sc.edu/Burkardt/datasets/csv/csv.html から適当にcsvファイルをダウロードします。 biostats1.csv "Name", "Sex", "Age", "Height (in)", "Weight (lbs)" "Alex", "M", 41, 74, 170 "Bert", "M", 42, 68, 166 "Carl", "M", 32, 70, 155 "Dave", "M", 39, 72, 167 "Elly", "F", 30, 66, 124 "Fran", "F", 33, 66, 115 "Gwen", "F", 26, 64, 121 "Hank", "M", 30, 71, 158 "Ivan", "M", 53, 72, 175 "Jake", "M", 32, 69, 143 "Kate", "F", 47, 69, 139 "Luke", "M", 34, 72, 163 "Myra", "F", 23, 62, 98 "Neil", "M", 36, 75, 160 "Omar", "M", 38, 70, 145 "Page", "F", 31, 67, 135 "Quin", "M", 29, 71, 176 "Ruth", "F", 28, 65, 131 biostats1.csvを少しいじって疑似パーティションbiostats2.csvを作成します ...

July 21, 2022 · Me

Cloud FunctionsにおけるSlack APIの3秒レスポンス問題の対処法

Slack APIはUXのため著名な3秒レスポンスルールを設けています。初期設定のようなものではなく、自ら伸ばすことはできません。(厳しい) https://api.slack.com/interactivity/slash-commands If you need to respond outside of the 3 second window provided by the request responses above, you still have plenty of options for keeping the workflow alive. Slack Appsを開発したことのある人にとって最初は少し戸惑うでしょうか。(筆者はそうでした。) 最近はCloud Functionsとslack boltで社内の承認アプリを開発していて、試行錯誤した経験を共有したいと思います。 経緯 実装する予定のアプリの最初のステップとして、ショートカットでCloud Functionsを発火させてmodalを開きます。slack boltのドキュメントを読んで、以下の実装をおこないました。 https://slack.dev/bolt-python/concepts (一部のコード) @app.shortcut("/hogehoge") def open_application_modal(ack, body: dict, client: WebClient): ack() result = process() client.views_open( trigger_id=body["trigger_id"], view={ "type": "modal", # View identifier "callback_id": "application_form_view", "title": {"type": "plain_text", "text": "APP Title"}, "submit": {"type": "plain_text", "text": "Submit"}, "blocks": create_blocks(result), }, ) # .... しかし、最初ショートカットをクリックしてmodalが出てこないが、数回試してみたら出てくるという謎な事象が起きていました。modal出てからのステップ(申請フォーマットの提出、Google APIを叩く処理)は正常に動作します。 Cloud Functionsのログを調べてみたら、expired_trigger_idが出ているので、トリガーの有効期限が切れて、3秒レスポンスルールを違反しているようです。 The server responded with: {'ok': False, 'error': 'expired_trigger_id'}) Traceback (most recent call last): File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_bolt/listener/thread_runner.py", line 65, in run returned_value = listener.run_ack_function( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_bolt/listener/custom_listener.py", line 50, in run_ack_function return self.ack_function( File "/workspace/src/app.py", line 68, in open_application_modal client.views_open( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/client.py", line 4333, in views_open return self.api_call("views.open", json=kwargs) File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/base_client.py", line 160, in api_call return self._sync_send(api_url=api_url, req_args=req_args) File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/base_client.py", line 197, in _sync_send return self._urllib_api_call( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/base_client.py", line 331, in _urllib_api_call return SlackResponse( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/slack_response.py", line 205, in validate raise e.SlackApiError(message=msg, response=self) slack_sdk.errors.SlackApiError: The request to the Slack API failed. (url: https://www.slack.com/api/views.open 数年前の英語記事・GCPのドキュメントを調べてみたところ、Pub/Subによって非同期処理を実装すると3秒レスポンス問題を解決できるようです。 https://dev.to/googlecloud/getting-around-api-timeouts-with-cloud-functions-and-cloud-pub-sub-47o3 ...

June 12, 2022 · Me