From 543f271798757472cca9aabe89fcdfd4ad794d34 Mon Sep 17 00:00:00 2001 From: Vitor Bellini Date: Fri, 12 Jul 2024 20:27:23 -0300 Subject: [PATCH] add inlets and outlets to db_to_db_operator (#193) * add inlets and outlets to db_to_db_operator * add py lib openmetadata-ingestion --------- Co-authored-by: vitor --- fastetl/operators/db_to_db_operator.py | 88 ++++++++++++++++++-------- requirements.txt | 3 +- setup.py | 3 +- 3 files changed, 67 insertions(+), 27 deletions(-) diff --git a/fastetl/operators/db_to_db_operator.py b/fastetl/operators/db_to_db_operator.py index 037726d..5e1c470 100644 --- a/fastetl/operators/db_to_db_operator.py +++ b/fastetl/operators/db_to_db_operator.py @@ -82,50 +82,88 @@ TypeError: If `source` or `destination` is not a dictionary. """ +import random from datetime import datetime from typing import Dict +from airflow.hooks.base import BaseHook from airflow.models.baseoperator import BaseOperator +from airflow.utils.decorators import apply_defaults +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity from fastetl.hooks.db_to_db_hook import DbToDbHook + class DbToDbOperator(BaseOperator): - template_fields = ['source'] + template_fields = ["source"] + @apply_defaults def __init__( - self, - source: Dict[str, str], - destination: Dict[str, str], - columns_to_ignore: list = None, - destination_truncate: bool = True, - chunksize: int = 1000, - copy_table_comments: bool = False, - is_incremental: bool = False, - table: str = None, - date_column: str = None, - key_column: str = None, - since_datetime: datetime = None, - sync_exclusions: bool = False, - *args, **kwargs) -> None: + self, + source: Dict[str, str], + destination: Dict[str, str], + columns_to_ignore: list = None, + destination_truncate: bool = True, + chunksize: int = 1000, + copy_table_comments: bool = False, + is_incremental: bool = False, + table: str = None, + date_column: str = None, + key_column: str = None, + since_datetime: datetime = None, + sync_exclusions: bool = False, + *args, + **kwargs, + ) -> None: super().__init__(*args, **kwargs) - self.source = source - self.destination = destination self.columns_to_ignore = columns_to_ignore self.destination_truncate = destination_truncate self.chunksize = chunksize self.copy_table_comments = copy_table_comments - self.is_incremental=is_incremental - self.table=table - self.date_column=date_column - self.key_column=key_column - self.since_datetime=since_datetime - self.sync_exclusions=sync_exclusions + self.is_incremental = is_incremental + self.table = table + self.date_column = date_column + self.key_column = key_column + self.since_datetime = since_datetime + self.sync_exclusions = sync_exclusions + # any value that needs to be the same for inlets and outlets + key = str(random.randint(10000000, 99999999)) + if source.get("om_service", None): + self.inlets = [OMEntity(entity=Table, fqn=self._get_fqn(source), key=key)] + if destination.get("om_service", None): + self.outlets = [ + OMEntity(entity=Table, fqn=self._get_fqn(destination), key=key) + ] + # rename if schema_name is present + if source.get("schema_name", None): + source["schema"] = source.pop("schema_name") + if destination.get("schema_name", None): + destination["schema"] = destination.pop("schema_name") + # filter to keys accepted by DbToDbHook + self.source = { + key: source[key] + for key in ["conn_id", "schema", "table", "query"] + if key in source + } + self.destination = { + key: destination[key] + for key in ["conn_id", "schema", "table"] + if key in destination + } + + def _get_fqn(self, data): + data["database"] = BaseHook.get_connection(data["conn_id"]).schema + fqn = ( + f'{data["om_service"]}.{data["database"]}.{data["schema"]}.{data["table"]}' + ) + return fqn def execute(self, context): hook = DbToDbHook( source=self.source, destination=self.destination, - ) + ) if self.is_incremental: hook.incremental_copy( @@ -142,5 +180,5 @@ def execute(self, context): columns_to_ignore=self.columns_to_ignore, destination_truncate=self.destination_truncate, chunksize=self.chunksize, - copy_table_comments=self.copy_table_comments + copy_table_comments=self.copy_table_comments, ) diff --git a/requirements.txt b/requirements.txt index e32ea93..9cdcedf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ psycopg2==2.9.5 pygsheets==2.0.5 google-api-python-client==2.102.0 google-auth==2.23.2 -Markdown==3.6 \ No newline at end of file +Markdown==3.6 +openmetadata-ingestion==1.4.1.0 \ No newline at end of file diff --git a/setup.py b/setup.py index caab54b..2013a79 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,8 @@ "pytz>=2022.6", "requests>=2.28.1", "SQLAlchemy>=1.4.44", - "PyYAML==6.0" + "PyYAML==6.0", + "openmetadata-ingestion==1.4.1.0" ], setup_requires=["setuptools", "wheel"], author="Time de Dados CGINF",