先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。
AirflowからDataformをトリッガーする
ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations
簡単にまとめると
DataformCreateCompilationResultOperator
: sqlxをsqlにコンパイルするDataformCreateWorkflowInvocationOperator
: sqlを実行する
しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。
Dataformに変数を渡す
まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。
{
"defaultSchema": "dataform",
"assertionSchema": "dataform_assertions",
"warehouse": "bigquery",
"defaultDatabase": "project-stg",
"defaultLocation": "asia-northeast1",
"vars": {
"bq_suffix": "_stg",
"execution_date": "2023-05-24"
}
}
DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。
compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig
CodeCompilationConfig
内にvars
という変数を指定できるようです。
{
"defaultDatabase": string,
"defaultSchema": string,
"defaultLocation": string,
"assertionSchema": string,
"vars": {
string: string,
...
},
"databaseSuffix": string,
"schemaSuffix": string,
"tablePrefix": string
}
BigQueryのsuffixをcode_compilation_config
のvars
へ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffix
で変数を呼び出せます。
DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": {
"vars": {
"bq_suffix": "_stg",
}
},
},
)
Dataformにcontext変数を渡す
増分処理する際によくdata_interval_end
などのcontext変数を利用して当日の差分だけ取り入れます。
しかし、DataformCreateCompilationResultOperator
ではtemplate_fields
が実装されていないため、直接{{ data_interval_end }}
のようなjinjaテンプレートを渡すことはできません。
TaskFlowでDataformCreateCompilationResultOperator
をラッピングすれば前述の問題を解決できます。data_interval_end
はcontext
から取得します。ポイントとしてはDataformCreateCompilationResultOperator
を返す際にexecute()
を呼び出す必要があります。
from airflow.decorators import task
@task()
def create_compilation_result(**context):
execute_date = (
context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d")
)
return DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": {
"vars": {
"execute_date": execute_date,
"bq_suffix": Variable.get("bq_suffix"),
}
},
},
).execute(context=context)
最終的なDAGは以下のようになります。
from datetime import datetime
from airflow import models
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform_demo"
PROJECT_ID = "project-stg"
REPOSITORY_ID = "dataform-demo"
REGION = "asia-northeast1"
GIT_COMMITISH = "main"
@task()
def create_compilation_result(**context):
execute_date = (
context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d")
)
return DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": {
"vars": {
"execute_date": execute_date,
"bq_suffix": Variable.get("bq_suffix"),
}
},
},
).execute(context=context)
with models.DAG(
DAG_ID,
schedule_interval="@once",
start_date=datetime(2022, 1, 1),
catchup=False,
tags=["dataform"],
) as dag:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id="create_workflow_invocation",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result() >> create_workflow_invocation
以上、AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法でした。