生成AI時代のSRE【サポーターズCoLab】に登壇した

4月3日、 生成AI時代のSRE【サポーターズCoLab】 に登壇しました。 資料を公開します。

April 11, 2025 · Me

GKEでArgo WorkflowsにWorkflow Archiveを導入する

副業先のtech blogに記事を書きました GKEでArgo WorkflowsにWorkflow Archiveを導入する

July 9, 2024 · Me

GCS composeで32以上のオブジェクトをまとめる方法

先日の記事にて パーティション化されたCSVオブジェクトをCloudSQLにimportする方法 を紹介しました。 SDKを利用して32より多いオブジェクトをまとめる場合、GCPのコミュニティのチュートリアルのコードをそのまま使っていました。(2023.8.10にアーカイブされ済み) https://github.com/GoogleCloudPlatform/community/blob/master/archived/cloud-storage-infinite-compose/index.md 文章最後に書いてあるように This code is offered for demonstration purposes only, and should not be considered production-ready. Applying it to your production workloads is up to you! コードはデモ用途のため、そのまま本番環境で使うのはで推奨しないです。先日この文を見逃して自分の環境にデプロイし、平常運転1ヶ月後、パーティション化されたCSVオブジェクトの数が増えたらバグが出ました。 事象 起きていた事象は、composeによって作られた中間オブジェクトが消されず残り続け、最終的に数TBのとてつもなく大きいオブジェクトが作成されてしまいました。一つのオブジェクトは5TiBまでというGCSの上限を超えてしまうため、処理が失敗しました。 どこがバグ 問題は関数compose_and_cleanupから呼び出している関数delete_objects_concurrentにあります。オブジェクトをまとめた後、毎回中間オブジェクトを削除していますが、そのdelete処理自体が非同期処理ですべての処理が終了するのを待たずに次のblob.composeの実行が始まります。 The delete_objects_concurrent function is very simple, using fire-and-forget delete tasks in an executor. A more robust implementation might check the futures from the submitted tasks. チュートリアルの中にちゃんと書いてあります。(もっとロバスト性のある実装は、submit済みのタスクのfuturesをチェックするとのこと) まとめようとするオブジェクトの数が少ない場合はほとんど問題がないですが、1000オブジェクトあたりを超えるとdeleteがcompose処理より遅くなるため、next_chunkが永遠に存在しているままwhileループから脱出できない状態になります。 delete処理自体は特に制限ないようですが、数百のオブジェクトを削除する場合時間かかるとドキュメントに記載されています。 https://cloud.google.com/storage/docs/deleting-objects#delete-objects-in-bulk 解決方法 最初に試したことは、記事に書いてあるようにsubmit済みのタスクのfuturesをwait処理でチェックします。つまりすべてのdelete処理が終わるまでに、compose処理を行いません。 (python並行処理の詳細は公式ドキュメントまたは他の記事をご覧ください) from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait def delete_objects_concurrent(blobs, executor, client) -> None: """Delete Cloud Storage objects concurrently. Args: blobs (List[storage.Blob]): The objects to delete. executor (Executor): An executor to schedule the deletions in. client (storage.Client): Cloud Storage client to use. """ futures = [] for blob in blobs: logger.info("Deleting slice {}".format(blob.name)) futures.append(executor.submit(blob.delete, client=client)) wait(futures, return_when=ALL_COMPLETED) logger.info(f"Deleted {len(blobs)} objects") しかし大量なオブジェクトを1つずつ削除していること自体は変わらないので、試してみたら処理が途中でtimeoutになりました。 batch を利用して削除処理をバッチにまとめる方法もありますが、ロジックをシンプルにしたいので、他の方法を考えました。 ...

August 17, 2023 · Me

パーティション化されたCSVファイルをCloudSQLにimportする方法

問題 パーティション化されたCSVファイルをCloudSQLにimportする場面は時々あると思います。 残念ながらCloudSQLはBigQueryのようにwildcardsによるimportを対応していません。需要はあるようですが↓ https://issuetracker.google.com/issues/132058570?pli=1 ファイルごとにimportするとオーバーヘッドが毎回発生するため、速度的に実用性があまりないと思います。一方、importはオペレーションの1種なので、並列処理はできません。 https://cloud.google.com/sql/docs/troubleshooting#import-export HTTP Error 409: Operation failed because another operation was already in progress. There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete. なので、ファイルを結合してimportするのはより現実的な解決策だと思います。 gsutil compose gsutil composeを利用すると、GCSにある複数のファイルを結合できます。 cliのみならず、SDK(google.cloud.storage.Blob.compose)も同じ機能が提供されています。 https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries https://cloud.google.com/storage/docs/gsutil/commands/compose ただし、結合できるファイルは最大32個という制約があります。 There is a limit (currently 32) to the number of components that can be composed in a single operation. ...

August 16, 2023 · Me

AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法

