GCS composeで32以上のオブジェクトをまとめる方法

先日の記事にて パーティション化されたCSVオブジェクトをCloudSQLにimportする方法 を紹介しました。 SDKを利用して32より多いオブジェクトをまとめる場合、GCPのコミュニティのチュートリアルのコードをそのまま使っていました。(2023.8.10にアーカイブされ済み) https://github.com/GoogleCloudPlatform/community/blob/master/archived/cloud-storage-infinite-compose/index.md 文章最後に書いてあるように This code is offered for demonstration purposes only, and should not be considered production-ready. Applying it to your production workloads is up to you! コードはデモ用途のため、そのまま本番環境で使うのはで推奨しないです。先日この文を見逃して自分の環境にデプロイし、平常運転1ヶ月後、パーティション化されたCSVオブジェクトの数が増えたらバグが出ました。 事象 起きていた事象は、composeによって作られた中間オブジェクトが消されず残り続け、最終的に数TBのとてつもなく大きいオブジェクトが作成されてしまいました。一つのオブジェクトは5TiBまでというGCSの上限を超えてしまうため、処理が失敗しました。 どこがバグ 問題は関数compose_and_cleanupから呼び出している関数delete_objects_concurrentにあります。オブジェクトをまとめた後、毎回中間オブジェクトを削除していますが、そのdelete処理自体が非同期処理ですべての処理が終了するのを待たずに次のblob.composeの実行が始まります。 The delete_objects_concurrent function is very simple, using fire-and-forget delete tasks in an executor. A more robust implementation might check the futures from the submitted tasks. チュートリアルの中にちゃんと書いてあります。(もっとロバスト性のある実装は、submit済みのタスクのfuturesをチェックするとのこと) まとめようとするオブジェクトの数が少ない場合はほとんど問題がないですが、1000オブジェクトあたりを超えるとdeleteがcompose処理より遅くなるため、next_chunkが永遠に存在しているままwhileループから脱出できない状態になります。 delete処理自体は特に制限ないようですが、数百のオブジェクトを削除する場合時間かかるとドキュメントに記載されています。 https://cloud.google.com/storage/docs/deleting-objects#delete-objects-in-bulk 解決方法 最初に試したことは、記事に書いてあるようにsubmit済みのタスクのfuturesをwait処理でチェックします。つまりすべてのdelete処理が終わるまでに、compose処理を行いません。 (python並行処理の詳細は公式ドキュメントまたは他の記事をご覧ください) from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait def delete_objects_concurrent(blobs, executor, client) -> None: """Delete Cloud Storage objects concurrently. Args: blobs (List[storage.Blob]): The objects to delete. executor (Executor): An executor to schedule the deletions in. client (storage.Client): Cloud Storage client to use. """ futures = [] for blob in blobs: logger.info("Deleting slice {}".format(blob.name)) futures.append(executor.submit(blob.delete, client=client)) wait(futures, return_when=ALL_COMPLETED) logger.info(f"Deleted {len(blobs)} objects") しかし大量なオブジェクトを1つずつ削除していること自体は変わらないので、試してみたら処理が途中でtimeoutになりました。 batch を利用して削除処理をバッチにまとめる方法もありますが、ロジックをシンプルにしたいので、他の方法を考えました。 ...

August 17, 2023 · Me

Apache Airflowのコミッターになった話

