Cloud ComposerのDAGでデータ基盤の転送パイプラインを監視

会社のTech Blogに記事を書きました Cloud ComposerのDAGでデータ基盤の転送パイプラインを監視

August 18, 2022 · Me

AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法

GCS外部パーティションテーブルをBigQueryに取り込む BigQueryでデータレイクを構築する際に、GCS、Google Drive Data、Bigtableから外部テーブルを取り組む場面は少なくないと思います。下記のような外部テーブルがパーティションで分けられている場合(Hive partitioned dataと呼ばれている)は少しややこしくなりますが、 gs://myBucket/myTable/dt=2019-10-31/lang=en/foo gs://myBucket/myTable/dt=2018-10-31/lang=fr/bar 公式ドキュメントは取り組み方をわかりやすくまとめてくれています。bqというcliでコマンド2行で解決できます。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#partition_schema_detection_modes テーブル定義を作成 bq mkdef \ --autodetect \ --source_format=CSV \ --hive_partitioning_mode=AUTO \ --hive_partitioning_source_uri_prefix=gs://your-bucket/your-table/ \ "gs://you-bucket/you-table/*.csv" > /tmp/MyTableDefFile テーブルを作成 bq mk --external_table_definition=/tmp/MyTableDefFile \ my_dataset.my_table Airflowで取り込むとどうなる? まず、簡単に背景を補足すると現在筆者が構築しているデータ基盤はCloud Composer (Airflow)でワークフローを管理しており、AirflowでGCSから外部テーブルを取り込むといった需要があります。パーティションではない普通のテーブルであれば、BigQueryCreateExternalTableOperator利用すると一発で解決できます。 しかし、パーティションのある場合はAirflowから事前用意された関数がなく(2022年7月時点)、他の方法を考えないといけないです。 GCP公式ドキュメントもConosole, bq, API, Java 4つの方法しか書いておらず、PythonのSDKはまだ実装されていないようです。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#creating_an_external_table_for_hive_partitioned_data BigQueryInsertJobOperatorを利用 いろいろ調査した結果、筆者と同じことに困っている人がいたようです。 https://github.com/apache/airflow/issues/13626 その下に Have you tried to use BigQueryInsertJobOperator? for example, see: #13598 「BigQueryInsertJobOperatorを使ってみたらどう?」というコメントがありました。 さらにリンク先のPR(なぜかクローズされた)に行くと、BigQueryInsertJobOperatorの例をまとめてもらっています。どうやら専用のSDKがない場合BigQueryInsertJobOperatorを使うのは普通らしいです。引数もやや雑で、そのままBigQueryのREST APIに叩くのと近い感じです。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator https://github.com/apache/airflow/pull/13598/files しかしながら、上記のPRは外部パーティションテーブルを取り組む例がなかったため、BigQuery APIのドキュメントを調べて、hivePartitioningOptionsというオプションを見つけました。これを追加すると、もしかしてうまくいくのではないかと思い、早速検証してみました。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions データを用意 https://people.math.sc.edu/Burkardt/datasets/csv/csv.html から適当にcsvファイルをダウロードします。 biostats1.csv "Name", "Sex", "Age", "Height (in)", "Weight (lbs)" "Alex", "M", 41, 74, 170 "Bert", "M", 42, 68, 166 "Carl", "M", 32, 70, 155 "Dave", "M", 39, 72, 167 "Elly", "F", 30, 66, 124 "Fran", "F", 33, 66, 115 "Gwen", "F", 26, 64, 121 "Hank", "M", 30, 71, 158 "Ivan", "M", 53, 72, 175 "Jake", "M", 32, 69, 143 "Kate", "F", 47, 69, 139 "Luke", "M", 34, 72, 163 "Myra", "F", 23, 62, 98 "Neil", "M", 36, 75, 160 "Omar", "M", 38, 70, 145 "Page", "F", 31, 67, 135 "Quin", "M", 29, 71, 176 "Ruth", "F", 28, 65, 131 biostats1.csvを少しいじって疑似パーティションbiostats2.csvを作成します ...

