Elementaryアップデートの試行錯誤
副業先のtech blogに記事を書きました Elementaryアップデートの試行錯誤
副業先のtech blogに記事を書きました Elementaryアップデートの試行錯誤
マネージド型k8sのDaemonSet EKSなどのKubernetesマネージドサービスは、DaemonSetを介してNodeにkube-proxy, ebs csi, cniなどのPodを適切に配置してくれます。 k8sドキュメントに記載している 自動的に追加されるTolerations 以外で、ワイルドカードのTolerationsもデフォルトで入っています。 tolerations: - operator: Exists EKS管轄外のDaemonSet しかし、DatadogなどEKS外でデプロイされたDaemonSet Podsを入れる際に、TaintsがついているNodeにPodがスケジューリングされないため、注意が必要です。 解決方法としては、ドキュメントに書かれているように https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/#taints-and-tolerations You can add your own tolerations to the Pods of a DaemonSet as well, by defining these in the Pod template of the DaemonSet. DaemonSetのPodテンプレートにtolerationsを定義することです。 tolerations: - operator: Exists また、このようなワイルドカードのTolerationsを追加するのは便利かもしれないですが、(EKSの場合)Fargateにスケジューリングされてしまうので、Affinityもちゃんと書きましょう affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: eks.amazonaws.com/compute-type operator: NotIn values: - fargat 余談ですが、同じくDaemonSet介してデプロイされるfluent bitでは、不必要なTolerationsによるバグが過去起きていました。 https://github.com/aws-samples/amazon-cloudwatch-container-insights/issues/61
会社のtech blogに記事を書きました Helmを使用せずにKustomizeでAirbyteをKubernetes上に構築する
本記事に書かれていることを実践する際は自己責任でお願いします。不都合などが生じた場合、責任を負いかねます。 背景 日本のインターネット環境は比較的自由ですが、海外のサービスを利用際には大人の諸事情によってサイトがリダイレクトされて利用できない場合があります。 制限されるのはあまり気持ちよく感じないのとWeb閲覧時自分のプライバシーを保護するため、 先日自作プロキシサーバを自作しました。 本記事では主にHowに焦点を当てて紹介します。関連技術のWhatとWhyについては深く言及しないので、公式ドキュメントまたは他の解説記事をご参照ください。 雑なアーキテクチャ図 利用技術・サービス AWS Lightsail 仮想プライベートサーバ 3ヶ月無料 $3.5/月 ムームードメイン 3000~4000円/年(申請するドメインによる) gost OSS Webサービスを装うために使う Cloudflare gostを使うのに必要 無料プランで十分 SwitchyOmega OSS クライアント側の設定 あくまでも一例で最適解ではないです。 事前準備 VPS(仮想プライベートサーバ)を契約 今回は、AWSが提供しているLightsailという軽量仮想プライベートサーバのサービスを利用しますが、EC2やHerokuなど他のサービスを利用する場合もまったく問題ありません。リージョンは日本以外(例えばUS)に設定しておきます。AWSのCLIを利用する場合、AWS IAMなどを設定する必要がありますが、今回は基本的にいじらなくても良いです。 Lightsailでサーバを立ち上げたら、AWS CloudShellからサーバにアクセスできます。ローカルに慣れている方はキーをダウンロードして、sshでアクセスしても構いません。 念のため、 curl ipinfo.io を実行して、設定しているリージョンの住所と一致しているか確認します。 また、HTTPSを利用するため、Networking -> IPv4 FirewallからHTTPSを追加しておきます。 ドメインを取得 昨今の円安の影響で海外のドメインレジスターサービス(例えばGoDaddy)がかなり高くなっているため、国内サービス ムームードメイン を利用してドメインを取得しました。 Cloudflareアカウントを作成 WebサービスでもないなのになぜわざわざCloudflare使う理由 簡単に言えば、VPSのIPがブロックされる可能性はゼロではないのとプロバイダ側にIPアドレスを変更してくれないケースが多いためです。CloudflareなどのCDNサービスを挟むことでわずかな遅延が発生するかもしれませんが、可用性を向上させることができます。また、Cloudflareは無料枠を提供しており、WebSocketプロトコル (gostを使うのに必要なもの)に対応しているため、今回ユースケースに適しています。 ログインした後ガイダンスにしたがってネームサーバを設定します。 続いて、ムームードメインからデアフォルトのネームサーバを変更します。 詳細は ネームサーバのセットアップ方法(GMOペパボ以外のサービス) にご参照ください。 最後はサブドメインのレコードを追加して、立ち上げたサーバのIPアドレスと紐付けます。 IPv4 FirewallにHTTPSを追加しておかないと動かないので気をつけてください。 https://developers.cloudflare.com/dns/manage-dns-records/how-to/create-subdomain/ 事前準備が完了したので、いよいよサーバ側を設定を始めます。 サーバ側の設定 Dockerエンジンをインストール Docker公式ドキュメント を参考にしてDockerエンジンをインストールします。 # Add Docker's official GPG key: sudo apt-get update sudo apt-get install ca-certificates curl gnupg sudo install -m 0755 -d /etc/apt/keyrings curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg sudo chmod a+r /etc/apt/keyrings/docker.gpg # Add the repository to Apt sources: echo \ "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \ "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | \ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null sudo apt-get update sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin TCP BBR (Bottleneck Bandwidth and Round-trip propagation time)をオンにする TCP BBRはGoogleが開発した輻輳制御アルゴリズムで、TCP通信の高速化を実現できます。詳細は下記の記事にご参照ください。 ...
先日の記事にて パーティション化されたCSVオブジェクトをCloudSQLにimportする方法 を紹介しました。 SDKを利用して32より多いオブジェクトをまとめる場合、GCPのコミュニティのチュートリアルのコードをそのまま使っていました。(2023.8.10にアーカイブされ済み) https://github.com/GoogleCloudPlatform/community/blob/master/archived/cloud-storage-infinite-compose/index.md 文章最後に書いてあるように This code is offered for demonstration purposes only, and should not be considered production-ready. Applying it to your production workloads is up to you! コードはデモ用途のため、そのまま本番環境で使うのはで推奨しないです。先日この文を見逃して自分の環境にデプロイし、平常運転1ヶ月後、パーティション化されたCSVオブジェクトの数が増えたらバグが出ました。 事象 起きていた事象は、composeによって作られた中間オブジェクトが消されず残り続け、最終的に数TBのとてつもなく大きいオブジェクトが作成されてしまいました。一つのオブジェクトは5TiBまでというGCSの上限を超えてしまうため、処理が失敗しました。 どこがバグ 問題は関数compose_and_cleanupから呼び出している関数delete_objects_concurrentにあります。オブジェクトをまとめた後、毎回中間オブジェクトを削除していますが、そのdelete処理自体が非同期処理ですべての処理が終了するのを待たずに次のblob.composeの実行が始まります。 The delete_objects_concurrent function is very simple, using fire-and-forget delete tasks in an executor. A more robust implementation might check the futures from the submitted tasks. チュートリアルの中にちゃんと書いてあります。(もっとロバスト性のある実装は、submit済みのタスクのfuturesをチェックするとのこと) まとめようとするオブジェクトの数が少ない場合はほとんど問題がないですが、1000オブジェクトあたりを超えるとdeleteがcompose処理より遅くなるため、next_chunkが永遠に存在しているままwhileループから脱出できない状態になります。 delete処理自体は特に制限ないようですが、数百のオブジェクトを削除する場合時間かかるとドキュメントに記載されています。 https://cloud.google.com/storage/docs/deleting-objects#delete-objects-in-bulk 解決方法 最初に試したことは、記事に書いてあるようにsubmit済みのタスクのfuturesをwait処理でチェックします。つまりすべてのdelete処理が終わるまでに、compose処理を行いません。 (python並行処理の詳細は公式ドキュメントまたは他の記事をご覧ください) from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait def delete_objects_concurrent(blobs, executor, client) -> None: """Delete Cloud Storage objects concurrently. Args: blobs (List[storage.Blob]): The objects to delete. executor (Executor): An executor to schedule the deletions in. client (storage.Client): Cloud Storage client to use. """ futures = [] for blob in blobs: logger.info("Deleting slice {}".format(blob.name)) futures.append(executor.submit(blob.delete, client=client)) wait(futures, return_when=ALL_COMPLETED) logger.info(f"Deleted {len(blobs)} objects") しかし大量なオブジェクトを1つずつ削除していること自体は変わらないので、試してみたら処理が途中でtimeoutになりました。 batch を利用して削除処理をバッチにまとめる方法もありますが、ロジックをシンプルにしたいので、他の方法を考えました。 ...
問題 パーティション化されたCSVファイルをCloudSQLにimportする場面は時々あると思います。 残念ながらCloudSQLはBigQueryのようにwildcardsによるimportを対応していません。需要はあるようですが↓ https://issuetracker.google.com/issues/132058570?pli=1 ファイルごとにimportするとオーバーヘッドが毎回発生するため、速度的に実用性があまりないと思います。一方、importはオペレーションの1種なので、並列処理はできません。 https://cloud.google.com/sql/docs/troubleshooting#import-export HTTP Error 409: Operation failed because another operation was already in progress. There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete. なので、ファイルを結合してimportするのはより現実的な解決策だと思います。 gsutil compose gsutil composeを利用すると、GCSにある複数のファイルを結合できます。 cliのみならず、SDK(google.cloud.storage.Blob.compose)も同じ機能が提供されています。 https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries https://cloud.google.com/storage/docs/gsutil/commands/compose ただし、結合できるファイルは最大32個という制約があります。 There is a limit (currently 32) to the number of components that can be composed in a single operation. ...
先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。 AirflowからDataformをトリッガーする ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations 簡単にまとめると DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする DataformCreateWorkflowInvocationOperator: sqlを実行する しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。 Dataformに変数を渡す まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。 { "defaultSchema": "dataform", "assertionSchema": "dataform_assertions", "warehouse": "bigquery", "defaultDatabase": "project-stg", "defaultLocation": "asia-northeast1", "vars": { "bq_suffix": "_stg", "execution_date": "2023-05-24" } } DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。 https://github.com/apache/airflow/blob/739e6b5d775412f987a3ff5fb71c51fbb7051a89/airflow/providers/google/cloud/operators/dataform.py#LL73C29-L73C46 compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig CodeCompilationConfig内にvarsという変数を指定できるようです。 { "defaultDatabase": string, "defaultSchema": string, "defaultLocation": string, "assertionSchema": string, "vars": { string: string, ... }, "databaseSuffix": string, "schemaSuffix": string, "tablePrefix": string } BigQueryのsuffixをcode_compilation_configのvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。 DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "bq_suffix": "_stg", } }, }, ) Dataformにcontext変数を渡す 増分処理する際によくdata_interval_endなどの context変数 を利用して当日の差分だけ取り入れます。 しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。 TaskFlow でDataformCreateCompilationResultOperatorをラッピングすれば前述の問題を解決できます。data_interval_endはcontextから取得します。ポイントとしてはDataformCreateCompilationResultOperatorを返す際にexecute()を呼び出す必要があります。 from airflow.decorators import task @task() def create_compilation_result(**context): execute_date = ( context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d") ) return DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "execute_date": execute_date, "bq_suffix": Variable.get("bq_suffix"), } }, }, ).execute(context=context) 最終的なDAGは以下のようになります。 ...
Google Providersのバグを見つけた 先日DAGを開発中にGoogle Providers (apache-airflow-providers-google==8.9.0)のCloudDataTransferServiceJobStatusSensorを使用したところ、 project_idはオプション引数であるにも関わらず、省略するとエラーが発生するというバグに遭遇しました。 [2023-03-09, 02:31:24 UTC] {taskinstance.py:1774} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 236, in execute while not self.poke(context): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py", line 91, in poke operations = hook.list_transfer_operations( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 380, in list_transfer_operations request_filter = self._inject_project_id(request_filter, FILTER, FILTER_PROJECT_ID) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 459, in _inject_project_id raise AirflowException( airflow.exceptions.AirflowException: The project id must be passed either as `project_id` key in `filter` parameter or as project_id extra in Google Cloud connection definition. Both are not set! 修正自体はそれほど困難に見えなかったため、Airflowにissueを報告するよりも、自分で直接修正に取り組むことにしました。 Contributor手順を読んで環境構築する むやみにコーディングするより、まずCONTRIBUTINGを読んだほうが良いと思い、下記のドキュメンを見つけました。 https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst けっこう長いので、前半をさらさらと読んでContribution Workflowを参照しながら、ローカルの開発環境を問題なく構築しました。躓きそうなところ基本的にドキュメントにまとめてもらっています。 開発 https://github.com/apache/airflow/pull/30035/files#diff-2118fb849310fd85b9768e6732ab2dfa60ed75c751b5b9d0e176bcd1f950b6bbR75-R109 まず他のところを真似してproject_idを指定しない場合の単体テスクを書きます。何も実装していないので、もちろんテストはコケます。その後、CloudDataTransferServiceJobStatusSensorの実装を下記のようにproject_idを明示的に指定しない場合、hook.project_idから取得できるように変更します。 - request_filter={"project_id": self.project_id, "job_names": [self.job_name]} + request_filter={"project_id": self.project_id or hook.project_id, "job_names": [self.job_name]} これで終わり!PRを投げてPRを待ちます。 一週間もかからずApache Software Foundationメンバーの方からApproveをもらいました。 受け入れテスト 2週間後「 apache-airflow-providers-google 8.12.0rc1 をリリースされたので、リリースのテストをお願いします」の連絡がissueから来ました。 https://github.com/apache/airflow/issues/30427 8.12.0rc1をインストールし実際にCloudDataTransferServiceJobStatusSensorの動作を検証してみたら特に問題なかったので、うまく動いたよと返信しました。 数日後8.12.0が無事リリースされて、 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/index.html#id5 Support CloudDataTransferServiceJobStatusSensor without specifying a project_id (#30035) 修正がちゃんとリリースノートに書かれています。これでcoreにコミットしたわけではないですが、Apache Airflowのコミッターになりました。 感想 微力ながらずっとお世話になっているAirflowに貢献できてよかったです。理解を深めてモチベーション向上に繋がったのではないかと思います。 修正できるところまだまだたくさんありそうなので、今後も引き続きコミットしていきたいと思います。