経緯

こんにちは、データエンジニアのjcです。 昨年度から大規模なデータ分析基盤の構築に携わっています。最近Cloud SQLにある6つのDBの数百個のテーブルを日次洗い替えでBigQueryにあるデータ基盤に入れるタスクを取り組んでいます。 Cloud SQLとBigQuery両方ともGCPのサービスのため、federated queriesを利用すると簡単にできそうに見えますが、

https://cloud.google.com/bigquery/docs/federated-queries-intro

実際に行ってみると、以下の3つの課題を気づきました。

  1. BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない
  2. プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない
  3. Cloud SQLにあるテーブルの定義をそのまま取ってきてもBigQueryではMySQLとPostgreSQLの一部の型が対応されていない

https://cloud.google.com/bigquery/docs/tables#sql_1

少し苦労していましたが、幸い解決方法を見つけました。 今後躓く方もいるかもしれないので、知見を共有したいと思います。

BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない

BigQueryはクエリの結果によってテーブルを作成できるので、事前にテーブルを作っておく必要がありません。 https://cloud.google.com/bigquery/docs/tables#sql_1

大量なテーブルを一々作成するのは現実的ではない課題の解決法としてはDBのメタ情報(descriptionを含めて)をそのまま生かしてテーブル作成用クエリを生成し、テーブルを作成します。

例えばPostgreSQLの場合、まずはテーブルのメタ情報

SELECT 
    schemaname, 
    relname AS table_name, 
    obj_description(relid) AS description
FROM pg_catalog.pg_statio_all_tables
WHERE schemaname = '{YOUR_SCHEMA}'

とカラムのメタ情報を取得します。

SELECT
    c.table_schema,
    c.table_name,
    c.column_name,
    c.data_type,
    pgd.description
FROM pg_catalog.pg_statio_all_tables AS st
INNER JOIN pg_catalog.pg_description pgd on (
    pgd.objoid = st.relid
)
RIGHT JOIN information_schema.columns c ON (
    pgd.objsubid   = c.ordinal_position and
    c.table_schema = st.schemaname and
    c.table_name   = st.relname
)
WHERE c.table_schema = '{YOUR_SCHEMA}';

上記のメタ情報をfor文で回してテーブル作成用のクエリテンプレートの$columns $pg_schema $table_nameに入れたら、テーブル作成用のクエリが生成されます。Airflowとかで定期的にクエリを実行すれば問題解決です。

from textwrap import dedent
from string import Template

query_template = Template(
    dedent(
        """
        DROP TABLE IF EXISTS {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME};
        CREATE TABLE
        {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME} AS (
            SELECT
                *
            FROM
            EXTERNAL_QUERY("{YOUR_CONNECTION}",
            \"\"\" SELECT $columns FROM $pg_schema.$table_name \"\"\"));
        """
    )
)

また、機会があれば別記事で紹介しますが、もう少し工夫すれば、テーブルとカラムのdescriptionもBigQueryに入れることが可能です。

https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#alter_table_set_options_statement

プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない

上記のクエリテンプレートを利用すればこの課題も解決できます。しかし、毎回テーブルを作り直しているため、予想外な通信エラーなどによってCREATE文の実行が失敗し、DROP文だけ実行された場合、テーブルが消えます。BigQueryのTransactionsを使えばいいじゃんって言われそうなところでしたが、 https://cloud.google.com/bigquery/docs/reference/standard-sql/transactions

残念ながら、外部クエリEXTERNAL_QUERYを利用している場合、Transactionsを使えません。 回避策としては、DROPをする前に一回SELECTで外部クエリをテストすると良いでしょう。 クエリテンプレートはこんな感じになりました。

from textwrap import dedent
from string import Template

init_query_temp = Template(
    dedent(
        """
        SELECT
            *
        FROM
        EXTERNAL_QUERY("{connection}",
        \"\"\" SELECT $columns FROM $pg_schema.$table_name LIMIT 1 \"\"\");
        DROP TABLE IF EXISTS {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME};
        CREATE TABLE {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME} AS (
            SELECT
                *
            FROM
            EXTERNAL_QUERY("{connection}",
            \"\"\" SELECT $columns FROM $pg_schema.$table_name \"\"\"));
        """
    )
)

Transactionsと比べたら確実な方法ではないですが、現時点ではもっと良さそうな方法を見つけていません。

BigQueryで対応されていないデータ型がある

そもそもCloudSQLとBigQueryの用途が異なるため、BigQueryで対応されていないデータ型が存在するのも当たり前ですね。

https://cloud.google.com/bigquery/docs/reference/standard-sql/federated_query_functions#unsupported_mysql_and_postgresql_data_types

Unsupported type: money, time with time zone, inet, path, pg_lsn, point, polygon, tsquery, tsvector, txid_snapshot, uuid, box, cidr, circle, interval, jsonb, line, lseg, macaddr, macaddr8

全ての非対応型を文字列に変換するというシンプルな方法で解決できます。 pandasで実装したスクリプトを共有しておきます。

import bigquery
from textwrap import dedent
from string import Template

BIGQUERY_UNSUPPORTED_TYPE = set(
    [
        "money",
        "time with time zone",
        "inet",
        "path",
        "pg_lsn",
        "point",
        "polygon",
        "tsquery",
        "tsvector",
        "txid_snapshot",
        "uuid",
        "box",
        "cidr",
        "circle",
        "interval",
        "jsonb",
        "line",
        "lseg",
        "macaddr",
        "macaddr8",
    ]
)

pg_columns_meta_sql = Template(
    dedent(
        """
        SELECT
            *
        FROM EXTERNAL_QUERY("$connection",
        \"\"\"
        SELECT
            c.table_schema,
            c.table_name,
            c.column_name,
            c.data_type,
            pgd.description
        FROM pg_catalog.pg_statio_all_tables AS st
        INNER JOIN pg_catalog.pg_description pgd on (
            pgd.objoid = st.relid
        )
        RIGHT JOIN information_schema.columns c ON (
            pgd.objsubid   = c.ordinal_position and
            c.table_schema = st.schemaname and
            c.table_name   = st.relname
        )
        WHERE c.table_schema = '$pg_schema';
        \"\"\");
        """
    )
)

bq_client = bigquery.Client(credentials="{YOUR_CREDENTIALS}", project="{YOUR_PROJECT_ID}")

columns_meta_df = (
        bq_client.query(
            pg_columns_meta_sql.substitute(connection=connection, pg_schema=pg_schema),
        )
        .result()
        .to_dataframe()
    )

columns_meta_df["casted_column_name"] = columns_meta_df["column_name"].where(
        ~columns_meta_df["data_type"].isin(BIGQUERY_UNSUPPORTED_TYPE),
        columns_meta_df["column_name"] + "::varchar",
    )

まとめ

CloudSQLにあるデータをBigQueryに入れるのは簡単そうに見えますが、テーブル数が多くなると、意外と課題がありました。 以上です。ご参考になれば幸いです。