July 21, 2022 · Me

git switch, git restore, git grep, git blameを使ってみよう

https://git-scm.com/ 毎日息を吸うようにgitを使っているけど、gitのドキュメント見たことないなと思って実際に読んでみました。あまり使われていないが、便利そうなgitコマンドを見つけたので、共有します。 git switch git restore git grep git blame git log -p <filename> git commit --amend --no-edit git switch ブランチを切り替えるコマンドです。git checkoutでできるのに、なぜわざわざgit switchを使うのかって思っていましたが、この記事を読んで意図を理解しました。 https://www.banterly.net/2021/07/31/new-in-git-switch-and-restore/ 簡単に説明するとgit checkoutは git checkout <branch_name> ブランチ・コミットを切り替える git checkout -- <file_name> ファイルの変更を廃棄する(直近のcommitに戻す) 2つの機能を持っているので、少し紛らわしいですね。数年前初めてgitを勉強した頃、けっこうハマっていた記憶があります。 version 2.23にブランチ・コミットを切り替える機能のみ持っている機能git switchがリリースされました。 # git checkout your_branchと同じ git switch your_branch ただ、新しくブランチを切る際のオプションは-bではなく、-cになっています。 # git checkout -b your_branchと同じ git switch -c new_branch git restore git restoreは、ファイルの変更を廃棄して、直近のcommitに戻すというgit checkoutの残り半分の機能を持っています。 # git checkout -- file.txtと同じ git restore -- file.txt git checkoutをやめてgit switchとgit restoreを使ったほうがわかりやすいと思います。特に初心者の方はぜひ試してみてください。 ...

July 10, 2022 · Me

Cloud FunctionsにおけるSlack APIの3秒レスポンス問題の対処法

Slack APIはUXのため著名な3秒レスポンスルールを設けています。初期設定のようなものではなく、自ら伸ばすことはできません。(厳しい) https://api.slack.com/interactivity/slash-commands If you need to respond outside of the 3 second window provided by the request responses above, you still have plenty of options for keeping the workflow alive. Slack Appsを開発したことのある人にとって最初は少し戸惑うでしょうか。(筆者はそうでした。) 最近はCloud Functionsとslack boltで社内の承認アプリを開発していて、試行錯誤した経験を共有したいと思います。 経緯 実装する予定のアプリの最初のステップとして、ショートカットでCloud Functionsを発火させてmodalを開きます。slack boltのドキュメントを読んで、以下の実装をおこないました。 https://slack.dev/bolt-python/concepts (一部のコード) @app.shortcut("/hogehoge") def open_application_modal(ack, body: dict, client: WebClient): ack() result = process() client.views_open( trigger_id=body["trigger_id"], view={ "type": "modal", # View identifier "callback_id": "application_form_view", "title": {"type": "plain_text", "text": "APP Title"}, "submit": {"type": "plain_text", "text": "Submit"}, "blocks": create_blocks(result), }, ) # .... しかし、最初ショートカットをクリックしてmodalが出てこないが、数回試してみたら出てくるという謎な事象が起きていました。modal出てからのステップ(申請フォーマットの提出、Google APIを叩く処理)は正常に動作します。 Cloud Functionsのログを調べてみたら、expired_trigger_idが出ているので、トリガーの有効期限が切れて、3秒レスポンスルールを違反しているようです。 The server responded with: {'ok': False, 'error': 'expired_trigger_id'}) Traceback (most recent call last): File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_bolt/listener/thread_runner.py", line 65, in run returned_value = listener.run_ack_function( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_bolt/listener/custom_listener.py", line 50, in run_ack_function return self.ack_function( File "/workspace/src/app.py", line 68, in open_application_modal client.views_open( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/client.py", line 4333, in views_open return self.api_call("views.open", json=kwargs) File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/base_client.py", line 160, in api_call return self._sync_send(api_url=api_url, req_args=req_args) File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/base_client.py", line 197, in _sync_send return self._urllib_api_call( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/base_client.py", line 331, in _urllib_api_call return SlackResponse( File "/layers/google.python.pip/pip/lib/python3.9/site-packages/slack_sdk/web/slack_response.py", line 205, in validate raise e.SlackApiError(message=msg, response=self) slack_sdk.errors.SlackApiError: The request to the Slack API failed. (url: https://www.slack.com/api/views.open 数年前の英語記事・GCPのドキュメントを調べてみたところ、Pub/Subによって非同期処理を実装すると3秒レスポンス問題を解決できるようです。 https://dev.to/googlecloud/getting-around-api-timeouts-with-cloud-functions-and-cloud-pub-sub-47o3 ...

