先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。

AirflowからDataformをトリッガーする

ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations

簡単にまとめると

  1. DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする
  2. DataformCreateWorkflowInvocationOperator: sqlを実行する

しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。

Dataformに変数を渡す

まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。

{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "project-stg",
  "defaultLocation": "asia-northeast1",
  "vars": {
    "bq_suffix": "_stg",
    "execution_date": "2023-05-24"
  }
}

DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。

https://github.com/apache/airflow/blob/739e6b5d775412f987a3ff5fb71c51fbb7051a89/airflow/providers/google/cloud/operators/dataform.py#LL73C29-L73C46

compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig

CodeCompilationConfig内にvarsという変数を指定できるようです。

{
  "defaultDatabase": string,
  "defaultSchema": string,
  "defaultLocation": string,
  "assertionSchema": string,
  "vars": {
    string: string,
    ...
  },
  "databaseSuffix": string,
  "schemaSuffix": string,
  "tablePrefix": string
}

BigQueryのsuffixをcode_compilation_configvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。

DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": {
            "vars": {
                "bq_suffix": "_stg",
            }
        },
    },
)

Dataformにcontext変数を渡す

増分処理する際によくdata_interval_endなどのcontext変数を利用して当日の差分だけ取り入れます。 しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。

TaskFlowDataformCreateCompilationResultOperatorをラッピングすれば前述の問題を解決できますdata_interval_endcontextから取得します。ポイントとしてはDataformCreateCompilationResultOperatorを返す際にexecute()を呼び出す必要があります。

from airflow.decorators import task

@task()
def create_compilation_result(**context):
    execute_date = (
        context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d")
    )
    return DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "vars": {
                    "execute_date": execute_date,
                    "bq_suffix": Variable.get("bq_suffix"),
                }
            },
        },
    ).execute(context=context)

最終的なDAGは以下のようになります。

from datetime import datetime

from airflow import models
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform_demo"
PROJECT_ID = "project-stg"
REPOSITORY_ID = "dataform-demo"
REGION = "asia-northeast1"
GIT_COMMITISH = "main"

@task()
def create_compilation_result(**context):
    execute_date = (
        context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d")
    )
    return DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "vars": {
                    "execute_date": execute_date,
                    "bq_suffix": Variable.get("bq_suffix"),
                }
            },
        },
    ).execute(context=context)

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["dataform"],
) as dag:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

    create_compilation_result() >> create_workflow_invocation

以上、AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法でした。