AirflowからAirbyteをトリッガーする際にハマるポイント

https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator/ AirflowからAirbyte Operatorを利用するための設定について、Airbyte公式の記事は既にわかりやすくまとめています。実際に試してみて、少しハマったところがあったので、その知見を共有したいと思います。 1. Airflowを2.3.0以上にアップグレードする必要がある apache-airflow-providers-airbyte[http]を利用するのにAirflowを2.3.0以上に上げないといけません。(apache-airflow-providers-airbyte[http]をdocker-composer.ymlの_PIP_ADDITIONAL_REQUIREMENTSに追加することも忘れずに) Cloud Composerなどを利用している場合、GUIからアップグレード可能です。 https://airflow.apache.org/docs/apache-airflow-providers-airbyte/stable/index.html version: '3' x-airflow-common: &airflow-common image: apache/airflow:2.3.4-python3.8 environment: &airflow-common-env PYTHONPATH: /opt/airflow/dags AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:password@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:password@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth' # 追加 _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-airbyte[http]==3.2.0 2. Airflowの古いバージョンから2.3.4上げるとdocker-composeがバグる airflow 2.2.xでは問題なく環境構築できていましたが、イメージをapache/airflow:2.3.4-python3.8に変更してdocker compose up airflow-initを実行したら怒られます。 You are running pip as root. Please use 'airflow' user to run pip! Airflowの古いdocker-composer.ymlのバグのようなので、 https://github.com/apache/airflow/pull/23517/files services -> airflow-init -> environmentに_PIP_ADDITIONAL_REQUIREMENTS: ''を追加すれば解決できます。 ... environment: <<: *airflow-common-env _AIRFLOW_DB_UPGRADE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-password} # 追加 _PIP_ADDITIONAL_REQUIREMENTS: '' ....

March 6, 2023 · Me

Cloud Composerでmax_active_tasks_per_dagのデフォルト値が機能していない問題

問題 先日Cloud Composerの環境を↓にバージョンアップしました。 Cloud Composer 2.0.32 Airflow 2.2.5 core.max_active_tasks_per_dagという一つのDAG内同時に処理できるタスクの上限を設定するパラメータがデフォルト値16のままになっているのにも関わらず、実行するタスクの上限が明らかに16を超えています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag ローカルにあるAirflow 2.2.5環境では何の異常もなく、ComposerのAirflow Configurationを確認したところ、なぜかcore.dag_concurrencyが100に設定されています。 [core] dags_folder = /home/airflow/gcs/dags plugins_folder = /home/airflow/gcs/plugins executor = CeleryExecutor dags_are_paused_at_creation = True load_examples = False donot_pickle = True dagbag_import_timeout = 300 default_task_retries = 2 killed_task_cleanup_time = 3570 parallelism = 0 non_pooled_task_slot_count = 100000 dag_concurrency = 100 .... core.dag_concurrencyの役割はcore.max_active_tasks_per_dagと同じく、一つのDAG内同時に処理できるタスクの上限を設定しています。Airflow 2.2.0からはすでにDeprecatedになったはずなのに、なぜか残り続いています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated 試み 手動で削除しようと思ったですけど、バージョンを上げたのでCloud Composer -> AIRFLOW CONFIGURATION OVERRIDESにcore.dag_concurrencyというパラメータすら存在しませんでした。 仕方なく、GCSから設定ファイルgs://asia-northeast1-colossus-wo-xxxxxxx-bucket/airflow.cfgを直接編集してみました。しかし、gcloud composer environments storage dags importを実行すると初期化が処理が実行され、core.dag_concurrencyが再び出てきました。 解決 デフォルト値ではなく、手動でcore.max_active_tasks_per_dagを明示的に16に指定すると、実行するタスクの上限が期待通りに動作しました。 ザクッとComposerのリリースノートを確認してこのバグまだ修正されていないようです。...

February 8, 2023 · Me

Cloud SQLにある大量なテーブルをBigQueryに入れる話

