The problem

From time to time you may need to import partitioned CSV files into Cloud SQL. Unfortunately, unlike BigQuery, Cloud SQL does not support importing files via wildcards, although there clearly is demand for it:

https://issuetracker.google.com/issues/132058570?pli=1

Importing the files one by one incurs the per-operation overhead every time, so it is too slow to be practical. On top of that, an import is a type of operation, and operations cannot run in parallel:

https://cloud.google.com/sql/docs/troubleshooting#import-export

HTTP Error 409: Operation failed because another operation was already in progress. There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete.

So merging the files and running a single import is the more realistic solution.
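For reference, once the files are merged the import itself becomes a single Cloud SQL operation. The sketch below shows roughly what triggering it looks like via the Cloud SQL Admin API, using the discovery-based google-api-python-client (which is not used elsewhere in this article); the instance, database, and table names are placeholders.

from googleapiclient import discovery


def import_csv(project_id: str, instance: str, gcs_uri: str) -> None:
    """Trigger a single Cloud SQL CSV import for the merged object in GCS."""
    sqladmin = discovery.build("sqladmin", "v1beta4")
    body = {
        "importContext": {
            "fileType": "CSV",
            "uri": gcs_uri,  # e.g. the composed gs://.../result.csv
            "database": "your_database",  # placeholder
            "csvImportOptions": {"table": "your_table"},  # placeholder
        }
    }
    # Only one operation is allowed per instance at a time, hence merging first.
    request = sqladmin.instances().import_(project=project_id, instance=instance, body=body)
    request.execute()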

gsutil compose

With gsutil compose, you can concatenate multiple objects stored in GCS. The same functionality is available not only from the CLI but also through the SDK (google.cloud.storage.Blob.compose).

https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries

https://cloud.google.com/storage/docs/gsutil/commands/compose
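For a small number of objects, a single compose call is all you need. A minimal sketch (the bucket and object names are placeholders):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your_bucket_name")  # placeholder

# compose the source objects, in order, into one destination object
sources = [
    bucket.blob(name)
    for name in ["part-001.csv", "part-002.csv", "part-003.csv"]  # placeholders
]
destination = bucket.blob("merged.csv")
destination.compose(sources)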

However, there is a constraint: at most 32 objects can be composed in a single operation.

There is a limit (currently 32) to the number of components that can be composed in a single operation.

Composing more than 32 files

So what should you do when you want to compose more than 32 files? From the CLI, it can be done easily with the approach described here: https://nakano-tomofumi.hatenablog.com/entry/2021/02/16/190626

If you want to add the merge logic to a batch job or an application, using the SDK is the natural choice. A promising tutorial has already been posted in the GCP community:

https://cloud.google.com/community/tutorials/cloud-storage-infinite-compose

It introduces two approaches, the Accumulator Pattern and the Accumulator Tree. Considering execution efficiency when there are many files, the Accumulator Tree looks more practical. However, although the tutorial explains the techniques in detail, it does not include the full source code, so it cannot be used as-is.
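For comparison, here is a rough sketch of the simpler Accumulator Pattern (for illustration only; this is not the tutorial's code): the slices are folded sequentially into a single accumulator object, at most 31 new slices per call, so the growing accumulator is rewritten over and over. This is why the tree approach scales better for large numbers of files.

from typing import List

from google.cloud import storage


def compose_accumulator(destination: storage.Blob, slices: List[storage.Blob]) -> storage.Blob:
    """Sequentially fold all slices into one accumulator object."""
    destination.compose(slices[:32])  # the first batch can use all 32 slots
    remaining = slices[32:]
    while remaining:
        # the accumulator itself takes one slot, so add at most 31 new slices per call
        destination.compose([destination] + remaining[:31])
        remaining = remaining[31:]
    return destination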

I put together my own version of the source code, verification included. Once authentication is set up, you can run it as-is after editing the variables project_id, bucket_name, and prefix in the main function. To compose the three files below, prefix would be aaa/bbb/hoge.

aaa/bbb/hoge001.csv
aaa/bbb/hoge002.csv
aaa/bbb/hoge003.csv

requirements.txt

google-cloud-logging==2.7.1
google-cloud-storage==2.9.0

When there are many objects to merge, the code in the tutorial hits a bug caused by timeouts in the delete step. See this article for how to deal with it.

import logging
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from itertools import count
from time import sleep
from typing import Any, Iterable, List

import google.cloud.logging
from google.cloud import storage

logging_client = google.cloud.logging.Client()
logging_client.setup_logging()

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s %(message)s"
)
logger = logging.getLogger(__name__)


def generate_composition_chunks(slices: List, chunk_size: int = 31) -> Iterable[List]:
    """Given an indefinitely long list of blobs, return the list in 31-item chunks.

    Arguments:
        slices {List} -- A list of blobs, which are slices of a desired final blob.

    Returns:
        Iterable[List] -- An iteration of 31-item chunks of the input list.

    Yields:
        Iterable[List] -- A 31-item chunk of the input list.
    """
    while len(slices):
        chunk = slices[:chunk_size]
        yield chunk
        slices = slices[chunk_size:]


def generate_hex_sequence() -> Iterable[str]:
    """Generate an indefinite sequence of hexadecimal integers.

    Yields:
        Iterator[Iterable[str]]: The sequence of hex digits, as strings.
    """
    for i in count(0):
        yield hex(i)[2:]


def delete_objects_concurrent(blobs, executor, client) -> None:
    """Delete Cloud Storage objects concurrently.

    Args:
        blobs (List[storage.Blob]): The objects to delete.
        executor (Executor): An executor to schedule the deletions in.
        client (storage.Client): Cloud Storage client to use.
    """
    for blob in blobs:
        logger.info("Deleting slice {}".format(blob.name))
        executor.submit(blob.delete, client=client)
        sleep(0.005)  # quick and dirty ramp-up (Sorry, Dijkstra.)


def ensure_results(maybe_futures: List[Any]) -> List[Any]:
    """Pass in a list that may contain futures, and if so, wait for
    the result of the future; for all other types in the list,
    simply append the value.

    Args:
        maybe_futures (List[Any]): A list which may contain futures.

    Returns:
        List[Any]: A list with the values passed in, or Future.result() values.
    """
    results = []
    for mf in maybe_futures:
        if isinstance(mf, Future):
            results.append(mf.result())
        else:
            results.append(mf)
    return results


def compose_and_cleanup(
    blob: storage.Blob,
    chunk: List[storage.Blob],
    client: storage.Client,
    executor: Executor,
):
    """Compose a blob and clean up its components. Cleanup tasks are
    scheduled in the provided executor and the composed blob immediately
    returned.

    Args:
        blob (storage.Blob): The blob to be composed.
        chunk (List[storage.Blob]): The component blobs.
        client (storage.Client): A Cloud Storage client.
        executor (Executor): An executor in which to schedule cleanup tasks.

    Returns:
        storage.Blob: The composed blob.
    """
    # wait on results if the chunk has any futures
    chunk = ensure_results(chunk)
    blob.compose(chunk, client=client)
    # clean up components, no longer need them
    delete_objects_concurrent(chunk, executor, client)
    return blob


def compose(
    object_path: str,
    slices: List[storage.Blob],
    client: storage.Client,
    executor: Executor,
) -> storage.Blob:
    """Compose an object from an indefinite number of slices. Composition is
    performed concurrently using a tree of accumulators. Cleanup is
    performed concurrently using the provided executor.

    Arguments:
        object_path {str} -- The path for the final composed blob.
        slices {List[storage.Blob]} -- A list of the slices that should
            compose the blob, in order.
        client {storage.Client} -- A Cloud Storage client to use.
        executor {Executor} -- A concurrent.futures.Executor to use for
            cleanup execution.

    Returns:
        storage.Blob -- The composed blob.
    """
    logger.info("Composing")
    chunks = generate_composition_chunks(slices)
    next_chunks = []
    identifier = generate_hex_sequence()

    while len(next_chunks) > 32 or not next_chunks:  # falsey empty list is ok
        # start this level of the tree with a fresh accumulator list; without the
        # reset, slices that were already composed (and deleted) would be chunked
        # again once more than 32 intermediate accumulators are needed
        next_chunks = []
        for chunk in chunks:
            # make intermediate accumulator
            intermediate_accumulator = storage.Blob.from_string(
                object_path + next(identifier)
            )
            logger.info("Intermediate composition: %s", intermediate_accumulator)
            future_iacc = executor.submit(
                compose_and_cleanup, intermediate_accumulator, chunk, client, executor
            )
            # store reference for next iteration
            next_chunks.append(future_iacc)
        # go again with intermediate accumulators
        chunks = generate_composition_chunks(next_chunks)

    # Now can do final compose
    final_blob = storage.Blob.from_string(object_path)
    # chunks is a list of lists, so flatten it
    final_chunk = [blob for sublist in chunks for blob in sublist]
    logger.info("Final composition: %s", final_blob)
    compose_and_cleanup(final_blob, final_chunk, client, executor)

    logger.info("Composition complete")

    return final_blob


def main():
    project_id = "your_project_id"
    bucket_name = "your_bucket_name"
    prefix = "your_prefix"
    final_blob_name = 'result.csv'

    storage_client = storage.Client(project_id)
    blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix))

    with ThreadPoolExecutor(max_workers=4) as executor:
        compose(
            f"gs://{bucket_name}/{prefix}/{final_blob_name}",
            blobs,
            storage_client,
            executor,
        )


if __name__ == "__main__":
    main()

I hope this is helpful.