AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法

先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。 AirflowからDataformをトリッガーする ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations 簡単にまとめると DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする DataformCreateWorkflowInvocationOperator: sqlを実行する しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。 Dataformに変数を渡す まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。 { "defaultSchema": "dataform", "assertionSchema": "dataform_assertions", "warehouse": "bigquery", "defaultDatabase": "project-stg", "defaultLocation": "asia-northeast1", "vars": { "bq_suffix": "_stg", "execution_date": "2023-05-24" } } DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。 https://github.com/apache/airflow/blob/739e6b5d775412f987a3ff5fb71c51fbb7051a89/airflow/providers/google/cloud/operators/dataform.py#LL73C29-L73C46 compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig CodeCompilationConfig内にvarsという変数を指定できるようです。 { "defaultDatabase": string, "defaultSchema": string, "defaultLocation": string, "assertionSchema": string, "vars": { string: string, ... }, "databaseSuffix": string, "schemaSuffix": string, "tablePrefix": string } BigQueryのsuffixをcode_compilation_configのvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。 DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "bq_suffix": "_stg", } }, }, ) Dataformにcontext変数を渡す 増分処理する際によくdata_interval_endなどのcontext変数を利用して当日の差分だけ取り入れます。 しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。...

May 24, 2023 · Me

Apache Airflowのコミッターになった話

Google Providersのバグを見つけた 先日DAGを開発中にGoogle Providers (apache-airflow-providers-google==8.9.0)のCloudDataTransferServiceJobStatusSensorを使用したところ、 project_idはオプション引数であるにも関わらず、省略するとエラーが発生するというバグに遭遇しました。 [2023-03-09, 02:31:24 UTC] {taskinstance.py:1774} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 236, in execute while not self.poke(context): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py", line 91, in poke operations = hook.list_transfer_operations( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 380, in list_transfer_operations request_filter = self._inject_project_id(request_filter, FILTER, FILTER_PROJECT_ID) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 459, in _inject_project_id raise AirflowException( airflow.exceptions.AirflowException: The project id must be passed either as `project_id` key in `filter` parameter or as project_id extra in Google Cloud connection definition....

May 11, 2023 · Me

Airflowの単体テストを書きましょう

データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、品質の担保は言うまでもなく重要ですね。品質を確保するには、ワークフローの監視・検証、ワークフローのテスト、そして加工用クエリのテストがいずれも欠かせません。この記事では、ワークフロー(Airflow)の単体テスト方法について紹介します。また、ワークフローの監視・検証に関しては、過去の記事も合わせてご覧いただけると幸いです。 ワークフローの監視 ワークフローの検証 DAGの単体テスト まずは、DAGの単体テストについて説明します。厳密に言えば、DAGの実行ではなく、DAGが正確に構築されたかどうかのテストを行います。 https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#unit-tests Airflowの公式ベストプラクティスでは簡潔に紹介されていますが、具体例を挙げてさらに詳しく説明しましょう。 importのテスト importが正常にできることを確認する(importが失敗するとWeb UIからも確認できるが、単体テストする時点で確認するともっと便利) import時間を制限する。(冗長なDAGがあると解析するのに時間がかかるので、import時間を制限することで、事前に冗長なDAGを発見できる) 最低でも1つのタスクが含まれていることを確認する。 import unittest from datetime import timedelta from airflow.models import DagBag class TestImportDags(unittest.TestCase): IMPORT_TIMEOUT = 120 @classmethod def setUpClass(cls) -> None: cls.dagbag = DagBag() cls.stats = cls.dagbag.dagbag_stats def test_import_dags(self): self.assertFalse( len(self.dagbag.import_errors), f"DAG import failures. Errors: {self.dagbag.import_errors}", ) def test_import_dags_time(self): duration = sum((o.duration for o in self.stats), timedelta()).total_seconds() self.assertLess(duration, self.IMPORT_TIMEOUT) def test_dags_have_at_least_one_task(self): for key, dag in self.dagbag.dags.items(): self.assertTrue(dag, f"DAG {key} not exsit") self....

April 6, 2023 · Me

AirflowからAirbyteをトリッガーする際にハマるポイント

