Skip to content

Commit

Permalink
add inlets and outlets to db_to_db_operator (#193)
Browse files Browse the repository at this point in the history
* add inlets and outlets to db_to_db_operator

* add py lib openmetadata-ingestion

---------

Co-authored-by: vitor <[email protected]>
  • Loading branch information
vitorbellini and vitor authored Jul 12, 2024
1 parent a1457ad commit 543f271
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 27 deletions.
88 changes: 63 additions & 25 deletions fastetl/operators/db_to_db_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,50 +82,88 @@
TypeError: If `source` or `destination` is not a dictionary.
"""

import random
from datetime import datetime
from typing import Dict

from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity

from fastetl.hooks.db_to_db_hook import DbToDbHook


class DbToDbOperator(BaseOperator):
template_fields = ['source']
template_fields = ["source"]

@apply_defaults  # no-op on Airflow >= 2.0, kept for backward compatibility
def __init__(
    self,
    source: Dict[str, str],
    destination: Dict[str, str],
    columns_to_ignore: list = None,
    destination_truncate: bool = True,
    chunksize: int = 1000,
    copy_table_comments: bool = False,
    is_incremental: bool = False,
    table: str = None,
    date_column: str = None,
    key_column: str = None,
    since_datetime: datetime = None,
    sync_exclusions: bool = False,
    *args,
    **kwargs,
) -> None:
    """Configure a database-to-database copy task.

    Args:
        source: connection info for the origin table. Recognized keys:
            ``conn_id``, ``schema`` (or legacy ``schema_name``), ``table``,
            ``query`` and, optionally, ``om_service`` to register an
            OpenMetadata lineage inlet.
        destination: connection info for the target table. Recognized keys:
            ``conn_id``, ``schema`` (or legacy ``schema_name``), ``table``
            and, optionally, ``om_service`` for a lineage outlet.
        columns_to_ignore: column names to skip during the copy.
        destination_truncate: truncate the destination before loading.
        chunksize: number of rows per copy batch.
        copy_table_comments: replicate table/column comments.
        is_incremental: use incremental copy instead of full replace.
        table: table name used by the incremental strategy.
        date_column: datetime column driving incremental loads.
        key_column: primary-key column driving incremental loads.
        since_datetime: lower bound for the incremental window.
        sync_exclusions: also delete rows removed at the source.
    """
    super().__init__(*args, **kwargs)
    self.columns_to_ignore = columns_to_ignore
    self.destination_truncate = destination_truncate
    self.chunksize = chunksize
    self.copy_table_comments = copy_table_comments
    self.is_incremental = is_incremental
    self.table = table
    self.date_column = date_column
    self.key_column = key_column
    self.since_datetime = since_datetime
    self.sync_exclusions = sync_exclusions
    # Work on shallow copies so the caller's dicts (possibly shared
    # between tasks or templated) are never mutated.
    source = dict(source)
    destination = dict(destination)
    # Normalize the legacy "schema_name" key BEFORE building lineage:
    # _get_fqn reads data["schema"], so renaming afterwards (as the old
    # code did) raised KeyError for callers using "schema_name".
    if source.get("schema_name", None):
        source["schema"] = source.pop("schema_name")
    if destination.get("schema_name", None):
        destination["schema"] = destination.pop("schema_name")
    # Any value shared by inlets and outlets links them as one lineage
    # edge in OpenMetadata; a random key keeps operators independent.
    key = str(random.randint(10000000, 99999999))
    if source.get("om_service", None):
        self.inlets = [OMEntity(entity=Table, fqn=self._get_fqn(source), key=key)]
    if destination.get("om_service", None):
        self.outlets = [
            OMEntity(entity=Table, fqn=self._get_fqn(destination), key=key)
        ]
    # Keep only the keys DbToDbHook accepts.
    self.source = {
        k: source[k] for k in ("conn_id", "schema", "table", "query") if k in source
    }
    self.destination = {
        k: destination[k] for k in ("conn_id", "schema", "table") if k in destination
    }

def _get_fqn(self, data):
    """Build the OpenMetadata fully qualified name for a table.

    The database component comes from the Airflow connection's ``schema``
    attribute (Airflow stores the database name there); the remaining
    components come from the ``data`` mapping, which must contain
    ``conn_id``, ``om_service``, ``schema`` and ``table``.

    Args:
        data: connection/table description dict (not modified).

    Returns:
        str: ``"<om_service>.<database>.<schema>.<table>"``.
    """
    # Use a local instead of writing data["database"] back into the
    # caller's dict — the old scratch key was never read elsewhere.
    database = BaseHook.get_connection(data["conn_id"]).schema
    return f'{data["om_service"]}.{database}.{data["schema"]}.{data["table"]}'

def execute(self, context):
hook = DbToDbHook(
source=self.source,
destination=self.destination,
)
)

if self.is_incremental:
hook.incremental_copy(
Expand All @@ -142,5 +180,5 @@ def execute(self, context):
columns_to_ignore=self.columns_to_ignore,
destination_truncate=self.destination_truncate,
chunksize=self.chunksize,
copy_table_comments=self.copy_table_comments
copy_table_comments=self.copy_table_comments,
)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ psycopg2==2.9.5
pygsheets==2.0.5
google-api-python-client==2.102.0
google-auth==2.23.2
Markdown==3.6
Markdown==3.6
openmetadata-ingestion==1.4.1.0
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"pytz>=2022.6",
"requests>=2.28.1",
"SQLAlchemy>=1.4.44",
"PyYAML==6.0"
"PyYAML==6.0",
"openmetadata-ingestion==1.4.1.0"
],
setup_requires=["setuptools", "wheel"],
author="Time de Dados CGINF",
Expand Down

0 comments on commit 543f271

Please sign in to comment.