Google Providersのバグを見つけた 先日DAGを開発中にGoogle Providers (apache-airflow-providers-google==8.9.0)のCloudDataTransferServiceJobStatusSensorを使用したところ、 project_idはオプション引数であるにも関わらず、省略するとエラーが発生するというバグに遭遇しました。 [2023-03-09, 02:31:24 UTC] {taskinstance.py:1774} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 236, in execute while not self.poke(context): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py", line 91, in poke operations = hook.list_transfer_operations( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 380, in list_transfer_operations request_filter = self._inject_project_id(request_filter, FILTER, FILTER_PROJECT_ID) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 459, in _inject_project_id raise AirflowException( airflow.exceptions.AirflowException: The project id must be passed either as `project_id` key in `filter` parameter or as project_id extra in Google Cloud connection definition. Both are not set! 修正自体はそれほど困難に見えなかったため、Airflowにissueを報告するよりも、自分で直接修正に取り組むことにしました。 Contributor手順を読んで環境構築する むやみにコーディングするより、まずCONTRIBUTINGを読んだほうが良いと思い、下記のドキュメンを見つけました。 https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst けっこう長いので、前半をさらさらと読んでContribution Workflowを参照しながら、ローカルの開発環境を問題なく構築しました。躓きそうなところ基本的にドキュメントにまとめてもらっています。 開発 https://github.com/apache/airflow/pull/30035/files#diff-2118fb849310fd85b9768e6732ab2dfa60ed75c751b5b9d0e176bcd1f950b6bbR75-R109 まず他のところを真似してproject_idを指定しない場合の単体テスクを書きます。何も実装していないので、もちろんテストはコケます。その後、CloudDataTransferServiceJobStatusSensorの実装を下記のようにproject_idを明示的に指定しない場合、hook.project_idから取得できるように変更します。 - request_filter={"project_id": self.project_id, "job_names": [self.job_name]} + request_filter={"project_id": self.project_id or hook.project_id, "job_names": [self.job_name]} これで終わり!PRを投げてPRを待ちます。 一週間もかからずApache Software Foundationメンバーの方からApproveをもらいました。 受け入れテスト 2週間後「 apache-airflow-providers-google 8.12.0rc1 をリリースされたので、リリースのテストをお願いします」の連絡がissueから来ました。 https://github.com/apache/airflow/issues/30427 8.12.0rc1をインストールし実際にCloudDataTransferServiceJobStatusSensorの動作を検証してみたら特に問題なかったので、うまく動いたよと返信しました。 数日後8.12.0が無事リリースされて、 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/index.html#id5 Support CloudDataTransferServiceJobStatusSensor without specifying a project_id (#30035) 修正がちゃんとリリースノートに書かれています。これでcoreにコミットしたわけではないですが、Apache Airflowのコミッターになりました。 感想 微力ながらずっとお世話になっているAirflowに貢献できてよかったです。理解を深めてモチベーション向上に繋がったのではないかと思います。 修正できるところまだまだたくさんありそうなので、今後も引き続きコミットしていきたいと思います。

May 11, 2023 · Me

Slack BoltとCloud Functionsでデータ基盤アクセス承認システムを開発

会社のTech Blogに記事を書きました Slack BoltとCloud Functionsでデータ基盤アクセス承認システムを開発

April 20, 2023 · Me

PythonデコレータのSyntactic Sugarなぜ便利かを理解した

Pythonデコレータを利用する場合、@decoratorというSyntactic Sugarを関数やメソッドの先頭に付けるのが一般的ですが、なぜ便利なのかいまいち理解できていないので、調べてみました。 デコレータの詳細は公式ドキュメントあるいは他の方がすでに紹介されているので、本記事では割愛します。ちなみにおすすめの記事はこちらです。 https://rednafi.github.io/digressions/python/2020/05/13/python-decorators まず適当に文字列の両側に<b>を追加してくれる簡単なデコレータを書きましょう。num_bは片方に追加する<b>の数を表しており、デフォルトは1となっています。 from functools import partial, wraps class Emphasis: def __init__(self) -> None: pass def add_b(self, func=None, num_b=1): if func is None: return partial(self.add_b, num_b=num_b) @wraps(func) def wrap(*args, **kwargs): ret = func(*args, **kwargs) return "<b>" * num_b + ret + "<b>" * num_b return wrap デコレータ引数なし・関数引数なしの場合 最初は一番簡単なパターンで見ていきましょう。デコレータep.add_b(hello)の返り値は関数なので、一回コール()すればSyntactic Sugarと同じことができるので、むしろSyntactic Sugarを使わないほうがわかりすそうですね。 ep = Emphasis() def hello(): return "Hello, There" @ep.add_b def hello_with_sugar(): return "Hello, There" print(ep.add_b(hello)()) print(hello_with_sugar()) <b>Hello, There<b> <b>Hello, There<b> デコレータ引数あり・関数引数ありの場合 しかしデコレータと関数両方引数がある場合はep.add_b(num_b=2)(hello)("Taro")を書かないといけません。言葉で説明すると、hello関数を引数としてデコレータep.add_b(num_b=2)に入れて返された関数に引数"taro"を渡してコールするという意味です。書けなくはないですが、ややこしくなるので、Syntactic Sugarのほうが断然簡単ですね ep = Emphasis() def hello(name): return f"Hello, {name}" @ep.add_b(num_b=2) def hello_with_sugar(name): return f"Hello, {name}" print(ep.add_b(num_b=2)(hello)("Taro")) print(hello_with_sugar("Taro")) <b><b>Hello, Taro<b><b> <b><b>Hello, Taro<b><b> おわりに Syntactic Sugarのおかげで、デコレータと関数両方引数がある場合でも簡単にデコレータを使えるようになりました。ただ個人的にデコレータを初めて触る方はまずあえてSyntactic Sugarを使わず試してみたら、デコレータの理解が捗るのではないかと思います。 ご参考になれば幸いです。 ...

