Skip to content
This repository has been archived by the owner on May 17, 2024. It is now read-only.

Commit

Permalink
v0 of concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Sung Won Chung committed Nov 13, 2023
1 parent c409c81 commit e6fac1e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
16 changes: 12 additions & 4 deletions data_diff/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ def dbt_diff(
)
diff_threads.append(diff_thread)
else:
_local_diff(diff_vars, json_output)
diff_thread = run_as_daemon(_local_diff, diff_vars, json_output, log_status_handler)
diff_threads.append(diff_thread)
else:
if json_output:
print(
Expand Down Expand Up @@ -265,7 +266,11 @@ def _get_prod_path_from_manifest(model, prod_manifest) -> Union[Tuple[str, str,
return prod_database, prod_schema, prod_alias


def _local_diff(diff_vars: TDiffVars, json_output: bool = False) -> None:
def _local_diff(
diff_vars: TDiffVars, json_output: bool = False, log_status_handler: Optional[LogStatusHandler] = None
) -> None:
if log_status_handler:
log_status_handler.diff_started(diff_vars.dev_path[-1])
dev_qualified_str = ".".join(diff_vars.dev_path)
prod_qualified_str = ".".join(diff_vars.prod_path)
diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str)
Expand Down Expand Up @@ -373,6 +378,9 @@ def _local_diff(diff_vars: TDiffVars, json_output: bool = False) -> None:
diff_output_str += no_differences_template()
rich.print(diff_output_str)

if log_status_handler:
log_status_handler.diff_finished(diff_vars.dev_path[-1])


def _initialize_api() -> Optional[DatafoldAPI]:
datafold_host = os.environ.get("DATAFOLD_HOST")
Expand Down Expand Up @@ -406,7 +414,7 @@ def _cloud_diff(
log_status_handler: Optional[LogStatusHandler] = None,
) -> None:
if log_status_handler:
log_status_handler.cloud_diff_started(diff_vars.dev_path[-1])
log_status_handler.diff_started(diff_vars.dev_path[-1])
diff_output_str = _diff_output_base(".".join(diff_vars.dev_path), ".".join(diff_vars.prod_path))
payload = TCloudApiDataDiff(
data_source1_id=datasource_id,
Expand Down Expand Up @@ -476,7 +484,7 @@ def _cloud_diff(
rich.print(diff_output_str)

if log_status_handler:
log_status_handler.cloud_diff_finished(diff_vars.dev_path[-1])
log_status_handler.diff_finished(diff_vars.dev_path[-1])
except BaseException as ex: # Catch KeyboardInterrupt too
error = ex
finally:
Expand Down
28 changes: 14 additions & 14 deletions data_diff/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,31 +485,31 @@ def __init__(self):
super().__init__()
self.status = Status("")
self.prefix = ""
self.cloud_diff_status = {}
self.diff_status = {}

def emit(self, record):
log_entry = self.format(record)
if self.cloud_diff_status:
self._update_cloud_status(log_entry)
if self.diff_status:
self._update_diff_status(log_entry)
else:
self.status.update(self.prefix + log_entry)

def set_prefix(self, prefix_string):
self.prefix = prefix_string

def cloud_diff_started(self, model_name):
self.cloud_diff_status[model_name] = "[yellow]In Progress[/]"
self._update_cloud_status()
def diff_started(self, model_name):
self.diff_status[model_name] = "[yellow]In Progress[/]"
self._update_diff_status()

def cloud_diff_finished(self, model_name):
self.cloud_diff_status[model_name] = "[green]Finished [/]"
self._update_cloud_status()
def diff_finished(self, model_name):
self.diff_status[model_name] = "[green]Finished [/]"
self._update_diff_status()

def _update_cloud_status(self, log=None):
cloud_status_string = "\n"
for model_name, status in self.cloud_diff_status.items():
cloud_status_string += f"{status} {model_name}\n"
self.status.update(f"{cloud_status_string}{log or ''}")
def _update_diff_status(self, log=None):
status_string = "\n"
for model_name, status in self.diff_status.items():
status_string += f"{status} {model_name}\n"
self.status.update(f"{status_string}{log or ''}")


class UnknownMeta(type):
Expand Down

0 comments on commit e6fac1e

Please sign in to comment.