経緯 こんにちは、データエンジニアのjcです。 昨年度から大規模なデータ分析基盤の構築に携わっています。最近Cloud SQLにある6つのDBの数百個のテーブルを日次洗い替えでBigQueryにあるデータ基盤に入れるタスクを取り組んでいます。 Cloud SQLとBigQuery両方ともGCPのサービスのため、federated queriesを利用すると簡単にできそうに見えますが、 https://cloud.google.com/bigquery/docs/federated-queries-intro 実際に行ってみると、以下の3つの課題を気づきました。 BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない Cloud SQLにあるテーブルの定義をそのまま取ってきてもBigQueryではMySQLとPostgreSQLの一部の型が対応されていない https://cloud.google.com/bigquery/docs/tables#sql_1 少し苦労していましたが、幸い解決方法を見つけました。 今後躓く方もいるかもしれないので、知見を共有したいと思います。 BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない BigQueryはクエリの結果によってテーブルを作成できるので、事前にテーブルを作っておく必要がありません。 https://cloud.google.com/bigquery/docs/tables#sql_1 大量なテーブルを一々作成するのは現実的ではない課題の解決法としてはDBのメタ情報(descriptionを含めて)をそのまま生かしてテーブル作成用クエリを生成し、テーブルを作成します。 例えばPostgreSQLの場合、まずはテーブルのメタ情報 SELECT schemaname, relname AS table_name, obj_description(relid) AS description FROM pg_catalog.pg_statio_all_tables WHERE schemaname = '{YOUR_SCHEMA}' とカラムのメタ情報を取得します。 SELECT c.table_schema, c.table_name, c.column_name, c.data_type, pgd.description FROM pg_catalog.pg_statio_all_tables AS st INNER JOIN pg_catalog.pg_description pgd on ( pgd.objoid = st.relid ) RIGHT JOIN information_schema.columns c ON ( pgd.objsubid = c.ordinal_position and c.table_schema = st.schemaname and c....

February 2, 2023 · Me

M1 MacでDocker DesktopからRancher Desktopに移行

https://www.docker.com/pricing/october-2022-pricing-change-faq/ The list price of the Docker Business subscription will go up by $3, to $24 per user per month 2022年10月のお知らせですが、Docker Desktop Business subscriptionがなんと8倍値上げ!! コスト面の理由でRancher Desktopに移行することになりました 移行する際に、Rancher Desktopの2つバグを見つけました。これから躓く人もいると思うので、一旦バグ内容と解決法を共有します。 バグ1:volumesをマウントする際にchownからpermission deniedエラーが出る https://github.com/rancher-sandbox/rancher-desktop/issues/1209 issue自体はまだ解決されていない(2023年1月)ですが、 ~/Library/Application\ Support/rancher-desktop/lima/_config/override.yamlに下記の設定を追加すれば回避できます。 mountType: 9p mounts: - location: "~" 9p: securityModel: mapped-xattr cache: "mmap" バグ2:M1 MacはMonterey 12.4以上に上げないと、割り当てられるメモリは最大3GBになる Rancher DesktopのGUIからメモリを32GBに設定したにもかかわらず、 docker infoで確認すると、CPUは設定通りですが、メモリは2.9GiBしか割り当てられていませんでした。 Architecture: aarch64 CPUs: 6 Total Memory: 2.909GiB Name: lima-rancher-desktop https://github.com/rancher-sandbox/rancher-desktop/issues/2855 Rancher Desktopがlimaという仮想マシンを利用しているので、どうやらMonterey 12.4に上げないといけません。 解決 arm64の対応がまだ難しそうなので、他の方法を考えました。 minikubeを使うとDocker DesktopあるいはRancher Desktopを経由せず、Dockerエンジンをインストールする方法もあります。 しかしM1 Mac(arm64)はhyperkitのインストールがうまくいきませんでした。 https://dhwaneetbhatt....

January 19, 2023 · Me

dbtでBigQuery上に構築したデータ基盤のメタデータ管理