June 12, 2022 · Me

Cloud Functions 2nd Genにおけるsecret keyの管理

GCPのFaaS(Functions-as-a-Service)Cloud Functionsは最近第二世代がリリースされ、(まだbeta期間中ですが)インフラ周りがけっこう強化されました。Cloud StorageとBigQueryにある大規模データを処理するために、HTTPトリガーのアップタイムが60分までに、イベントトリガーも10分まで伸びました。 https://cloud.google.com/functions/docs/2nd-gen/overview 筆者はこの数週間ちょうどCloud Functionsを利用して承認システムを開発しています。Cloud Functions 2nd Genにおけるsecret keyの管理について少し困っていたので、調べたものを共有します。 何が問題か secret keyなどを明文でCloud Functionsの環境変数に保存するは流石に危ないので、他の方法が必要です。 we recommend that you review the best practices for secret management. Note that there is no Cloud Functions-specific integration with Cloud KMS. Cloud Functions公式ドキュメントによると、secret keyを管理するベストプラクティスはSecret Managementを利用することです。また、Cloud Functions特有のCloud KMSは存在しません。 https://cloud.google.com/functions/docs/configuring/env-var#managing_secrets 1st genの場合はデプロイする際に-set-secretsというオプションでSecret Management上保存しているキーを追加すれば良いですが、2nd genで同じようなことをやると、-set-secretsはまだ実装されていない(2022年5月)ので、怒られます。 ERROR: (gcloud.beta.functions.deploy) --set-secrets is not yet supported in Cloud Functions V2. Cloud KMSを使う Secret Managementはまだ存在しない時代(と言っても3年前)に書かれた記事を読んで、参考になりました。 考え方としては、 Cloud KMSで認証情報(文字列)を暗号化する 暗号化した認証情報をCloud Functionsの環境変数に登録する Cloud Functionsの関数内で暗号化された環境変数を復号化する https://www.apps-gcp.com/kms-with-functions/ ...

June 1, 2022 · Me

TerraformでCloud Storageのバケット名を変更する際の罠

最近よくデータ基盤構築の仕事をしているため、TerraformでGCPをいじったりしています。今回はCloud Storageのバケット名を変更する際に罠にはまりやすいところを紹介します。もちろん、Cloud Storageだけでなく、他のサービスやAWSなどにも活用できそうな箇所があるではないかと思います。 環境 Terraform v1.1.2 on darwin_arm64 provider registry.terraform.io/hashicorp/google v4.15.0 他の環境は正しく動作するかは未検証です。 経緯 下記のバケットを作って検証環境に反映した後、PRを出しました。 resource "google_storage_bucket" "my_bucket" { name = "my-bucket" storage_class = var.gcs_storage_class.coldline project = var.project_id location = var.gcs_location force_destroy = false uniform_bucket_level_access = true retention_policy { is_locked = true retention_period = 30 } lifecycle_rule { condition { age = 30 } action { type = "Delete" } } } チームメンバーのレビュー受けて、バケット名はmy-bucketよりmy-bucket-hogeの方が良いと言われたので、my-bucketをmy-bucket-hoge変更しterraform applyしたらエラーになりました。もちろんのことですが、最初はforce_destroy = falseに設定していて、バケット内オブジェクトが入っているので、強制変更はできません。 force_destroy - (Optional, Default: false) When deleting a bucket, this boolean option will delete all contained objects. If you try to delete a bucket that contains objects, Terraform will fail that run. 解決方法を二つ見つけました。 ...