先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。 AirflowからDataformをトリッガーする ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations 簡単にまとめると DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする DataformCreateWorkflowInvocationOperator: sqlを実行する しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。 Dataformに変数を渡す まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。 { "defaultSchema": "dataform", "assertionSchema": "dataform_assertions", "warehouse": "bigquery", "defaultDatabase": "project-stg", "defaultLocation": "asia-northeast1", "vars": { "bq_suffix": "_stg", "execution_date": "2023-05-24" } } DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。 https://github.com/apache/airflow/blob/739e6b5d775412f987a3ff5fb71c51fbb7051a89/airflow/providers/google/cloud/operators/dataform.py#LL73C29-L73C46 compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig CodeCompilationConfig内にvarsという変数を指定できるようです。 { "defaultDatabase": string, "defaultSchema": string, "defaultLocation": string, "assertionSchema": string, "vars": { string: string, ... }, "databaseSuffix": string, "schemaSuffix": string, "tablePrefix": string } BigQueryのsuffixをcode_compilation_configのvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。 DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "bq_suffix": "_stg", } }, }, ) Dataformにcontext変数を渡す 増分処理する際によくdata_interval_endなどの context変数 を利用して当日の差分だけ取り入れます。 しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。 TaskFlow でDataformCreateCompilationResultOperatorをラッピングすれば前述の問題を解決できます。data_interval_endはcontextから取得します。ポイントとしてはDataformCreateCompilationResultOperatorを返す際にexecute()を呼び出す必要があります。 from airflow.decorators import task @task() def create_compilation_result(**context): execute_date = ( context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d") ) return DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "execute_date": execute_date, "bq_suffix": Variable.get("bq_suffix"), } }, }, ).execute(context=context) 最終的なDAGは以下のようになります。 ...

May 24, 2023 · Me

Slack BoltとCloud Functionsでデータ基盤アクセス承認システムを開発

会社のTech Blogに記事を書きました Slack BoltとCloud Functionsでデータ基盤アクセス承認システムを開発

April 20, 2023 · Me

Cloud Composerでmax_active_tasks_per_dagのデフォルト値が機能していない問題

問題 先日Cloud Composerの環境を↓にバージョンアップしました。 Cloud Composer 2.0.32 Airflow 2.2.5 core.max_active_tasks_per_dagという一つのDAG内同時に処理できるタスクの上限を設定するパラメータがデフォルト値16のままになっているのにも関わらず、実行するタスクの上限が明らかに16を超えています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag ローカルにあるAirflow 2.2.5環境では何の異常もなく、ComposerのAirflow Configurationを確認したところ、なぜかcore.dag_concurrencyが100に設定されています。 [core] dags_folder = /home/airflow/gcs/dags plugins_folder = /home/airflow/gcs/plugins executor = CeleryExecutor dags_are_paused_at_creation = True load_examples = False donot_pickle = True dagbag_import_timeout = 300 default_task_retries = 2 killed_task_cleanup_time = 3570 parallelism = 0 non_pooled_task_slot_count = 100000 dag_concurrency = 100 .... core.dag_concurrencyの役割はcore.max_active_tasks_per_dagと同じく、一つのDAG内同時に処理できるタスクの上限を設定しています。Airflow 2.2.0からはすでにDeprecatedになったはずなのに、なぜか残り続いています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated 試み 手動で削除しようと思ったですけど、バージョンを上げたのでCloud Composer -> AIRFLOW CONFIGURATION OVERRIDESにcore.dag_concurrencyというパラメータすら存在しませんでした。 仕方なく、GCSから設定ファイルgs://asia-northeast1-colossus-wo-xxxxxxx-bucket/airflow.cfgを直接編集してみました。しかし、gcloud composer environments storage dags importを実行すると初期化が処理が実行され、core.dag_concurrencyが再び出てきました。 解決 デフォルト値ではなく、手動でcore.max_active_tasks_per_dagを明示的に16に指定すると、実行するタスクの上限が期待通りに動作しました。 ザクッとComposerのリリースノートを確認してこのバグまだ修正されていないようです。 ...

February 8, 2023 · Me

Cloud SQLにある大量なテーブルをBigQueryに入れる話

経緯 昨年度から大規模なデータ分析基盤の構築に携わっています。最近Cloud SQLにある6つのDBの数百個のテーブルを日次洗い替えでBigQueryにあるデータ基盤に入れるタスクを取り組んでいます。 Cloud SQLとBigQuery両方ともGCPのサービスのため、federated queriesを利用すると簡単にできそうに見えますが、 https://cloud.google.com/bigquery/docs/federated-queries-intro 実際に行ってみると、以下の3つの課題を気づきました。 BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない Cloud SQLにあるテーブルの定義をそのまま取ってきてもBigQueryではMySQLとPostgreSQLの一部の型が対応されていない https://cloud.google.com/bigquery/docs/tables#sql_1 少し苦労していましたが、幸い解決方法を見つけました。 今後躓く方もいるかもしれないので、知見を共有したいと思います。 BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない BigQueryはクエリの結果によってテーブルを作成できるので、事前にテーブルを作っておく必要がありません。 https://cloud.google.com/bigquery/docs/tables#sql_1 大量なテーブルを一々作成するのは現実的ではない課題の解決法としてはDBのメタ情報(descriptionを含めて)をそのまま生かしてテーブル作成用クエリを生成し、テーブルを作成します。 例えばPostgreSQLの場合、まずはテーブルのメタ情報 SELECT schemaname, relname AS table_name, obj_description(relid) AS description FROM pg_catalog.pg_statio_all_tables WHERE schemaname = '{YOUR_SCHEMA}' とカラムのメタ情報を取得します。 SELECT c.table_schema, c.table_name, c.column_name, c.data_type, pgd.description FROM pg_catalog.pg_statio_all_tables AS st INNER JOIN pg_catalog.pg_description pgd on ( pgd.objoid = st.relid ) RIGHT JOIN information_schema.columns c ON ( pgd.objsubid = c.ordinal_position and c.table_schema = st.schemaname and c.table_name = st.relname ) WHERE c.table_schema = '{YOUR_SCHEMA}'; 上記のメタ情報をfor文で回してテーブル作成用のクエリテンプレートの$columns $pg_schema $table_nameに入れたら、テーブル作成用のクエリが生成されます。Airflowとかで定期的にクエリを実行すれば問題解決です。 ...

February 2, 2023 · Me