November 23, 2022 · Me

Telegramで多言語ニュースチャットボットを作った話

先日 多言語Webニュースアプリ を作りました。 通勤電車で当日のニュースをチェックしながら、外国語を勉強できるので、まあまあ使いやすかったです。 実はこのWebを開発する1年ほど前に、一度Telegram botで多言語ニュースチャットボットを開発していました。今日はそれについて紹介したいと思います。 https://github.com/aibazhang/multitrue-bot 事前準備 Telegram bot Telegramは日本ではあまり人気がないですが、Telegramのアカウントを作るだけでAPIキーを取得できるので、ざくっとボット作りたい場合は使い勝手がけっこういいです。また、良さげのSDKもあります↓。そういう意味ではSlackも同じですが、Telegramは感覚的にLineに近いので手軽さで勝っていると思います。 https://github.com/python-telegram-bot/python-telegram-bot/tree/master/examples ニュースの取得 ニュースの取得方法は先日開発したWebアプリと同じく NewsAPI 利用して取得しています。なぜNewsAPIを採用したかについては 半年前の記事 で詳細を説明したので、割愛します。 実装 https://core.telegram.org/ に参照してボットの初期設定が完了したら、実装に入ります。 フローは以下となります。 /startでニュースボットを起動 国・地域を選ぶ ニュースのジャンルを選ぶ 終了あるいは2.に戻る 全体フロー SDKが提供してくれた下記3つのhandlerを利用しています。 ConversationHandler: 先ほど設計したフローに基づいてhandlerを定義する。 CommandHandler: コマンドによって発火される。今回は「入り口 (entry point)」として使う CallbackQueryHandler: 他のhandlerの返り値によって発火される def main(): updater = Updater( token=json.load(open(KEY_PATH / "keys.json", "r"))["telegram_key"], use_context=True, ) dispatcher = updater.dispatcher country_pattern = "^us|jp|cn|tw|kr|gb$" headlines_pattern = "^us|jp|cn|tw|kr|gb business|entertainment|general|health|science|sports|technology|$" conv_handler = ConversationHandler( entry_points=[CommandHandler("start", start)], states={ "CATEGORY": [CallbackQueryHandler(select_category, pattern=country_pattern)], "HEADLINES": [CallbackQueryHandler(get_news, pattern=headlines_pattern)], "START OVER OR NOT": [ CallbackQueryHandler(start_over, pattern="^start over$"), CallbackQueryHandler(end, pattern="^end$"), ], }, fallbacks=[CommandHandler("start", start)], ) dispatcher.add_handler(conv_handler) updater.start_polling() updater.idle() メニュー 続いてはcallback関数を見ていきましょう。start関数はwelcomeメッセージをユーザに送って、ユーザに国・地域を選んでもらいます。その後、select_category関数でニュースのジャンルを選んでもらいます。 引数contextとupdate context.user_data: ボットを利用してユーザの情報を取得する context.bot: ボット自体を操作する(今回はメッセージを送るだけ)のに使う update: メッセージを更新するのに使う def start(update, context): user = update.message.from_user logger.info("User {} started the conversation.".format(user)) for i, v in vars(user).items(): context.user_data[i] = v welcome_message = ( "Hello, {}\n" "This is JC News bot🗞️🤖\n\n" "You can get Top News Headlines for a Country and a Category from here. \n\n".format(user.first_name) ) print(context) keyborad = [ [ InlineKeyboardButton("🇺🇸", callback_data="us"), InlineKeyboardButton("🇯🇵", callback_data="jp"), InlineKeyboardButton("🇹🇼", callback_data="tw"), ], [ InlineKeyboardButton("🇰🇷", callback_data="kr"), InlineKeyboardButton("🇬🇧", callback_data="gb"), InlineKeyboardButton("🇨🇳", callback_data="cn"), ], ] reply_markup = InlineKeyboardMarkup(keyborad) context.bot.send_message(chat_id=update.effective_chat.id, text=welcome_message) update.message.reply_text("Please Choose a Country🤖", reply_markup=reply_markup) return "CATEGORY" def select_category(update, context): logger.info("User data from context {}".format(context.user_data)) logger.info("Chat data from context {}".format(context.chat_data)) logger.info("Bot data from context {}".format(context.bot_data)) query = update.callback_query query.answer() country = query.data keyborad = [ [ InlineKeyboardButton("👩🏼‍💻Technology", callback_data=country + " technology"), InlineKeyboardButton("🧑‍💼Business", callback_data=country + " business"), ], [ InlineKeyboardButton("👨🏻‍🎤Entertainment", callback_data=country + " entertainment"), InlineKeyboardButton("👩🏻‍⚕️Health", callback_data=country + " health"), ], [ InlineKeyboardButton("👨🏿‍🔬Science", callback_data=country + " science"), InlineKeyboardButton("🏋🏼‍♂️Sports", callback_data=country + " sports"), ], [InlineKeyboardButton("🌎General", callback_data=country + " general")], ] reply_markup = InlineKeyboardMarkup(keyborad) query.edit_message_text(text="Please Choose a Category🤖", reply_markup=reply_markup) return "HEADLINES" ニュースを取得 最後は最もコアとなるニュースを取得するロジックです。countryとcategoryによってNewAPIから最新ホットラインニュースを取得しフォーマットした後、ユーザにメッセージを送ります。 具体的な詳細は説明しませんが、NewsAPICollectorというクラスでNewAPIをラッピングしています。 ...

