先日の記事にてパーティション化されたCSVオブジェクトをCloudSQLにimportする方法を紹介しました。

SDKを利用して32より多いオブジェクトをまとめる場合、GCPのコミュニティのチュートリアルのコードをそのまま使っていました。(2023.8.10にアーカイブされ済み)

https://github.com/GoogleCloudPlatform/community/blob/master/archived/cloud-storage-infinite-compose/index.md

文章最後に書いてあるように

This code is offered for demonstration purposes only, and should not be considered production-ready. Applying it to your production workloads is up to you!

コードはデモ用途のため、そのまま本番環境で使うのはで推奨しないです。先日この文を見逃して自分の環境にデプロイし、平常運転1ヶ月後、パーティション化されたCSVオブジェクトの数が増えたらバグが出ました。

事象

起きていた事象は、composeによって作られた中間オブジェクトが消されず残り続け、最終的に数TBのとてつもなく大きいオブジェクトが作成されてしまいました。一つのオブジェクトは5TiBまでというGCSの上限を超えてしまうため、処理が失敗しました。

どこがバグ

問題は関数compose_and_cleanupから呼び出している関数delete_objects_concurrentにあります。オブジェクトをまとめた後、毎回中間オブジェクトを削除していますが、そのdelete処理自体が非同期処理ですべての処理が終了するのを待たずに次のblob.composeの実行が始まります。

The delete_objects_concurrent function is very simple, using fire-and-forget delete tasks in an executor. A more robust implementation might check the futures from the submitted tasks.

チュートリアルの中にちゃんと書いてあります。(もっとロバスト性のある実装は、submit済みのタスクのfuturesをチェックするとのこと)

まとめようとするオブジェクトの数が少ない場合はほとんど問題がないですが、1000オブジェクトあたりを超えるとdeleteがcompose処理より遅くなるため、next_chunkが永遠に存在しているままwhileループから脱出できない状態になります。

delete処理自体は特に制限ないようですが、数百のオブジェクトを削除する場合時間かかるとドキュメントに記載されています。

https://cloud.google.com/storage/docs/deleting-objects#delete-objects-in-bulk

解決方法

最初に試したことは、記事に書いてあるようにsubmit済みのタスクのfuturesをwait処理でチェックします。つまりすべてのdelete処理が終わるまでに、compose処理を行いません。 (python並行処理の詳細は公式ドキュメントまたは他の記事をご覧ください)

from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait

def delete_objects_concurrent(blobs, executor, client) -> None:
    """Delete Cloud Storage objects concurrently.
    Args:
        blobs (List[storage.Blob]): The objects to delete.
        executor (Executor): An executor to schedule the deletions in.
        client (storage.Client): Cloud Storage client to use.
    """
    futures = []
    for blob in blobs:
        logger.info("Deleting slice {}".format(blob.name))
        futures.append(executor.submit(blob.delete, client=client))
    wait(futures, return_when=ALL_COMPLETED)
    logger.info(f"Deleted {len(blobs)} objects")

しかし大量なオブジェクトを1つずつ削除していること自体は変わらないので、試してみたら処理が途中でtimeoutになりました。 batchを利用して削除処理をバッチにまとめる方法もありますが、ロジックをシンプルにしたいので、他の方法を考えました。

再度ソースコードを確認したらそもそもこのdelete処理が不要なのでは?と気づきました。 確かにcompose処理に生成された中間オブジェクトをその場で削除するとストレージを節約できますが、同じことをするのであればバケットのlifecycleを設定したほうが楽だと思います。

なので、delete処理をなくして、他のところも軽く修正しました。

import logging
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from itertools import count
from typing import Iterable, List

import google.cloud.logging
from google.cloud import storage

client = google.cloud.logging.Client()
client.setup_logging()

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s %(message)s"
)
logger = logging.getLogger(__name__)