データ基盤におけるETL/ELT開発のT(Transform)を担うツールdbtは最近注目を浴びています。dbtでデータモデリングする方法既に多く紹介されたので、この記事では手を動かしながらdbtでBigQuery上に構築したデータ基盤のメタデータを管理する方法を紹介します。 環境構築 dbt公式はHomebrewを推していますが、ローカル環境が汚染されるのをなるべく避けたいので、Dockerで環境構築します。 dbtのプロジェクトとプロファイルの設定ファイルを用意しておかないと、公式のドキュメントそのまま実行したらコケます。しかし設定ファイルの生成は環境を構築する必要があるので無限ループになっています。 https://docs.getdbt.com/docs/get-started/docker-install そのため、公式のサンプルプロジェクトをforkし、事前にローカル環境で生成した設定ファイルを追加しました。 https://github.com/aibazhang/dbt-metadata-management profiles.ymlを編集 {YOUR_DATASET_NAME}と{YOUR_PROJECT_ID}を置き換えます。 複数のデータセットのメタデータも作成可能ですが、一旦任意のデータセット名を指定する必要があります。dbtの問題点でもありますが、後ほど説明します。 Dockerイメージをプル docker pull ghcr.io/dbt-labs/dbt-bigquery:1.2.0 コンテナを立ち上げる git clone https://github.com/aibazhang/dbt-metadata-management cd dbt-metadata-management gcloud認証 認証済みの場合、このステップは不要です。 gcloud auth login --no-launch-browser gcloud auth application-default login --no-launch-browser コンテナを立ち上げる docker run --rm \ --network=host \ --platform linux/amd64 \ --mount type=bind,source=`PWD`,target=/usr/app \ --mount type=bind,source=`PWD`/profiles.yml,target=/root/.dbt/profiles.yml \ --mount type=bind,source=$HOME/.config/gcloud/application_default_credentials.json,target=/root/.config/gcloud/application_default_credentials.json \ ghcr.io/dbt-labs/dbt-bigquery:1.2.0 \ ls データモデルのリストが表示されたら、環境構築が無事終了です。 ドキュメントを生成する docs generate 以下のコマンドを実行すれば、models/配下のクエリとメタデータ(yamlファイル)を参照して、target/配下にメタデータのドキュメントが生成されます。 docker run --rm \ --network=host \ --platform linux/amd64 \ --mount type=bind,source=`PWD`,target=/usr/app \ --mount type=bind,source=`PWD`/profiles....

December 11, 2022 · Me

PythonデコレータのSyntactic Sugarなぜ便利かを理解した

Pythonデコレータを利用する場合、@decoratorというSyntactic Sugarを関数やメソッドの先頭に付けるのが一般的ですが、なぜ便利なのかいまいち理解できていないので、調べてみました。 デコレータの詳細は公式ドキュメントあるいは他の方がすでに紹介されているので、本記事では割愛します。ちなみにおすすめの記事はこちらです。 https://rednafi.github.io/digressions/python/2020/05/13/python-decorators まず適当に文字列の両側に<b>を追加してくれる簡単なデコレータを書きましょう。num_bは片方に追加する<b>の数を表しており、デフォルトは1となっています。 from functools import partial, wraps class Emphasis: def __init__(self) -> None: pass def add_b(self, func=None, num_b=1): if func is None: return partial(self.add_b, num_b=num_b) @wraps(func) def wrap(*args, **kwargs): ret = func(*args, **kwargs) return "<b>" * num_b + ret + "<b>" * num_b return wrap デコレータ引数なし・関数引数なしの場合 最初は一番簡単なパターンで見ていきましょう。デコレータep.add_b(hello)の返り値は関数なので、一回コール()すればSyntactic Sugarと同じことができるので、むしろSyntactic Sugarを使わないほうがわかりすそうですね。 ep = Emphasis() def hello(): return "Hello, There" @ep.add_b def hello_with_sugar(): return "Hello, There" print(ep.add_b(hello)()) print(hello_with_sugar()) <b>Hello, There<b> <b>Hello, There<b> デコレータ引数あり・関数引数ありの場合 しかしデコレータと関数両方引数がある場合はep....