November 3, 2022 · Me

GitHub Issueだけで自分のルーティングを管理し、そして草を生やす

先日シェルスクリプトで 個人ナレッジマネジメントツール を作った話しを投稿して、予想以上に需要がありました。 ルーティングをGitHub Posterに生成 似たような発想でルーティング管理アプリを使わずに、GitHubのcontributionsのように自分のルーティング(例えば読書、ランニング、LeetCode、外国語の勉強)を管理できると面白くない?と思いながら、GitHub上で検索したらyihong0618さんが開発したGitHubPosterを発見しました。 https://github.com/yihong0618/GitHubPoster GitHub Isssue、Duolingo、Twitter、Kindleなど20個以上のAPIで履歴を取得し、GitHub svg poster(aka. 皆さんが大好きなGitHubの草)を生成します。 実際使ってみよう ローダーは20個以上あり、とりあえずissueで今年年始以来の読書ルーティングの草を生やしてみました。 Issueを書く issueフォーマットは↓に従う必要でがあります。 {整数} {内容} 今年は1月から、日課をこなした日に当該Issueに↓のコメント追加していました。 2 「データ指向アプリケーションデザイン」 環境構築 pip install -U 'github_poster[all]' 実行 GitHubをトークンを取得し、下記コマンドを実行するだけです。 github_poster issue --issue_number ${issue_number} --repo_name ${repo_name} --token ${github_token} また、オプション --special-color1 --special-color2 ---stand-with-ukraine などによって色を指定することも可能です。(ウクライナをサポートする配色もあるようですね) 結果 最終的に生成されたGitHub Poster(.svgファイル)はこんな感じです。単位はtimes(回数)になっているが、hours(時間)が正しいです。 確認してみると、 今年今まで336時間読書していて、週の真ん中あまり本を読んでいないと気づきました。 使った感想 余計なモバイルアプリを使わずに、ルーティング管理できるのはミニマムリスト的には最高 生成されたファイルは.svgなので、自分のサイトや他のところに取り入れるのも簡単そう 新しいローダーの開発に貢献してみたい

