From 08418174337044b461002c8e25c549e1986c077d Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 11 Jul 2024 20:53:12 +0200 Subject: [PATCH] Improve airflow integration (#11) --- opendbt/airflow/__init__.py | 16 +++++----- .../airflow/dags/dbt_tests_workflow.py | 32 +++++++++++++++++++ tests/resources/airflow/dags/dbt_workflow.py | 8 ++--- 3 files changed, 44 insertions(+), 12 deletions(-) create mode 100644 tests/resources/airflow/dags/dbt_tests_workflow.py diff --git a/opendbt/airflow/__init__.py b/opendbt/airflow/__init__.py index be2cbf5..e53fe19 100644 --- a/opendbt/airflow/__init__.py +++ b/opendbt/airflow/__init__.py @@ -60,8 +60,8 @@ def load_dbt_tasks(self, end_node: BaseOperator = None, tag: str = None, resource_type="all", - run_dbt_seeds=False, - run_singular_tests=False) -> Tuple[BaseOperator, BaseOperator]: + include_dbt_seeds=False, + include_singular_tests=False) -> Tuple[BaseOperator, BaseOperator]: """ This method is used to add dbt tasks to Given DAG. @@ -81,7 +81,7 @@ def load_dbt_tasks(self, dag=dag) end_node = end_node if end_node else EmptyOperator(task_id='dbt-%s-end' % self.project_dir.name, dag=dag) - if run_dbt_seeds: + if include_dbt_seeds: # add dbt seeds job after start node abd before all other dbt jobs first_node = start_node start_node = OpenDbtExecutorOperator(dag=dag, @@ -106,7 +106,7 @@ def load_dbt_tasks(self, if resource_type == "test" and not str(node.name).startswith("source_"): if node.resource_type == "test": dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag, - task_id=node.unique_id.rsplit('.', 1)[0], + task_id=node.unique_id, project_dir=self.project_dir, profiles_dir=self.profiles_dir, target=self.target, @@ -132,7 +132,7 @@ def load_dbt_tasks(self, # we are skipping model tests because they are included above with model execution( `build` command) # source table tests dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag, - task_id=node.unique_id.rsplit('.', 1)[0], + task_id=node.unique_id, project_dir=self.project_dir, profiles_dir=self.profiles_dir, target=self.target, @@ -153,7 +153,7 @@ def load_dbt_tasks(self, task.set_upstream(dbt_tasks[upstream_id]) singular_tests = None - if run_singular_tests: + if include_singular_tests: singular_tests = OpenDbtExecutorOperator(dag=dag, task_id=f"{self.project_dir.name}_singular_tests", project_dir=self.project_dir, @@ -167,7 +167,7 @@ def load_dbt_tasks(self, # set downstream dependencies for the end nodes. self.log.debug(f"Setting downstream of {task.task_id} -> {end_node.task_id}") - if run_singular_tests and singular_tests: + if include_singular_tests and singular_tests: task.set_downstream(singular_tests) else: task.set_downstream(end_node) @@ -177,6 +177,6 @@ def load_dbt_tasks(self, self.log.debug(f"Setting upstream of {task.task_id} -> {start_node}") task.set_upstream(start_node) - if run_singular_tests: + if include_singular_tests: singular_tests.set_downstream(end_node) return start_node, end_node diff --git a/tests/resources/airflow/dags/dbt_tests_workflow.py b/tests/resources/airflow/dags/dbt_tests_workflow.py new file mode 100644 index 0000000..5037914 --- /dev/null +++ b/tests/resources/airflow/dags/dbt_tests_workflow.py @@ -0,0 +1,32 @@ +from pathlib import Path + +from airflow import DAG +from airflow.operators.empty import EmptyOperator +from airflow.utils.dates import days_ago + +from opendbt.airflow import OpenDbtAirflowProject + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1 +} + +with DAG( + dag_id='dbt_tests_workflow', + default_args=default_args, + description='DAG To run dbt tests', + schedule_interval=None, + start_date=days_ago(3), + catchup=False, + max_active_runs=1 +) as dag: + start = EmptyOperator(task_id="start") + end = EmptyOperator(task_id="end") + + DBT_PROJ_DIR = Path("/opt/dbttest") + + p = OpenDbtAirflowProject(project_dir=DBT_PROJ_DIR, profiles_dir=DBT_PROJ_DIR, target='dev') + p.load_dbt_tasks(dag=dag, start_node=start, end_node=end, resource_type='test') diff --git a/tests/resources/airflow/dags/dbt_workflow.py b/tests/resources/airflow/dags/dbt_workflow.py index 2edf045..8e3a51f 100644 --- a/tests/resources/airflow/dags/dbt_workflow.py +++ b/tests/resources/airflow/dags/dbt_workflow.py @@ -1,9 +1,9 @@ from pathlib import Path +from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago -from airflow import DAG from opendbt.airflow import OpenDbtAirflowProject default_args = { @@ -26,7 +26,7 @@ start = EmptyOperator(task_id="start") end = EmptyOperator(task_id="end") - DBTTEST_DIR = Path("/opt/dbttest") + DBT_PROJ_DIR = Path("/opt/dbttest") - p = OpenDbtAirflowProject(project_dir=DBTTEST_DIR, profiles_dir=DBTTEST_DIR, target='dev') - p.load_dbt_tasks(dag=dag, start_node=start, end_node=end) + p = OpenDbtAirflowProject(project_dir=DBT_PROJ_DIR, profiles_dir=DBT_PROJ_DIR, target='dev') + p.load_dbt_tasks(dag=dag, start_node=start, end_node=end, include_singular_tests=True, include_dbt_seeds=True)