November 23, 2022 · Me

アラートを出す際にAirflowのContextから誤ったtask idが取得されてしまうバグの対処法

先日投稿した記事はAirflow DAGのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を紹介しました。 Contextから誤ったtask idが取得されてしまう contextからdag_runの情報を取得してチャットツールやメールにアラートを出すのは一般的です。Slackにアラートを出す際の例ですが、dag_id, run_id, task_id, reason, log_urlを取得して、webhookでSlackの特定なチャンネルに投稿し、log_urlをクリックするだけですぐローカルあるいはクラウド環境(例えばCloud Composer)で失敗したtaskのログを確認できるので、アラート解消の効率化に繋がります。 ソースは以下となります。 from slack_sdk.webhook import WebhookClient from airflow.models import Variable from textwrap import dedent def notify_error(workflow: str, context: dict) -> None: webhook = WebhookClient(Variable.get("slack_webhook_access_token")) log_url = context.get("task_instance").log_url message = dedent( f""" :x: Task has failed. *Workflow*: {workflow} *DAG*: {context.get('task_instance').dag_id} *Run ID* {context.get('dag_run').run_id} *Task*: {context.get('task_instance').task_id} *Reason*: {context.get('reason')} <{log_url}| *Log URL*> """ ) webhook.send( text="alert", blocks=[ { "type": "section", "text": {"type": "mrkdwn", "text": message}, } ], ) しかし、数回検証してみた結果、実行が失敗したタスクtask_idではなく、誤ったtask_idが取得されてしまう事象がしばしば発生します。Airflowの既知バグで、現時点(2022....

November 21, 2022 · Me

Airflowのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する

https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 この記事では新しくDAGを作成してデータの転送処理が遅れているかを監視する方法が紹介されました。監視と実行用のDAGを分離することでスッキリにはなりますが、DAGが増えることによって管理の手間が生じます(特にDAGが大量にある場合)。 https://stackoverflow.com/questions/69312630/airflow-triggering-the-on-failure-callback-when-the-dagrun-timeout-is-exceed を参考にして DAGの引数on_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を試してみました。 from datetime import datetime, timedelta from airflow import DAG from airflow.models import TaskInstance from airflow.operators.bash import BashOperator def _on_dag_run_fail(context): print("***DAG failed!! do something***") print(f"The DAG failed because: {context['reason']}") print(context) default_args = { "owner": "mi_empresa" } with DAG( dag_id="failure_callback_example", start_date=datetime(2021, 9, 7), schedule_interval=None, default_args=default_args, catchup=False, on_failure_callback=_on_dag_run_fail, dagrun_timeout=timedelta(seconds=45), ) as dag: delayed = BashOperator( task_id="delayed", bash_command='echo "waiting..";sleep 60; echo "Done!!"', ) will_fail = BashOperator( task_id="will_fail", bash_command="exit 1" ) delayed >> will_fail 実行時間が45秒超えると、_on_dag_run_failが実行され失敗したDAGのコンテキスト情報がプリントされます。誰かの役に立つかもしれないので、試しながら気づいた2つのポイントを共有します...

November 17, 2022 · Me

Telegramで多言語ニュースチャットボットを作った話

先日多言語Webニュースアプリを作りました。 通勤電車で当日のニュースをチェックしながら、外国語を勉強できるので、まあまあ使いやすかったです。 実はこのWebを開発する1年ほど前に、一度Telegram botで多言語ニュースチャットボットを開発していました。今日はそれについて紹介したいと思います。 https://github.com/aibazhang/multitrue-bot 事前準備 Telegram bot Telegramは日本ではあまり人気がないですが、Telegramのアカウントを作るだけでAPIキーを取得できるので、ざくっとボット作りたい場合は使い勝手がけっこういいです。また、良さげのSDKもあります↓。そういう意味ではSlackも同じですが、Telegramは感覚的にLineに近いので手軽さで勝っていると思います。 https://github.com/python-telegram-bot/python-telegram-bot/tree/master/examples ニュースの取得 ニュースの取得方法は先日開発したWebアプリと同じくNewsAPI利用して取得しています。なぜNewsAPIを採用したかについては半年前の記事で詳細を説明したので、割愛します。 実装 https://core.telegram.org/ に参照してボットの初期設定が完了したら、実装に入ります。 フローは以下となります。 /startでニュースボットを起動 国・地域を選ぶ ニュースのジャンルを選ぶ 終了あるいは2.に戻る 全体フロー SDKが提供してくれた下記3つのhandlerを利用しています。 ConversationHandler: 先ほど設計したフローに基づいてhandlerを定義する。 CommandHandler: コマンドによって発火される。今回は「入り口 (entry point)」として使う CallbackQueryHandler: 他のhandlerの返り値によって発火される def main(): updater = Updater( token=json.load(open(KEY_PATH / "keys.json", "r"))["telegram_key"], use_context=True, ) dispatcher = updater.dispatcher country_pattern = "^us|jp|cn|tw|kr|gb$" headlines_pattern = "^us|jp|cn|tw|kr|gb business|entertainment|general|health|science|sports|technology|$" conv_handler = ConversationHandler( entry_points=[CommandHandler("start", start)], states={ "CATEGORY": [CallbackQueryHandler(select_category, pattern=country_pattern)], "HEADLINES": [CallbackQueryHandler(get_news, pattern=headlines_pattern)], "START OVER OR NOT": [ CallbackQueryHandler(start_over, pattern="^start over$"), CallbackQueryHandler(end, pattern="^end$"), ], }, fallbacks=[CommandHandler("start", start)], ) dispatcher....

November 3, 2022 · Me

GitHub Issueだけで自分のルーティングを管理し、そして草を生やす

先日シェルスクリプトで個人ナレッジマネジメントツールを作った話しを投稿して、予想以上に需要がありました。 ルーティングをGitHub Posterに生成 似たような発想でルーティング管理アプリを使わずに、GitHubのcontributionsのように自分のルーティング(例えば読書、ランニング、LeetCode、外国語の勉強)を管理できると面白くない?と思いながら、GitHub上で検索したらyihong0618さんが開発したGitHubPosterを発見しました。 https://github.com/yihong0618/GitHubPoster GitHub Isssue、Duolingo、Twitter、Kindleなど20個以上のAPIで履歴を取得し、GitHub svg poster(aka. 皆さんが大好きなGitHubの草)を生成します。 実際使ってみよう ローダーは20個以上あり、とりあえずissueで今年年始以来の読書ルーティングの草を生やしてみました。 Issueを書く issueフォーマットは↓に従う必要でがあります。 {整数} {内容} 今年は1月から、日課をこなした日に当該Issueに↓のコメント追加していました。 2 「データ指向アプリケーションデザイン」 環境構築 pip install -U 'github_poster[all]' 実行 GitHubをトークンを取得し、下記コマンドを実行するだけです。 github_poster issue --issue_number ${issue_number} --repo_name ${repo_name} --token ${github_token} また、オプション --special-color1 --special-color2 ---stand-with-ukraine などによって色を指定することも可能です。(ウクライナをサポートする配色もあるようですね) 結果 最終的に生成されたGitHub Poster(.svgファイル)はこんな感じです。単位はtimes(回数)になっているが、hours(時間)が正しいです。 確認してみると、 今年今まで336時間読書していて、週の真ん中あまり本を読んでいないと気づきました。 使った感想 余計なモバイルアプリを使わずに、ルーティング管理できるのはミニマムリスト的には最高 生成されたファイルは.svgなので、自分のサイトや他のところに取り入れるのも簡単そう 新しいローダーの開発に貢献してみたい

November 2, 2022 · Me