November 2, 2022 · Me

自作PythonラッパーでGoogleグループのメンバーシップをより便利に管理する

Googleグループのメンバーシップを管理する方法 Googleグループを管理するのは Workspaceのadmin SDKを利用する https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups CLI gcloudを利用する https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships Cloud Identity APIを叩く https://cloud.google.com/identity/docs/how-to/setup 3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。 方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう) gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能 Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。 シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。 gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h' 一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。 Pythonラッパー 認証 Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。 https://cloud.google.com/identity/docs/how-to/setup アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。 from httplib2 import Http from oauth2client.client import GoogleCredentials from googleapiclient.discovery import build def create_service(): credentials = GoogleCredentials.get_application_default() service_name = "cloudidentity" api_version = "v1" return build( service_name, api_version, http=credentials.authorize(Http()), cache_discovery=False, ) Group IDとMembership ID 実際にメンバーシップを取得するAPIを叩いてみたらわかると思います。memberships_idとgroup_idによってメンバーシップのnameが決められます。後ほどグループからメンバーを削除する際に、ここのnameが必須です。 ...

October 2, 2022 · Me

M1 Mac (macOS Monterey 12.2.1)でpyenv/Python開発環境構築

はじめに 先日M1のMBPを入手したので、早速Pythonの開発環境を構築しました。 Intel Macと比べて大きく違うので、昔メモったIntel Macの環境構築手順が使えなくなり、新しく環境構築の記事を書こうと思いました。 少しでもお役に立てれば幸いです。 この記事 では Homebrewは使わないで!(動くけど難易度高い) pyenvは使わないで!(動かない=2021年2月時点) と記述していましたが、おそらく2021年2月時点ではHomebrewやpyenvなどはまだM1 Macに対応していない(?)と考えられます。現在(2022年4月)では、少し設定は必要ですが、問題なく動作できるようになりました。 Homebrew まずはHomebrewをインストールします /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh) M1 MacではHomebrewによってインストールされたパッケージは/usr/local/bin/hogeではなく、/opt/homebrew/bin/hoge保存されるので、brewをインストールした後は環境変数を設定する必要があります。 以下を.bashrcか.zshrcに追加して、ターミナルを再起動すると、brew installは問題なく動作します。 export PATH=/opt/homebrew/bin:$PATH export PATH=/opt/homebrew/sbin:$PATH fishユーザはこれを実行してください。 set PATH /opt/homebrew/bin /opt/homebrew/sbin $PATH pyenv pyenvとvirtualenvをインストールします brew install pyenv brew install pyenv-virtualenv ここまでは問題ないはずです。 続いてバージョンしてPythonをインストールします。 たとえば Python 3.8.12 pyenv install 3.8.12 しかし、うまくいきません pyenv install 3.8.12 python-build: use openssl@1.1 from homebrew python-build: use readline from homebrew Installing Python-3.8.12... python-build: use readline from homebrew python-build: use zlib from xcode sdk BUILD FAILED (OS X 12.2.1 using python-build 20180424) Inspect or clean up the working tree at /var/folders/f1/3g092d_11zl8xnf67bp0n79r0000gp/T/python-build.20220318172446.23811 Results logged to /var/folders/f1/3g092d_11zl8xnf67bp0n79r0000gp/T/python-build.20220318172446.23811.log Last 10 log lines: checking for --with-cxx-main=<compiler>... no checking for clang++... no configure: By default, distutils will build C++ extension modules with "clang++". If this is not intended, then set CXX on the configure command line. checking for the platform triplet based on compiler characteristics... darwin configure: error: internal configure error for the platform triplet, please file a bug report make: *** No targets specified and no makefile found. Stop. いろいろ調べた結果、 https://issueantenna.com/repo/pyenv/pyenv/issues/2284 ...

April 22, 2022 · Me