Problem
You sometimes need to import partitioned CSV files into Cloud SQL. Unfortunately, unlike BigQuery, Cloud SQL does not support importing with wildcards. There does seem to be demand for it, though:
https://issuetracker.google.com/issues/132058570?pli=1
Importing the files one at a time incurs overhead on every call, so it is too slow to be practical. An import is also a type of operation, and operations cannot run in parallel:
https://cloud.google.com/sql/docs/troubleshooting#import-export
HTTP Error 409: Operation failed because another operation was already in progress. There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete.
So combining the files first and importing the result is the more realistic solution.
gsutil compose
Using gsutil compose, you can combine multiple files stored in GCS. The same functionality is provided not only by the CLI but also by the SDK (google.cloud.storage.Blob.compose).
https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries
https://cloud.google.com/storage/docs/gsutil/commands/compose
However, there is a constraint: at most 32 files can be combined in a single operation.
There is a limit (currently 32) to the number of components that can be composed in a single operation.
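As a minimal sketch of the SDK route (the project, bucket, and object names here are placeholders of my own, not anything from the docs), composing up to 32 objects into one looks like this:

from google.cloud import storage

# Placeholder names for illustration only.
client = storage.Client(project="your_project_id")
bucket = client.bucket("your_bucket_name")

# compose() accepts at most 32 source blobs in a single call.
sources = [bucket.blob(f"aaa/bbb/hoge{i:03d}.csv") for i in range(1, 4)]
destination = bucket.blob("aaa/bbb/hoge_combined.csv")
destination.compose(sources)

This works as long as there are at most 32 source objects, which is exactly the limit the next section deals with.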
Combining more than 32 files
What should you do when you want to combine more than 32 files? From the CLI, it can be done easily with this approach: https://nakano-tomofumi.hatenablog.com/entry/2021/02/16/190626
If you want to add file-combining logic to a batch job or an application, using the SDK is the natural choice. A promising tutorial has already been posted in the GCP community:
https://cloud.google.com/community/tutorials/cloud-storage-infinite-compose
It introduces two approaches, the Accumulator Pattern and the Accumulator Tree; considering execution efficiency when there are many files, the Accumulator Tree looks more practical. However, while the tutorial explains the technique in detail, it does not provide the full source code, so it cannot be used as-is.
So I put together the source code myself, verification included.
After authenticating, edit the variables project_id, bucket_name, and prefix in the main function and the script can be run as-is. For example, to combine the three files below, prefix would be aaa/bbb/hoge (a quick way to check what a prefix matches is sketched right after the file names).
aaa/bbb/hoge001.csv
aaa/bbb/hoge002.csv
aaa/bbb/hoge003.csv
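If you want to double-check which objects a prefix will pick up before running the script, here is a small sketch using the placeholder names from above:

from google.cloud import storage

client = storage.Client(project="your_project_id")
# list_blobs returns every object whose name starts with the prefix,
# e.g. aaa/bbb/hoge001.csv, aaa/bbb/hoge002.csv, aaa/bbb/hoge003.csv.
for blob in client.list_blobs("your_bucket_name", prefix="aaa/bbb/hoge"):
    print(blob.name)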
requirements.txt
google-cloud-logging==2.7.1
google-cloud-storage==2.9.0
When there are many objects to combine, the code in the tutorial runs into a bug caused by the delete calls timing out. See this article for how to handle it.
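One possible mitigation, shown below as an assumption on my part rather than a summary of that article, is to give each delete call an explicit timeout and retry policy instead of relying on the defaults:

from google.cloud import storage
from google.cloud.storage.retry import DEFAULT_RETRY

def delete_blob_patiently(blob: storage.Blob, client: storage.Client) -> None:
    # Hypothetical helper: a longer per-request timeout plus an explicit retry
    # policy keeps slow delete responses from surfacing as timeout errors.
    blob.delete(client=client, timeout=300, retry=DEFAULT_RETRY)

In the full script below, delete_objects_concurrent could submit a helper like this instead of calling blob.delete with the default settings.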
import logging
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from itertools import count
from time import sleep
from typing import Any, Iterable, List
import google.cloud.logging
from google.cloud import storage
client = google.cloud.logging.Client()
client.setup_logging()
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s %(message)s"
)
logger = logging.getLogger(__name__)
def generate_composition_chunks(slices: List, chunk_size: int = 31) -> Iterable[List]:
    """Given an indefinitely long list of blobs, return the list in 31-item chunks.

    Arguments:
        slices {List} -- A list of blobs, which are slices of a desired final blob.

    Returns:
        Iterable[List] -- An iteration of 31-item chunks of the input list.

    Yields:
        Iterable[List] -- A 31-item chunk of the input list.
    """
    while len(slices):
        chunk = slices[:chunk_size]
        yield chunk
        slices = slices[chunk_size:]
def generate_hex_sequence() -> Iterable[str]:
    """Generate an indefinite sequence of hexadecimal integers.

    Yields:
        Iterator[Iterable[str]]: The sequence of hex digits, as strings.
    """
    for i in count(0):
        yield hex(i)[2:]
def delete_objects_concurrent(blobs, executor, client) -> None:
    """Delete Cloud Storage objects concurrently.

    Args:
        blobs (List[storage.Blob]): The objects to delete.
        executor (Executor): An executor to schedule the deletions in.
        client (storage.Client): Cloud Storage client to use.
    """
    for blob in blobs:
        logger.info("Deleting slice {}".format(blob.name))
        executor.submit(blob.delete, client=client)
        sleep(0.005)  # quick and dirty ramp-up (Sorry, Dijkstra.)
def ensure_results(maybe_futures: List[Any]) -> List[Any]:
    """Pass in a list that may contain futures, and if so, wait for
    the result of the future; for all other types in the list,
    simply append the value.

    Args:
        maybe_futures (List[Any]): A list which may contain futures.

    Returns:
        List[Any]: A list with the values passed in, or Future.result() values.
    """
    results = []
    for mf in maybe_futures:
        if isinstance(mf, Future):
            results.append(mf.result())
        else:
            results.append(mf)
    return results
def compose_and_cleanup(
    blob: storage.Blob,
    chunk: List[storage.Blob],
    client: storage.Client,
    executor: Executor,
):
    """Compose a blob and clean up its components. Cleanup tasks are
    scheduled in the provided executor and the composed blob immediately
    returned.

    Args:
        blob (storage.Blob): The blob to be composed.
        chunk (List[storage.Blob]): The component blobs.
        client (storage.Client): A Cloud Storage client.
        executor (Executor): An executor in which to schedule cleanup tasks.

    Returns:
        storage.Blob: The composed blob.
    """
    # wait on results if the chunk has any futures
    chunk = ensure_results(chunk)
    blob.compose(chunk, client=client)
    # clean up components, no longer need them
    delete_objects_concurrent(chunk, executor, client)
    return blob
def compose(
    object_path: str,
    slices: List[storage.Blob],
    client: storage.Client,
    executor: Executor,
) -> storage.Blob:
    """Compose an object from an indefinite number of slices. Composition is
    performed concurrently using a tree of accumulators. Cleanup is
    performed concurrently using the provided executor.

    Arguments:
        object_path {str} -- The path for the final composed blob.
        slices {List[storage.Blob]} -- A list of the slices that should
            compose the blob, in order.
        client {storage.Client} -- A Cloud Storage client to use.
        executor {Executor} -- A concurrent.futures.Executor to use for
            cleanup execution.

    Returns:
        storage.Blob -- The composed blob.
    """
    logger.info("Composing")
    chunks = generate_composition_chunks(slices)
    next_chunks = []
    identifier = generate_hex_sequence()
    while len(next_chunks) > 32 or not next_chunks:  # falsey empty list is ok
        # start a fresh accumulator list for this round, so already-composed
        # (and deleted) slices are never picked up again on a later round
        next_chunks = []
        for chunk in chunks:
            # make intermediate accumulator
            intermediate_accumulator = storage.Blob.from_string(
                object_path + next(identifier)
            )
            logger.info("Intermediate composition: %s", intermediate_accumulator)
            future_iacc = executor.submit(
                compose_and_cleanup, intermediate_accumulator, chunk, client, executor
            )
            # store reference for next iteration
            next_chunks.append(future_iacc)
        # go again with intermediate accumulators
        chunks = generate_composition_chunks(next_chunks)
    # Now can do final compose
    final_blob = storage.Blob.from_string(object_path)
    # chunks is a list of lists, so flatten it
    final_chunk = [blob for sublist in chunks for blob in sublist]
    logger.info("Final composition: %s", final_blob)
    compose_and_cleanup(final_blob, final_chunk, client, executor)
    logger.info("Composition complete")
    return final_blob
def main():
    project_id = "your_project_id"
    bucket_name = "your_bucket_name"
    prefix = "your_prefix"
    final_blob_name = "result.csv"
    storage_client = storage.Client(project_id)
    blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix))
    with ThreadPoolExecutor(max_workers=4) as executor:
        compose(
            f"gs://{bucket_name}/{prefix}/{final_blob_name}",
            blobs,
            storage_client,
            executor,
        )


if __name__ == "__main__":
    main()
I hope you find this helpful.