Airflowの単体テストを書きましょう

データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、品質の担保は言うまでもなく重要ですね。品質を確保するには、ワークフローの監視・検証、ワークフローのテスト、そして加工用クエリのテストがいずれも欠かせません。この記事では、ワークフロー(Airflow)の単体テスト方法について紹介します。また、ワークフローの監視・検証に関しては、過去の記事も合わせてご覧いただけると幸いです。 ワークフローの監視 ワークフローの検証 DAGの単体テスト まずは、DAGの単体テストについて説明します。厳密に言えば、DAGの実行ではなく、DAGが正確に構築されたかどうかのテストを行います。 https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#unit-tests Airflowの公式ベストプラクティスでは簡潔に紹介されていますが、具体例を挙げてさらに詳しく説明しましょう。 importのテスト importが正常にできることを確認する(importが失敗するとWeb UIからも確認できるが、単体テストする時点で確認するともっと便利) import時間を制限する。(冗長なDAGがあると解析するのに時間がかかるので、import時間を制限することで、事前に冗長なDAGを発見できる) 最低でも1つのタスクが含まれていることを確認する。 import unittest from datetime import timedelta from airflow.models import DagBag class TestImportDags(unittest.TestCase): IMPORT_TIMEOUT = 120 @classmethod def setUpClass(cls) -> None: cls.dagbag = DagBag() cls.stats = cls.dagbag.dagbag_stats def test_import_dags(self): self.assertFalse( len(self.dagbag.import_errors), f"DAG import failures. Errors: {self.dagbag.import_errors}", ) def test_import_dags_time(self): duration = sum((o.duration for o in self.stats), timedelta()).total_seconds() self.assertLess(duration, self.IMPORT_TIMEOUT) def test_dags_have_at_least_one_task(self): for key, dag in self.dagbag.dags.items(): self.assertTrue(dag, f"DAG {key} not exsit") self....

April 6, 2023 · Me