May 27, 2022 · Me

データ基盤におけるGitHub Actionsを使ったTerraformとCloud ComposerのCI/CD

会社のTech Blogに記事を書きました データ基盤におけるGitHub Actionsを使ったTerraformとCloud ComposerのCI/CD

May 16, 2022 · Me

ワードクラウドを自作ニュースアプリに追加してみた

はじめに この間Node.jsで 多言語Webニュースアプリ 作ってみました https://www.multitrue.news ニュースのタイトルと概要だけではつまらないので、単語の出現頻度によって直近一週間のニュースからワードクラウド作って、一目で世の中の出来事を確認できたら面白そうじゃないかと思いながら、ワードクラウドをニュースアプリに追加してみました。 詳細 ソースはこちらから確認できます。 https://github.com/aibazhang/multitrue 下準備 日本語と中国語などは英語と異なり、単語と単語の間スペースがないので、形態素解析が必要です。簡略化するために、今回は英語のワードクラウドのみを作ることにしました。人称代名詞や助動詞のようなStop Wordsをワードクラウドに出しても意味がないので、NLTKの英語Stop Words辞書を利用します。 https://gist.github.com/sebleier/554280 また、単語の出現頻度を集計するヘルプ関数を作成します。いい感じライブラリもありますが、できるだけdependencyを減らしたいので、自分で実装することにしました。ただ、全ての単語を最終的に大文字に変換します。 src/utils/countWordsFrequency.js const stopwords = require('./stopwords-en.json'); // English stopwords via https://gist.github.com/sebleier/554280 const countWordsFrequency = (sentences) => { const result = {}; // remove punctuation and split by space const terms = sentences.toLowerCase().match(/[a-zA-Z]+/g); terms.forEach((e) => { if (!stopwords.stopwords.includes(e)) { const name = e.toUpperCase(); if (result[name]) { result[name] += 1; } else { result[name] = 1; } } }); return Object.entries(result) .map(([key, value]) => ({ name: key, value, })) .sort((a, b) => b.value - a.value); }; module.exports = countWordsFrequency; ニュース記事の単語出現頻度を計算 続いて、英語ニュースを収集するcontrollerに単語頻度を計算するロジックを組み込みます。今回利用しているNewsAPIはニュースの本文を取得できないため、タイトルと概要から単語出現頻度を計算し、ニュース・メタ情報と一緒にレスポンスに追加します。(収集期間のview-config.jsonから日単位で設定できます)今考えるとやはり単語出現頻度の計算とニュースの取得を分けたほうが良いかもしれません。 src/controllers/viewsController.js exports.getHeadlinesUS = catchAsync(async (req, res) => { const news = await News.find({ category: 'general', country: 'us' }) .sort('-publishedAt') .limit(viewConfig.limit); const articlesTitleDesc = await News.find({ category: 'general', country: 'us', publishedAt: { $gt: Date.now() - viewConfig.wordscloud.dateRangeDay * 24 * 60 * 60 * 1000, }, }).select('title description'); const wordsFrequency = calcWordFrequncyInArticles(articlesTitleDesc); res.status(200).render('index', { countryMeta: { flag: '🇺🇸', title: 'Top Stories', code: 'us', }, news, wordsFrequency, }); }); 計算結果をフロントに渡す echarts-wordcloudというライブラリでワードクラウドを描きます。 https://github.com/ecomfe/echarts-wordcloud ...

May 14, 2022 · Me