https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator/ AirflowからAirbyte Operatorを利用するための設定について、Airbyte公式の記事は既にわかりやすくまとめています。実際に試してみて、少しハマったところがあったので、その知見を共有したいと思います。 1. Airflowを2.3.0以上にアップグレードする必要がある apache-airflow-providers-airbyte[http]を利用するのにAirflowを2.3.0以上に上げないといけません。(apache-airflow-providers-airbyte[http]をdocker-composer.ymlの_PIP_ADDITIONAL_REQUIREMENTSに追加することも忘れずに) Cloud Composerなどを利用している場合、GUIからアップグレード可能です。 https://airflow.apache.org/docs/apache-airflow-providers-airbyte/stable/index.html version: '3' x-airflow-common: &airflow-common image: apache/airflow:2.3.4-python3.8 environment: &airflow-common-env PYTHONPATH: /opt/airflow/dags AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:password@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:password@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth' # 追加 _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-airbyte[http]==3.2.0 2. Airflowの古いバージョンから2.3.4上げるとdocker-composeがバグる airflow 2.2.xでは問題なく環境構築できていましたが、イメージをapache/airflow:2.3.4-python3.8に変更してdocker compose up airflow-initを実行したら怒られます。 You are running pip as root. Please use 'airflow' user to run pip! Airflowの古いdocker-composer.ymlのバグのようなので、 https://github.com/apache/airflow/pull/23517/files services -> airflow-init -> environmentに_PIP_ADDITIONAL_REQUIREMENTS: ''を追加すれば解決できます。 ... environment: <<: *airflow-common-env _AIRFLOW_DB_UPGRADE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-password} # 追加 _PIP_ADDITIONAL_REQUIREMENTS: '' ....

March 6, 2023 · Me

Cloud Composerでmax_active_tasks_per_dagのデフォルト値が機能していない問題

問題 先日Cloud Composerの環境を↓にバージョンアップしました。 Cloud Composer 2.0.32 Airflow 2.2.5 core.max_active_tasks_per_dagという一つのDAG内同時に処理できるタスクの上限を設定するパラメータがデフォルト値16のままになっているのにも関わらず、実行するタスクの上限が明らかに16を超えています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag ローカルにあるAirflow 2.2.5環境では何の異常もなく、ComposerのAirflow Configurationを確認したところ、なぜかcore.dag_concurrencyが100に設定されています。 [core] dags_folder = /home/airflow/gcs/dags plugins_folder = /home/airflow/gcs/plugins executor = CeleryExecutor dags_are_paused_at_creation = True load_examples = False donot_pickle = True dagbag_import_timeout = 300 default_task_retries = 2 killed_task_cleanup_time = 3570 parallelism = 0 non_pooled_task_slot_count = 100000 dag_concurrency = 100 .... core.dag_concurrencyの役割はcore.max_active_tasks_per_dagと同じく、一つのDAG内同時に処理できるタスクの上限を設定しています。Airflow 2.2.0からはすでにDeprecatedになったはずなのに、なぜか残り続いています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated 試み 手動で削除しようと思ったですけど、バージョンを上げたのでCloud Composer -> AIRFLOW CONFIGURATION OVERRIDESにcore.dag_concurrencyというパラメータすら存在しませんでした。 仕方なく、GCSから設定ファイルgs://asia-northeast1-colossus-wo-xxxxxxx-bucket/airflow.cfgを直接編集してみました。しかし、gcloud composer environments storage dags importを実行すると初期化が処理が実行され、core.dag_concurrencyが再び出てきました。 解決 デフォルト値ではなく、手動でcore.max_active_tasks_per_dagを明示的に16に指定すると、実行するタスクの上限が期待通りに動作しました。 ザクッとComposerのリリースノートを確認してこのバグまだ修正されていないようです。...

February 8, 2023 · Me

アラートを出す際にAirflowのContextから誤ったtask idが取得されてしまうバグの対処法