def generate_composition_chunks(slices: List, chunk_size: int = 31) -> Iterable[List]:
    """Given an indefinitely long list of blobs, return the list in 31-item chunks.

    Arguments:
        slices {List} -- A list of blobs, which are slices of a desired final blob.

    Returns:
        Iterable[List] -- An iteration of 31-item chunks of the input list.

    Yields:
        Iterable[List] -- A 31-item chunk of the input list.
    """
    while len(slices):
        chunk = slices[:chunk_size]
        yield chunk
        slices = slices[chunk_size:]


def generate_hex_sequence() -> Iterable[str]:
    """Generate an indefinite sequence of hexadecimal integers.

    Yields:
        Iterator[Iterable[str]]: The sequence of hex digits, as strings.
    """
    for i in count(0):
        yield hex(i)[2:]

def compose_concurrent(
    object_path: str,
    chunk: List[storage.Blob],
    client: storage.Client,
) -> storage.Blob:
    """

    Arguments:
        object_path {str} -- The path for the composed blob.
        chunk {List[storage.Blob]} -- A list of the blobs that should compose the blob.
        client {storage.Client} -- A Cloud Storage client to use.
    """
    logger.info(f"Composing {object_path}")
    blob = storage.Blob.from_string(object_path)
    blob.compose(chunk, client=client)
    return blob


def compose(
    bucket_name: str,
    prefix: str,
    object_path: str,
    blobs: List[storage.Blob],
    client: storage.Client,
) -> storage.Blob:
    """Compose an object from an indefinite number of blobs. Composition is
    performed concurrently using a tree of accumulators. Do not Cleanup for simplicity.

    Arguments:
        object_path {str} -- The path for the final composed blob.
        buckets {str} -- The bucket name.
        prefix {str} -- The prefix of the blobs.
        blobs {List[storage.Blob]} -- A list of the slices that should
            compose the blob, in order.
        client {storage.Client} -- A Cloud Storage client to use.

    Returns:
        storage.Blob -- The composed blob.
    """
    logger.info("Composing")

    intermediate_prefix = prefix
    identifier = generate_hex_sequence()
    while len(blobs) > 32:
        chunks = generate_composition_chunks(blobs)
        intermediate_prefix = f"{intermediate_prefix}/intermediate"

        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = []
            for chunk in chunks:
                futures.append(
                    executor.submit(
                        compose_concurrent,
                        f"gs://{bucket_name}/{intermediate_prefix}/{next(identifier)}",
                        chunk,
                        client,
                    )
                )
            wait(futures, return_when=ALL_COMPLETED)
        blobs = list(client.list_blobs(bucket_name, prefix=intermediate_prefix))

    # Now can do final compose
    compose_concurrent(object_path, blobs, client)
    logger.info("Composition complete")


def main(request) -> tuple[str, int]:
    """Main entrypoint for the function."""
    project_id = "your_project_id"
    bucket_name = "your_bucket_name"
    prefix = "your_prefix"
    final_blob_name = 'result.csv'
    
    storage_clinet = storage.Client(project_id)
    blobs = list(storage_clinet.list_blobs(bucket_name, prefix=prefix))

    compose(
	object_path=f"gs://{bucket_name}/{prefix}/{final_blob_name}",
	bucket_name=bucket_name,
	prefix=prefix,
	blobs=blobs,
	client=storage_clinet,
    )

主な変更は

  • delete処理をなくした
  • 中間オブジェクトを/intermediate/配下に保存する。それに伴い再帰処理を少し改修した
    • オブジェクト数が1024を超えた場合(=の深さが3)、2回目に生成された中間オブジェクトは/intermediate/intermediate/配下に保存される。これによって類推する
  • ロジックThreadPoolExecutorをmainからcompose関数に移行

delete処理をなくすことによって、GCSに保存するデータ量が一時的に数倍ほど(32^2 <= オブジェクト数 < 32^3の場合は4倍)になりますが、バグを解消できました。

以上です。