先日投稿した記事はAirflow DAGのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を紹介しました。 Contextから誤ったtask idが取得されてしまう contextからdag_runの情報を取得してチャットツールやメールにアラートを出すのは一般的です。Slackにアラートを出す際の例ですが、dag_id, run_id, task_id, reason, log_urlを取得して、webhookでSlackの特定なチャンネルに投稿し、log_urlをクリックするだけですぐローカルあるいはクラウド環境(例えばCloud Composer)で失敗したtaskのログを確認できるので、アラート解消の効率化に繋がります。 ソースは以下となります。 from slack_sdk.webhook import WebhookClient from airflow.models import Variable from textwrap import dedent def notify_error(workflow: str, context: dict) -> None: webhook = WebhookClient(Variable.get("slack_webhook_access_token")) log_url = context.get("task_instance").log_url message = dedent( f""" :x: Task has failed. *Workflow*: {workflow} *DAG*: {context.get('task_instance').dag_id} *Run ID* {context.get('dag_run').run_id} *Task*: {context.get('task_instance').task_id} *Reason*: {context.get('reason')} <{log_url}| *Log URL*> """ ) webhook.send( text="alert", blocks=[ { "type": "section", "text": {"type": "mrkdwn", "text": message}, } ], ) しかし、数回検証してみた結果、実行が失敗したタスクtask_idではなく、誤ったtask_idが取得されてしまう事象がしばしば発生します。Airflowの既知バグで、現時点(2022....

November 21, 2022 · Me

Airflowのon_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する

https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 この記事では新しくDAGを作成してデータの転送処理が遅れているかを監視する方法が紹介されました。監視と実行用のDAGを分離することでスッキリにはなりますが、DAGが増えることによって管理の手間が生じます(特にDAGが大量にある場合)。 https://stackoverflow.com/questions/69312630/airflow-triggering-the-on-failure-callback-when-the-dagrun-timeout-is-exceed を参考にして DAGの引数on_failure_callbackとdagrun_timeoutを組み合わせることでDAGの遅延を監視する方法を試してみました。 from datetime import datetime, timedelta from airflow import DAG from airflow.models import TaskInstance from airflow.operators.bash import BashOperator def _on_dag_run_fail(context): print("***DAG failed!! do something***") print(f"The DAG failed because: {context['reason']}") print(context) default_args = { "owner": "mi_empresa" } with DAG( dag_id="failure_callback_example", start_date=datetime(2021, 9, 7), schedule_interval=None, default_args=default_args, catchup=False, on_failure_callback=_on_dag_run_fail, dagrun_timeout=timedelta(seconds=45), ) as dag: delayed = BashOperator( task_id="delayed", bash_command='echo "waiting..";sleep 60; echo "Done!!"', ) will_fail = BashOperator( task_id="will_fail", bash_command="exit 1" ) delayed >> will_fail 実行時間が45秒超えると、_on_dag_run_failが実行され失敗したDAGのコンテキスト情報がプリントされます。誰かの役に立つかもしれないので、試しながら気づいた2つのポイントを共有します...

November 17, 2022 · Me

Airflowで構築したワークフローを検証する

データ基盤の監視 データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。 https://tech-blog.monotaro.com/entry/2021/08/24/100000 https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。 まず、Data Pipelines Pocket Referenceではデータ基盤検証の原則が紹介されました。 Validate Early, Validate Often 要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。 Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。 Extractの検証 ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcsを利用すると簡単にできます。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html 一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensorよりGCSObjectsWithPrefixExistenceSensorの方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。 check_extract = GCSObjectsWithPrefixExistenceSensor( task_id="check_extract", bucket="{YOUR_BUCKET}", prefix="{YOUR_PREFIX}", ) Loadの検証 ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorを利用できます。 https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/bigquery_check_operator/index.html#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator 下記のタスクをLoad処理の後ろに追加すれば良いです。 check_load = BigQueryCheckOperator( task_id="check_load", sql={YOUR_SQL}, use_legacy_sql=False, params={ "bq_suffix": Variable.get("bq_suffix"), "dataset": setting.bq.dataset, "table": setting.bq.table_name, }, location="asia-northeast1", ) SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えばこの記事が紹介されたクエリでテーブルが空になっているかを確認できます。 SELECT total_rows FROM ${dataset_id}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '${table_name}' ORDER BY last_modified_time DESC LIMIT 1; GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。 皆さんのワークフロー設計にご参考になれば幸いです。

November 1, 2022 · Me

AirflowのBigQueryInsertJobOperatorのオプションについて

先日「AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法」についての記事を書きましたが、BigQueryInsertJobOperatorにフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。 なぜ気づいたのか 外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、 Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character. 調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlinesといったCSVのオプションを追加すると解決できるらしいです。 https://stackoverflow.com/questions/65782572/how-to-avoid-missing-close-double-quote-character-error-in-google-bigquer https://qiita.com/sawanoboly/items/410b3346518e569da581 ただし、configurationにcsvOptionsを追加しても解決されず(同じエラーが出た)、困っていました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*.csv"], "autodetect": True, "sourceFormat": "CSV", "hivePartitioningOptions": { "mode": "AUTO", "sourceUriPrefix": "gs://my-bucket/biostats/", }, "csvOptions": { "allowQuotedNewlines": True }, } }, 解決法 CLI bq loadを利用す際にどうなっているかを確認したら、どうやらcsvOptionsを明示的に書く必要がなく--allow_quoted_newlinesだけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperatorで同じことをすると、問題なくloadできました。 configuration={ "load": { "destinationTable": { "projectId": PROJECT_ID, "datasetId": DATASET_NAME, "tableId": TABLE, }, "sourceUris": ["gs://my-bucket/biostats/*....

August 30, 2022 · Me

AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法

GCS外部パーティションテーブルをBigQueryに取り込む BigQueryでデータレイクを構築する際に、GCS、Google Drive Data、Bigtableから外部テーブルを取り組む場面は少なくないと思います。下記のような外部テーブルがパーティションで分けられている場合(Hive partitioned dataと呼ばれている)は少しややこしくなりますが、 gs://myBucket/myTable/dt=2019-10-31/lang=en/foo gs://myBucket/myTable/dt=2018-10-31/lang=fr/bar 公式ドキュメントは取り組み方をわかりやすくまとめてくれています。bqというcliでコマンド2行で解決できます。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#partition_schema_detection_modes テーブル定義を作成 bq mkdef \ --autodetect \ --source_format=CSV \ --hive_partitioning_mode=AUTO \ --hive_partitioning_source_uri_prefix=gs://your-bucket/your-table/ \ "gs://you-bucket/you-table/*.csv" > /tmp/MyTableDefFile テーブルを作成 bq mk --external_table_definition=/tmp/MyTableDefFile \ my_dataset.my_table Airflowで取り込むとどうなる? まず、簡単に背景を補足すると現在筆者が構築しているデータ基盤はCloud Composer (Airflow)でワークフローを管理しており、AirflowでGCSから外部テーブルを取り込むといった需要があります。パーティションではない普通のテーブルであれば、BigQueryCreateExternalTableOperator利用すると一発で解決できます。 しかし、パーティションのある場合はAirflowから事前用意された関数がなく(2022年7月時点)、他の方法を考えないといけないです。 GCP公式ドキュメントもConosole, bq, API, Java 4つの方法しか書いておらず、PythonのSDKはまだ実装されていないようです。 https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#creating_an_external_table_for_hive_partitioned_data BigQueryInsertJobOperatorを利用 いろいろ調査した結果、筆者と同じことに困っている人がいたようです。 https://github.com/apache/airflow/issues/13626 その下に Have you tried to use BigQueryInsertJobOperator? for example, see: #13598 「BigQueryInsertJobOperatorを使ってみたらどう?」というコメントがありました。 さらにリンク先のPR(なぜかクローズされた)に行くと、BigQueryInsertJobOperatorの例をまとめてもらっています。どうやら専用のSDKがない場合BigQueryInsertJobOperatorを使うのは普通らしいです。引数もやや雑で、そのままBigQueryのREST APIに叩くのと近い感じです。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator https://github.com/apache/airflow/pull/13598/files しかしながら、上記のPRは外部パーティションテーブルを取り組む例がなかったため、BigQuery APIのドキュメントを調べて、hivePartitioningOptionsというオプションを見つけました。これを追加すると、もしかしてうまくいくのではないかと思い、早速検証してみました。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions データを用意 https://people.math.sc.edu/Burkardt/datasets/csv/csv.html から適当にcsvファイルをダウロードします。 biostats1.csv "Name", "Sex", "Age", "Height (in)", "Weight (lbs)" "Alex", "M", 41, 74, 170 "Bert", "M", 42, 68, 166 "Carl", "M", 32, 70, 155 "Dave", "M", 39, 72, 167 "Elly", "F", 30, 66, 124 "Fran", "F", 33, 66, 115 "Gwen", "F", 26, 64, 121 "Hank", "M", 30, 71, 158 "Ivan", "M", 53, 72, 175 "Jake", "M", 32, 69, 143 "Kate", "F", 47, 69, 139 "Luke", "M", 34, 72, 163 "Myra", "F", 23, 62, 98 "Neil", "M", 36, 75, 160 "Omar", "M", 38, 70, 145 "Page", "F", 31, 67, 135 "Quin", "M", 29, 71, 176 "Ruth", "F", 28, 65, 131 biostats1....

July 21, 2022 · Me