From 6ef99cb65d218a389d703812076db38d6aec7ef6 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 5 Jun 2023 17:59:48 -0700 Subject: [PATCH 01/19] repartition by given filter value, splitting the files into two groups --- .../compactor/model/repartition_result.py | 7 ++ .../compute/compactor/steps/repartition.py | 109 ++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 deltacat/compute/compactor/model/repartition_result.py create mode 100644 deltacat/compute/compactor/steps/repartition.py diff --git a/deltacat/compute/compactor/model/repartition_result.py b/deltacat/compute/compactor/model/repartition_result.py new file mode 100644 index 00000000..4668b4f4 --- /dev/null +++ b/deltacat/compute/compactor/model/repartition_result.py @@ -0,0 +1,7 @@ +from typing import NamedTuple +from deltacat.compute.compactor import DeltaAnnotated + + +class RePartitionResult(NamedTuple): + cold_delta: DeltaAnnotated + hot_delta: DeltaAnnotated diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py new file mode 100644 index 00000000..05b00043 --- /dev/null +++ b/deltacat/compute/compactor/steps/repartition.py @@ -0,0 +1,109 @@ +import importlib +import logging +from contextlib import nullcontext +import pyarrow.compute as pc +import pyarrow as pa +from typing import List, Optional, Any +from deltacat.types.media import StorageType, ContentType +import ray +from deltacat import logs +from deltacat.compute.compactor import DeltaAnnotated +from deltacat.compute.compactor.model.repartition_result import RePartitionResult +from deltacat.storage import interface as unimplemented_deltacat_storage +from deltacat.storage import Partition +from deltacat.utils.ray_utils.runtime import ( + get_current_ray_task_id, + get_current_ray_worker_id, +) +from deltacat.utils.common import ReadKwargsProvider +from deltacat.utils.performance import timed_invocation +from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig + +if importlib.util.find_spec("memray"): + import memray + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def _timed_repartition( + annotated_delta: DeltaAnnotated, + column: str, + filter_value: Any, + detination_partition: Partition, + enable_profiler: bool, + read_kwargs_provider: Optional[ReadKwargsProvider], + deltacat_storage=unimplemented_deltacat_storage, +) -> RePartitionResult: + + task_id = get_current_ray_task_id() + worker_id = get_current_ray_worker_id() + with memray.Tracker( + f"repartition_{worker_id}_{task_id}.bin" + ) if enable_profiler else nullcontext(): + tables: List[pa.Table] = deltacat_storage.download_delta( + annotated_delta, + storage_type=StorageType.LOCAL, + file_reader_kwargs_provider=read_kwargs_provider, + ) + cold_tables = [] + hot_tables = [] + total_record_count = 0 + for table in tables: + total_record_count += len(table) + cold = table.filter( + (pc.cast(table[column], pa.int64()) <= pc.scalar(filter_value)) + ) + cold_tables.append(cold) + hot = table.filter( + (pc.cast(table[column], pa.int64()) > pc.scalar(filter_value)) + ) + hot_tables.append(hot) + # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files + cold_table = pa.concat_tables(cold_tables) + hot_table = pa.concat_tables(hot_tables) + cold_delta = deltacat_storage.stage_delta( + cold_table, + detination_partition, + content_type=ContentType.PARQUET, + ) + hot_delta = deltacat_storage.stage_delta( + hot_table, + 
detination_partition, + content_type=ContentType.PARQUET, + ) + assert ( + len(cold_table) + len(hot_table) == total_record_count + ), "Repartitioned table should have the same number of records as the original table" + return RePartitionResult( + cold_delta=cold_delta, + hot_delta=hot_delta, + ) + + +@ray.remote +def repartition( + annotated_delta: DeltaAnnotated, + column: str, + filter_value: Any, + detination_partition: Partition, + enable_profiler: bool, + metrics_config: MetricsConfig, + read_kwargs_provider: Optional[ReadKwargsProvider], + deltacat_storage=unimplemented_deltacat_storage, +) -> RePartitionResult: + logger.info(f"Starting repartition task...") + repartition_result, duration = timed_invocation( + func=_timed_repartition, + annotated_delta=annotated_delta, + column=column, + filter_value=filter_value, + detination_partition=detination_partition, + enable_profiler=enable_profiler, + read_kwargs_provider=read_kwargs_provider, + deltacat_storage=deltacat_storage, + ) + if metrics_config: + emit_timer_metrics( + metrics_name="repartition", value=duration, metrics_config=metrics_config + ) + return repartition_result From 820dc535665e1587f31f94e1858314fb3107d42a Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 6 Jun 2023 10:35:57 -0700 Subject: [PATCH 02/19] fix typo in function signature --- deltacat/compute/compactor/steps/repartition.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 05b00043..61ba717a 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -29,7 +29,7 @@ def _timed_repartition( annotated_delta: DeltaAnnotated, column: str, filter_value: Any, - detination_partition: Partition, + destination_partition: Partition, enable_profiler: bool, read_kwargs_provider: Optional[ReadKwargsProvider], deltacat_storage=unimplemented_deltacat_storage, @@ -63,12 +63,12 @@ def _timed_repartition( hot_table = pa.concat_tables(hot_tables) cold_delta = deltacat_storage.stage_delta( cold_table, - detination_partition, + destination_partition, content_type=ContentType.PARQUET, ) hot_delta = deltacat_storage.stage_delta( hot_table, - detination_partition, + destination_partition, content_type=ContentType.PARQUET, ) assert ( @@ -85,7 +85,7 @@ def repartition( annotated_delta: DeltaAnnotated, column: str, filter_value: Any, - detination_partition: Partition, + destination_partition: Partition, enable_profiler: bool, metrics_config: MetricsConfig, read_kwargs_provider: Optional[ReadKwargsProvider], @@ -97,7 +97,7 @@ def repartition( annotated_delta=annotated_delta, column=column, filter_value=filter_value, - detination_partition=detination_partition, + destination_partition=destination_partition, enable_profiler=enable_profiler, read_kwargs_provider=read_kwargs_provider, deltacat_storage=deltacat_storage, From 64375870a447881baedd99196054097d95a8a825 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 6 Jun 2023 19:29:53 -0700 Subject: [PATCH 03/19] add repartition session --- .../compactor/model/repartition_result.py | 5 +- .../compute/compactor/steps/repartition.py | 117 +++++++----- .../compactor/utils/repartition_session.py | 169 ++++++++++++++++++ 3 files changed, 241 insertions(+), 50 deletions(-) create mode 100644 deltacat/compute/compactor/utils/repartition_session.py diff --git a/deltacat/compute/compactor/model/repartition_result.py 
b/deltacat/compute/compactor/model/repartition_result.py index 4668b4f4..c5be13c8 100644 --- a/deltacat/compute/compactor/model/repartition_result.py +++ b/deltacat/compute/compactor/model/repartition_result.py @@ -1,7 +1,6 @@ -from typing import NamedTuple +from typing import NamedTuple, List from deltacat.compute.compactor import DeltaAnnotated class RePartitionResult(NamedTuple): - cold_delta: DeltaAnnotated - hot_delta: DeltaAnnotated + range_deltas: List[DeltaAnnotated] diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 61ba717a..d9b778ea 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -3,7 +3,7 @@ from contextlib import nullcontext import pyarrow.compute as pc import pyarrow as pa -from typing import List, Optional, Any +from typing import List, Optional from deltacat.types.media import StorageType, ContentType import ray from deltacat import logs @@ -27,65 +27,88 @@ def _timed_repartition( annotated_delta: DeltaAnnotated, - column: str, - filter_value: Any, + repartition_type: str, + repartition_args: dict, destination_partition: Partition, enable_profiler: bool, read_kwargs_provider: Optional[ReadKwargsProvider], deltacat_storage=unimplemented_deltacat_storage, ) -> RePartitionResult: - task_id = get_current_ray_task_id() - worker_id = get_current_ray_worker_id() - with memray.Tracker( - f"repartition_{worker_id}_{task_id}.bin" - ) if enable_profiler else nullcontext(): - tables: List[pa.Table] = deltacat_storage.download_delta( - annotated_delta, - storage_type=StorageType.LOCAL, - file_reader_kwargs_provider=read_kwargs_provider, - ) - cold_tables = [] - hot_tables = [] - total_record_count = 0 - for table in tables: - total_record_count += len(table) - cold = table.filter( - (pc.cast(table[column], pa.int64()) <= pc.scalar(filter_value)) + if repartition_type == "range": + # A delta that is partitioned by range is partitioned in such a way + # that each partition contains rows for which the partitioning expression value lies within a given range for a specified column. + # For example, if the partitioning column is "date", the partition ranges are "2020-01-01" to "2020-01-10" and "2020-01-11" to "2020-01-20". + # The partitioned delta will have two partitions, one for each range. 
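+        # Example (hypothetical values): repartition_args = {"column": "last_updated", "ranges": [1672531200000]}
+        # would split each input table into rows at or below that boundary and rows above it.
+        # Because the column is cast to int64 before comparison, the range boundary values are assumed to be numeric.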
+ column: str = repartition_args["column"] + partition_ranges: List = repartition_args["ranges"] + task_id = get_current_ray_task_id() + worker_id = get_current_ray_worker_id() + with memray.Tracker( + f"repartition_{worker_id}_{task_id}.bin" + ) if enable_profiler else nullcontext(): + tables: List[pa.Table] = deltacat_storage.download_delta( + annotated_delta, + storage_type=StorageType.LOCAL, + file_reader_kwargs_provider=read_kwargs_provider, ) - cold_tables.append(cold) - hot = table.filter( - (pc.cast(table[column], pa.int64()) > pc.scalar(filter_value)) + partitioned_tables = {i: [] for i in range(len(partition_ranges) + 1)} + total_record_count = 0 + col_name_int64 = f"{column}_int64" + for table in tables: + total_record_count += len(table) + table_new = table.add_column( + 0, + pa.field(col_name_int64, pa.int64()), + pc.cast(table[column], pa.int64()), + ) + for filter_value in partition_ranges: + partitioned_tables[partition_ranges.index(filter_value)].append( + table_new.filter( + (pc.field(col_name_int64) <= pc.scalar(filter_value)) + ) + ) + if filter_value == partition_ranges[-1]: + partitioned_tables[partition_ranges.index(filter_value)].append( + table_new.filter( + (pc.field(col_name_int64) > pc.scalar(filter_value)) + ) + ) + # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files + # iterate the items in partitioned_tables, concat all tables for the same key + # and then write the result to the storage + range_table_length = 0 + range_deltas: List[DeltaAnnotated] = [] + for _, value in partitioned_tables.items(): + if len(value) > 0: + range_table = pa.concat_tables(value) + if len(range_table) > 0: + range_table_length += len(range_table) + range_delta = deltacat_storage.stage_delta( + range_table, + destination_partition, + content_type=ContentType.PARQUET, + ) + range_deltas.append(range_delta) + + assert ( + range == total_record_count + ), "Repartitioned table should have the same number of records as the original table" + return RePartitionResult( + range_deltas=range_deltas, ) - hot_tables.append(hot) - # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files - cold_table = pa.concat_tables(cold_tables) - hot_table = pa.concat_tables(hot_tables) - cold_delta = deltacat_storage.stage_delta( - cold_table, - destination_partition, - content_type=ContentType.PARQUET, - ) - hot_delta = deltacat_storage.stage_delta( - hot_table, - destination_partition, - content_type=ContentType.PARQUET, - ) - assert ( - len(cold_table) + len(hot_table) == total_record_count - ), "Repartitioned table should have the same number of records as the original table" - return RePartitionResult( - cold_delta=cold_delta, - hot_delta=hot_delta, + else: + raise NotImplementedError( + f"Repartition type {repartition_type} is not supported." 
) @ray.remote def repartition( annotated_delta: DeltaAnnotated, - column: str, - filter_value: Any, destination_partition: Partition, + repartition_type: str, + repartition_args: dict, enable_profiler: bool, metrics_config: MetricsConfig, read_kwargs_provider: Optional[ReadKwargsProvider], @@ -95,8 +118,8 @@ def repartition( repartition_result, duration = timed_invocation( func=_timed_repartition, annotated_delta=annotated_delta, - column=column, - filter_value=filter_value, + repartition_type=repartition_type, + repartition_args=repartition_args, destination_partition=destination_partition, enable_profiler=enable_profiler, read_kwargs_provider=read_kwargs_provider, diff --git a/deltacat/compute/compactor/utils/repartition_session.py b/deltacat/compute/compactor/utils/repartition_session.py new file mode 100644 index 00000000..8f5c8ab9 --- /dev/null +++ b/deltacat/compute/compactor/utils/repartition_session.py @@ -0,0 +1,169 @@ +import ray +import time +import logging +from deltacat import logs +from deltacat.utils.common import ReadKwargsProvider +import sungate as sg +import functools +import itertools +from deltacat.compute.compactor import ( + RoundCompletionInfo, + SortKey, +) +from deltacat.compute.compactor import DeltaAnnotated +from deltacat.utils.ray_utils.concurrency import ( + invoke_parallel, + round_robin_options_provider, +) +from deltacat.compute.compactor.model.repartition_result import RePartitionResult +from deltacat.utils.placement import PlacementGroupConfig +from typing import List, Optional, Dict, Any +from deltacat.utils.ray_utils.runtime import live_node_resource_keys +from deltacat.compute.compactor.utils import io +from deltacat.compute.compactor.utils import round_completion_file as rcf +from deltacat.compute.compactor.steps import repartition as repar +from deltacat.storage import ( + Delta, + DeltaLocator, + PartitionLocator, +) + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) +deltacat_storage = sg.andes + + +def repartition( + source_partition_locator: PartitionLocator, + destination_partition_locator: PartitionLocator, + repartition_type: str, + repartition_args: Any, + compaction_artifact_s3_bucket: str, + last_stream_position_to_compact: int, + sort_keys: List[SortKey] = None, + pg_config: Optional[PlacementGroupConfig] = None, + list_deltas_kwargs: Optional[Dict[str, Any]] = None, + read_kwargs_provider: Optional[ReadKwargsProvider] = None, + **kwargs, +) -> Optional[str]: + + node_resource_keys = None + if pg_config: # use resource in each placement group + cluster_resources = pg_config.resource + cluster_cpus = cluster_resources["CPU"] + else: # use all cluster resource + cluster_resources = ray.cluster_resources() + logger.info(f"Total cluster resources: {cluster_resources}") + logger.info(f"Available cluster resources: {ray.available_resources()}") + cluster_cpus = int(cluster_resources["CPU"]) + logger.info(f"Total cluster CPUs: {cluster_cpus}") + node_resource_keys = live_node_resource_keys() + logger.info( + f"Found {len(node_resource_keys)} live cluster nodes: " + f"{node_resource_keys}" + ) + + # create a remote options provider to round-robin tasks across all nodes or allocated bundles + logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}") + round_robin_opt_provider = functools.partial( + round_robin_options_provider, + resource_keys=node_resource_keys, + pg_config=pg_config.opts if pg_config else None, + ) + + (deltas, _,) = io.discover_deltas( + source_partition_locator, + None, + 
last_stream_position_to_compact, + None, + None, + None, + deltacat_storage, + **list_deltas_kwargs, + ) + + uniform_deltas = [] + for delta in deltas: + # limit the input deltas to fit on this cluster and convert them to + # annotated deltas of equivalent size for easy parallel distribution + uniform_deltas_part = DeltaAnnotated.rebatch( + [DeltaAnnotated.of(delta)], + 200 * 2**20, # 200 MiB compressed data per task, + min_file_counts=1000, + ) + uniform_deltas.extend(uniform_deltas_part) + + logger.info(f"Retrieved a total of {len(uniform_deltas)} uniform deltas.") + + max_parallelism = cluster_cpus + # create a new stream for this round + compacted_stream_locator = destination_partition_locator.stream_locator + stream = deltacat_storage.get_stream( + compacted_stream_locator.namespace, + compacted_stream_locator.table_name, + compacted_stream_locator.table_version, + ) + partition = deltacat_storage.stage_partition( + stream, + destination_partition_locator.partition_values, + ) + new_compacted_partition_locator = partition.locator + repar_start = time.time() + repar_tasks_pending = invoke_parallel( + items=uniform_deltas, + ray_task=repar.repartition, + max_parallelism=max_parallelism, + options_provider=round_robin_opt_provider, + repartition_type=repartition_type, + repartition_args=repartition_args, + destination_partition=partition, + enable_profiler=False, + metrics_config=None, + read_kwargs_provider=read_kwargs_provider, + deltacat_storage=deltacat_storage, + ) + logger.info(f"Getting {len(repar_tasks_pending)} task results...") + repar_results: List[RePartitionResult] = ray.get(repar_tasks_pending) + # Transpose the list, filling in with None for shorter lists + transposed = list(itertools.zip_longest(*repar_results, fillvalue=None)) + # Flatten the list and remove None values + ordered_deltas: List[DeltaAnnotated] = [ + i for sublist in transposed for i in sublist if i is not None + ] + repar_end = time.time() + print(f"repartition {repar_end - repar_start} seconds") + + logger.info(f"Got {len(ordered_deltas)} task results.") + # ordered_deltas are ordered as [cold1, cold2, coldN, hot1, hot2, hotN] + merged_delta = Delta.merge_deltas(ordered_deltas) + compacted_delta = deltacat_storage.commit_delta( + merged_delta, properties=kwargs.get("properties", {}) + ) + deltacat_storage.commit_partition(partition) + logger.info(f"Committed final delta: {compacted_delta}") + print(f"Job run completed successfully!") + logger.info(f"Job run completed successfully!") + + new_compacted_delta_locator = DeltaLocator.of( + new_compacted_partition_locator, + compacted_delta.stream_position, + ) + bit_width_of_sort_keys = SortKey.validate_sort_keys( + source_partition_locator, + sort_keys, + deltacat_storage, + ) + new_round_completion_info = RoundCompletionInfo.of( + last_stream_position_to_compact, + new_compacted_delta_locator, + None, + bit_width_of_sort_keys, + None, + ) + rcf_source_partition_locator = source_partition_locator + round_completion_file_s3_url = None + round_completion_file_s3_url = rcf.write_round_completion_file( + compaction_artifact_s3_bucket, + rcf_source_partition_locator, + new_round_completion_info, + ) + return round_completion_file_s3_url From 5ea52cea01038b0280ad6dcbbdb77152c47c8374 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 12:22:16 -0700 Subject: [PATCH 04/19] add rebase partition locator to be compatible with discover deltas --- deltacat/compute/compactor/utils/repartition_session.py | 8 +++++--- 1 file changed, 5 insertions(+), 3
deletions(-) diff --git a/deltacat/compute/compactor/utils/repartition_session.py b/deltacat/compute/compactor/utils/repartition_session.py index 8f5c8ab9..74fd0342 100644 --- a/deltacat/compute/compactor/utils/repartition_session.py +++ b/deltacat/compute/compactor/utils/repartition_session.py @@ -39,6 +39,8 @@ def repartition( repartition_args: Any, compaction_artifact_s3_bucket: str, last_stream_position_to_compact: int, + rebase_source_partition_locator: Optional[PartitionLocator] = None, + rebase_source_partition_high_watermark: Optional[int] = None, sort_keys: List[SortKey] = None, pg_config: Optional[PlacementGroupConfig] = None, list_deltas_kwargs: Optional[Dict[str, Any]] = None, @@ -74,9 +76,9 @@ def repartition( source_partition_locator, None, last_stream_position_to_compact, - None, - None, - None, + destination_partition_locator, + rebase_source_partition_locator, + rebase_source_partition_high_watermark, deltacat_storage, **list_deltas_kwargs, ) From 5d901d8d825616232c5cea57b644178dff59c7cd Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 12:28:49 -0700 Subject: [PATCH 05/19] fix bug in record length assertion during repartition --- deltacat/compute/compactor/steps/repartition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index d9b778ea..af864f2d 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -92,7 +92,7 @@ def _timed_repartition( range_deltas.append(range_delta) assert ( - range == total_record_count + range_table_length == total_record_count ), "Repartitioned table should have the same number of records as the original table" return RePartitionResult( range_deltas=range_deltas, From 81ad66e3f046fa7797af43dff4cc49d38ea588fa Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 14:51:03 -0700 Subject: [PATCH 06/19] fix bug in getting repar result --- deltacat/compute/compactor/model/repartition_result.py | 4 ++-- deltacat/compute/compactor/steps/repartition.py | 5 +++-- deltacat/compute/compactor/utils/repartition_session.py | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/deltacat/compute/compactor/model/repartition_result.py b/deltacat/compute/compactor/model/repartition_result.py index c5be13c8..96ca233d 100644 --- a/deltacat/compute/compactor/model/repartition_result.py +++ b/deltacat/compute/compactor/model/repartition_result.py @@ -1,6 +1,6 @@ from typing import NamedTuple, List -from deltacat.compute.compactor import DeltaAnnotated +from deltacat.storage import Delta class RePartitionResult(NamedTuple): - range_deltas: List[DeltaAnnotated] + range_deltas: List[Delta] diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index af864f2d..9c20f416 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -18,6 +18,7 @@ from deltacat.utils.common import ReadKwargsProvider from deltacat.utils.performance import timed_invocation from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig +from deltacat.storage import Delta if importlib.util.find_spec("memray"): import memray @@ -78,7 +79,7 @@ def _timed_repartition( # iterate the items in partitioned_tables, concat all tables for the same key # and then write the result to the storage range_table_length = 0 - range_deltas: List[DeltaAnnotated] = [] + range_deltas: 
List[Delta] = [] for _, value in partitioned_tables.items(): if len(value) > 0: range_table = pa.concat_tables(value) @@ -93,7 +94,7 @@ def _timed_repartition( assert ( range_table_length == total_record_count - ), "Repartitioned table should have the same number of records as the original table" + ), f"Repartitioned table should have the same number of records {range_table_length} as the original table {total_record_count}" return RePartitionResult( range_deltas=range_deltas, ) diff --git a/deltacat/compute/compactor/utils/repartition_session.py b/deltacat/compute/compactor/utils/repartition_session.py index 74fd0342..bb3aa980 100644 --- a/deltacat/compute/compactor/utils/repartition_session.py +++ b/deltacat/compute/compactor/utils/repartition_session.py @@ -125,14 +125,15 @@ def repartition( ) logger.info(f"Getting {len(repar_tasks_pending)} task results...") repar_results: List[RePartitionResult] = ray.get(repar_tasks_pending) + repar_results: List[Delta] = [rp.range_deltas for rp in repar_results] # Transpose the list, filling in with None for shorter lists transposed = list(itertools.zip_longest(*repar_results, fillvalue=None)) # Flatten the list and remove None values - ordered_deltas: List[DeltaAnnotated] = [ + ordered_deltas: List[Delta] = [ i for sublist in transposed for i in sublist if i is not None ] repar_end = time.time() - print(f"repartition {repar_end - repar_start} seconds") + logger.info(f"repartition {repar_end - repar_start} seconds") logger.info(f"Got {len(ordered_deltas)} task results.") # ordered_deltas are ordered as [cold1, cold2, coldN, hot1, hot2, hotN] @@ -142,7 +143,6 @@ def repartition( ) deltacat_storage.commit_partition(partition) logger.info(f"Committed final delta: {compacted_delta}") - print(f"Job run completed successfully!") logger.info(f"Job run completed successfully!") new_compacted_delta_locator = DeltaLocator.of( From 6ea322785c236c3448461de46a7195999d241453 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 15:01:23 -0700 Subject: [PATCH 07/19] remove print, finalize repartition interface --- deltacat/compute/compactor/utils/repartition_session.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/deltacat/compute/compactor/utils/repartition_session.py b/deltacat/compute/compactor/utils/repartition_session.py index bb3aa980..0e84c4b0 100644 --- a/deltacat/compute/compactor/utils/repartition_session.py +++ b/deltacat/compute/compactor/utils/repartition_session.py @@ -3,7 +3,6 @@ import logging from deltacat import logs from deltacat.utils.common import ReadKwargsProvider -import sungate as sg import functools import itertools from deltacat.compute.compactor import ( @@ -26,10 +25,10 @@ Delta, DeltaLocator, PartitionLocator, + interface as unimplemented_deltacat_storage, ) logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) -deltacat_storage = sg.andes def repartition( @@ -45,6 +44,7 @@ def repartition( pg_config: Optional[PlacementGroupConfig] = None, list_deltas_kwargs: Optional[Dict[str, Any]] = None, read_kwargs_provider: Optional[ReadKwargsProvider] = None, + deltacat_storage=unimplemented_deltacat_storage, **kwargs, ) -> Optional[str]: @@ -126,9 +126,7 @@ def repartition( logger.info(f"Getting {len(repar_tasks_pending)} task results...") repar_results: List[RePartitionResult] = ray.get(repar_tasks_pending) repar_results: List[Delta] = [rp.range_deltas for rp in repar_results] - # Transpose the list, filling in with None for shorter lists transposed = 
list(itertools.zip_longest(*repar_results, fillvalue=None)) - # Flatten the list and remove None values ordered_deltas: List[Delta] = [ i for sublist in transposed for i in sublist if i is not None ] @@ -144,7 +142,6 @@ def repartition( deltacat_storage.commit_partition(partition) logger.info(f"Committed final delta: {compacted_delta}") logger.info(f"Job run completed successfully!") - new_compacted_delta_locator = DeltaLocator.of( new_compacted_partition_locator, compacted_delta.stream_position, From 574f331b64a423f120635718e9b1eea80acbc682 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 15:05:42 -0700 Subject: [PATCH 08/19] remove print, finalize repartition interface --- deltacat/compute/compactor/steps/repartition.py | 1 + 1 file changed, 1 insertion(+) diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 9c20f416..0ba7c234 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -99,6 +99,7 @@ def _timed_repartition( range_deltas=range_deltas, ) else: + # Other repartition types, e.g., hash, key, list, column, etc are not supported yet raise NotImplementedError( f"Repartition type {repartition_type} is not supported." ) From e80249459edd3af1a31295c3c242cd7b12232967 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 15:15:28 -0700 Subject: [PATCH 09/19] remove comment --- deltacat/compute/compactor/steps/repartition.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 0ba7c234..1ec4f118 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -76,8 +76,6 @@ def _timed_repartition( ) ) # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files - # iterate the items in partitioned_tables, concat all tables for the same key - # and then write the result to the storage range_table_length = 0 range_deltas: List[Delta] = [] for _, value in partitioned_tables.items(): From 434e4b8193079bf3f5edede7eee501218af8bd38 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 18:02:57 -0700 Subject: [PATCH 10/19] address comments --- .../compactor/model/repartition_result.py | 2 +- .../compute/compactor/steps/repartition.py | 65 +++++-- .../compactor/utils/repartition_session.py | 168 ------------------ 3 files changed, 47 insertions(+), 188 deletions(-) delete mode 100644 deltacat/compute/compactor/utils/repartition_session.py diff --git a/deltacat/compute/compactor/model/repartition_result.py b/deltacat/compute/compactor/model/repartition_result.py index 96ca233d..532cfe58 100644 --- a/deltacat/compute/compactor/model/repartition_result.py +++ b/deltacat/compute/compactor/model/repartition_result.py @@ -2,5 +2,5 @@ from deltacat.storage import Delta -class RePartitionResult(NamedTuple): +class RepartitionResult(NamedTuple): range_deltas: List[Delta] diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 1ec4f118..5697dac9 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -8,7 +8,7 @@ import ray from deltacat import logs from deltacat.compute.compactor import DeltaAnnotated -from deltacat.compute.compactor.model.repartition_result import RePartitionResult +from 
deltacat.compute.compactor.model.repartition_result import RepartitionResult from deltacat.storage import interface as unimplemented_deltacat_storage from deltacat.storage import Partition from deltacat.utils.ray_utils.runtime import ( @@ -19,6 +19,7 @@ from deltacat.utils.performance import timed_invocation from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig from deltacat.storage import Delta +from enum import Enum if importlib.util.find_spec("memray"): import memray @@ -26,17 +27,24 @@ logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) +class RepartitionType(str, Enum): + RANGE = "range" + HASH = "hash" + COLUMN = "none" + + def _timed_repartition( annotated_delta: DeltaAnnotated, - repartition_type: str, + repartition_type: RepartitionType, repartition_args: dict, destination_partition: Partition, enable_profiler: bool, read_kwargs_provider: Optional[ReadKwargsProvider], + repartitioned_file_content_type: ContentType = ContentType.PARQUET, deltacat_storage=unimplemented_deltacat_storage, -) -> RePartitionResult: +) -> RepartitionResult: - if repartition_type == "range": + if repartition_type == RepartitionType.RANGE: # A delta that is partitioned by range is partitioned in such a way # that each partition contains rows for which the partitioning expression value lies within a given range for a specified column. # For example, if the partitioning column is "date", the partition ranges are "2020-01-01" to "2020-01-10" and "2020-01-11" to "2020-01-20". @@ -53,7 +61,11 @@ def _timed_repartition( storage_type=StorageType.LOCAL, file_reader_kwargs_provider=read_kwargs_provider, ) - partitioned_tables = {i: [] for i in range(len(partition_ranges) + 1)} + # check if the column exists in the table + if not all(column in table.column_names for table in tables): + raise ValueError(f"Column {column} does not exist in the table") + + partitioned_tables = [[] for _ in range(len(partition_ranges) + 1)] total_record_count = 0 col_name_int64 = f"{column}_int64" for table in tables: @@ -63,20 +75,33 @@ def _timed_repartition( pa.field(col_name_int64, pa.int64()), pc.cast(table[column], pa.int64()), ) - for filter_value in partition_ranges: - partitioned_tables[partition_ranges.index(filter_value)].append( + # handle the partition for values less than or equal to the smallest value + partitioned_tables[0].append( + table_new.filter( + pc.field(col_name_int64) <= pc.scalar(partition_ranges[0]) + ) + ) + # Iterate over pairs of values in partition_ranges + for i, (lower_limit, upper_limit) in enumerate( + zip(partition_ranges[:-1], partition_ranges[1:]), start=1 + ): + # Add the table filtered by the lower and upper limits to partitioned_tables + partitioned_tables[i].append( table_new.filter( - (pc.field(col_name_int64) <= pc.scalar(filter_value)) + (pc.field(col_name_int64) > pc.scalar(lower_limit)) + & (pc.field(col_name_int64) <= pc.scalar(upper_limit)) ) ) - if filter_value == partition_ranges[-1]: - partitioned_tables[partition_ranges.index(filter_value)].append( - table_new.filter( - (pc.field(col_name_int64) > pc.scalar(filter_value)) - ) - ) + # handle the partition for values greater than the largest value + partitioned_tables[-1].append( + table_new.filter( + pc.field(col_name_int64) > pc.scalar(partition_ranges[-1]) + ) + ) + # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files range_table_length = 0 + # After re-grouping the tables by specified ranges, for each group, we need concat and 
stage the tables range_deltas: List[Delta] = [] for _, value in partitioned_tables.items(): if len(value) > 0: @@ -86,14 +111,14 @@ def _timed_repartition( range_delta = deltacat_storage.stage_delta( range_table, destination_partition, - content_type=ContentType.PARQUET, + content_type=repartitioned_file_content_type, ) range_deltas.append(range_delta) assert ( range_table_length == total_record_count ), f"Repartitioned table should have the same number of records {range_table_length} as the original table {total_record_count}" - return RePartitionResult( + return RepartitionResult( range_deltas=range_deltas, ) else: @@ -107,13 +132,14 @@ def _timed_repartition( def repartition( annotated_delta: DeltaAnnotated, destination_partition: Partition, - repartition_type: str, + repartition_type: RepartitionType, repartition_args: dict, enable_profiler: bool, - metrics_config: MetricsConfig, + metrics_config: Optional[MetricsConfig], read_kwargs_provider: Optional[ReadKwargsProvider], + repartitioned_file_content_type: ContentType = ContentType.PARQUET, deltacat_storage=unimplemented_deltacat_storage, -) -> RePartitionResult: +) -> RepartitionResult: logger.info(f"Starting repartition task...") repartition_result, duration = timed_invocation( func=_timed_repartition, @@ -123,6 +149,7 @@ def repartition( destination_partition=destination_partition, enable_profiler=enable_profiler, read_kwargs_provider=read_kwargs_provider, + repartitioned_file_content_type=repartitioned_file_content_type, deltacat_storage=deltacat_storage, ) if metrics_config: diff --git a/deltacat/compute/compactor/utils/repartition_session.py b/deltacat/compute/compactor/utils/repartition_session.py deleted file mode 100644 index 0e84c4b0..00000000 --- a/deltacat/compute/compactor/utils/repartition_session.py +++ /dev/null @@ -1,168 +0,0 @@ -import ray -import time -import logging -from deltacat import logs -from deltacat.utils.common import ReadKwargsProvider -import functools -import itertools -from deltacat.compute.compactor import ( - RoundCompletionInfo, - SortKey, -) -from deltacat.compute.compactor import DeltaAnnotated -from deltacat.utils.ray_utils.concurrency import ( - invoke_parallel, - round_robin_options_provider, -) -from deltacat.compute.compactor.model.repartition_result import RePartitionResult -from deltacat.utils.placement import PlacementGroupConfig -from typing import List, Optional, Dict, Any -from deltacat.utils.ray_utils.runtime import live_node_resource_keys -from deltacat.compute.compactor.utils import io -from deltacat.compute.compactor.utils import round_completion_file as rcf -from deltacat.compute.compactor.steps import repartition as repar -from deltacat.storage import ( - Delta, - DeltaLocator, - PartitionLocator, - interface as unimplemented_deltacat_storage, -) - -logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) - - -def repartition( - source_partition_locator: PartitionLocator, - destination_partition_locator: PartitionLocator, - repartition_type: str, - repartition_args: Any, - compaction_artifact_s3_bucket: str, - last_stream_position_to_compact: int, - rebase_source_partition_locator: Optional[PartitionLocator] = None, - rebase_source_partition_high_watermark: Optional[int] = None, - sort_keys: List[SortKey] = None, - pg_config: Optional[PlacementGroupConfig] = None, - list_deltas_kwargs: Optional[Dict[str, Any]] = None, - read_kwargs_provider: Optional[ReadKwargsProvider] = None, - deltacat_storage=unimplemented_deltacat_storage, - **kwargs, -) -> Optional[str]: - - 
node_resource_keys = None - if pg_config: # use resource in each placement group - cluster_resources = pg_config.resource - cluster_cpus = cluster_resources["CPU"] - else: # use all cluster resource - cluster_resources = ray.cluster_resources() - logger.info(f"Total cluster resources: {cluster_resources}") - logger.info(f"Available cluster resources: {ray.available_resources()}") - cluster_cpus = int(cluster_resources["CPU"]) - logger.info(f"Total cluster CPUs: {cluster_cpus}") - node_resource_keys = live_node_resource_keys() - logger.info( - f"Found {len(node_resource_keys)} live cluster nodes: " - f"{node_resource_keys}" - ) - - # create a remote options provider to round-robin tasks across all nodes or allocated bundles - logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}") - round_robin_opt_provider = functools.partial( - round_robin_options_provider, - resource_keys=node_resource_keys, - pg_config=pg_config.opts if pg_config else None, - ) - - (deltas, _,) = io.discover_deltas( - source_partition_locator, - None, - last_stream_position_to_compact, - destination_partition_locator, - rebase_source_partition_locator, - rebase_source_partition_high_watermark, - deltacat_storage, - **list_deltas_kwargs, - ) - - uniform_deltas = [] - for delta in deltas: - # limit the input deltas to fit on this cluster and convert them to - # annotated deltas of equivalent size for easy parallel distribution - uniform_deltas_part = DeltaAnnotated.rebatch( - [DeltaAnnotated.of(delta)], - 200 * 2**20, # 200 MiB compressed data per task, - min_file_counts=1000, - ) - uniform_deltas.extend(uniform_deltas_part) - - logger.info(f"Retrieved a total of {len(uniform_deltas)} uniform deltas.") - - max_parallelism = cluster_cpus - # create a new stream for this round - compacted_stream_locator = destination_partition_locator.stream_locator - stream = deltacat_storage.get_stream( - compacted_stream_locator.namespace, - compacted_stream_locator.table_name, - compacted_stream_locator.table_version, - ) - partition = deltacat_storage.stage_partition( - stream, - destination_partition_locator.partition_values, - ) - new_compacted_partition_locator = partition.locator - repar_start = time.time() - repar_tasks_pending = invoke_parallel( - items=uniform_deltas, - ray_task=repar.repartition, - max_parallelism=max_parallelism, - options_provider=round_robin_opt_provider, - repartition_type=repartition_type, - repartition_args=repartition_args, - destination_partition=partition, - enable_profiler=False, - metrics_config=None, - read_kwargs_provider=read_kwargs_provider, - deltacat_storage=deltacat_storage, - ) - logger.info(f"Getting {len(repar_tasks_pending)} task results...") - repar_results: List[RePartitionResult] = ray.get(repar_tasks_pending) - repar_results: List[Delta] = [rp.range_deltas for rp in repar_results] - transposed = list(itertools.zip_longest(*repar_results, fillvalue=None)) - ordered_deltas: List[Delta] = [ - i for sublist in transposed for i in sublist if i is not None - ] - repar_end = time.time() - logger.info(f"repartition {repar_end - repar_start} seconds") - - logger.info(f"Got {len(ordered_deltas)} task results.") - # ordered_deltas are ordered as [cold1, cold2, coldN, hot1, hot2, hotN] - merged_delta = Delta.merge_deltas(ordered_deltas) - compacted_delta = deltacat_storage.commit_delta( - merged_delta, properties=kwargs.get("properties", {}) - ) - deltacat_storage.commit_partition(partition) - logger.info(f"Committed final delta: {compacted_delta}") - logger.info(f"Job run 
completed successfully!") - new_compacted_delta_locator = DeltaLocator.of( - new_compacted_partition_locator, - compacted_delta.stream_position, - ) - bit_width_of_sort_keys = SortKey.validate_sort_keys( - source_partition_locator, - sort_keys, - deltacat_storage, - ) - new_round_completion_info = RoundCompletionInfo.of( - last_stream_position_to_compact, - new_compacted_delta_locator, - None, - bit_width_of_sort_keys, - None, - ) - rcf_source_partition_locator = source_partition_locator - round_completion_file_s3_url = None - round_completion_file_s3_url = rcf.write_round_completion_file( - compaction_artifact_s3_bucket, - rcf_source_partition_locator, - new_round_completion_info, - ) - return round_completion_file_s3_url From a087eb0f6977397b516df609a020331485744afc Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 7 Jun 2023 20:33:57 -0700 Subject: [PATCH 11/19] address comments, fix bug in range partition --- .../compute/compactor/steps/repartition.py | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 5697dac9..95a5d8c9 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -26,11 +26,19 @@ logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) +# Similar to Spark (https://sparkbyexamples.com/spark/spark-partitioning-understanding/), where +# partition helps in localizing the data and reduce the data shuffling across the network nodes reducing network latency +# which is a major component of the transformation operation thereby reducing the time of completion. +# Deltacat with Ray can support different partitioning strategies to reduce the data movement either across network or between compute and storage +# Note that the term partition here is different from the term used in catalog +# Type of Partition: +# Range Partition: It assigns rows to partitions based on column values falling within a given range, e.g., repartition(column="last_updated", ranges=['2023-01-01', '2023-02-01', '2023-03-01']), data will be split into 4 files +# Hash Partition: Hash Partitioning attempts to spread the data evenly across various partitions based on the key, e.g., repartition(column="last_updated", num_partitions=10), data will be split into 10 files evenly + class RepartitionType(str, Enum): RANGE = "range" HASH = "hash" - COLUMN = "none" def _timed_repartition( @@ -45,10 +53,6 @@ def _timed_repartition( ) -> RepartitionResult: if repartition_type == RepartitionType.RANGE: - # A delta that is partitioned by range is partitioned in such a way - # that each partition contains rows for which the partitioning expression value lies within a given range for a specified column. - # For example, if the partitioning column is "date", the partition ranges are "2020-01-01" to "2020-01-10" and "2020-01-11" to "2020-01-20". - # The partitioned delta will have two partitions, one for each range. 
column: str = repartition_args["column"] partition_ranges: List = repartition_args["ranges"] task_id = get_current_ray_task_id() @@ -64,8 +68,8 @@ def _timed_repartition( # check if the column exists in the table if not all(column in table.column_names for table in tables): raise ValueError(f"Column {column} does not exist in the table") - - partitioned_tables = [[] for _ in range(len(partition_ranges) + 1)] + # given a range [x, y, z], we need to split the table into 4 files, i.e., (-inf, x], (x, y], (y, z], (z, inf) + partitioned_tables_list = [[] for _ in range(len(partition_ranges) + 1)] total_record_count = 0 col_name_int64 = f"{column}_int64" for table in tables: @@ -76,7 +80,7 @@ def _timed_repartition( pc.cast(table[column], pa.int64()), ) # handle the partition for values less than or equal to the smallest value - partitioned_tables[0].append( + partitioned_tables_list[0].append( table_new.filter( pc.field(col_name_int64) <= pc.scalar(partition_ranges[0]) ) @@ -86,43 +90,42 @@ def _timed_repartition( zip(partition_ranges[:-1], partition_ranges[1:]), start=1 ): # Add the table filtered by the lower and upper limits to partitioned_tables - partitioned_tables[i].append( + partitioned_tables_list[i].append( table_new.filter( (pc.field(col_name_int64) > pc.scalar(lower_limit)) & (pc.field(col_name_int64) <= pc.scalar(upper_limit)) ) ) # handle the partition for values greater than the largest value - partitioned_tables[-1].append( + partitioned_tables_list[-1].append( table_new.filter( pc.field(col_name_int64) > pc.scalar(partition_ranges[-1]) ) ) # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files - range_table_length = 0 + partition_table_length = 0 # After re-grouping the tables by specified ranges, for each group, we need concat and stage the tables - range_deltas: List[Delta] = [] - for _, value in partitioned_tables.items(): - if len(value) > 0: - range_table = pa.concat_tables(value) - if len(range_table) > 0: - range_table_length += len(range_table) - range_delta = deltacat_storage.stage_delta( - range_table, + partition_deltas: List[Delta] = [] + for partition_tables in partitioned_tables_list: + if len(partition_tables) > 0: + partition_table = pa.concat_tables(partition_tables) + if len(partition_table) > 0: + partition_table_length += len(partition_table) + partition_delta = deltacat_storage.stage_delta( + partition_table, destination_partition, content_type=repartitioned_file_content_type, ) - range_deltas.append(range_delta) + partition_deltas.append(partition_delta) assert ( - range_table_length == total_record_count - ), f"Repartitioned table should have the same number of records {range_table_length} as the original table {total_record_count}" + partition_table_length == total_record_count + ), f"Repartitioned table should have the same number of records {partition_table_length} as the original table {total_record_count}" return RepartitionResult( - range_deltas=range_deltas, + range_deltas=partition_deltas, ) else: - # Other repartition types, e.g., hash, key, list, column, etc are not supported yet raise NotImplementedError( f"Repartition type {repartition_type} is not supported." 
) From db2cac8dd6c6942debafa56e4ea336a04451a753 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Fri, 9 Jun 2023 10:44:25 -0700 Subject: [PATCH 12/19] move repartition_session to compute/compactor --- .../compute/compactor/repartition_session.py | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 deltacat/compute/compactor/repartition_session.py diff --git a/deltacat/compute/compactor/repartition_session.py b/deltacat/compute/compactor/repartition_session.py new file mode 100644 index 00000000..51051fd5 --- /dev/null +++ b/deltacat/compute/compactor/repartition_session.py @@ -0,0 +1,174 @@ +import ray +import time +import logging +from deltacat import logs +from deltacat.utils.common import ReadKwargsProvider +import functools +import itertools +from deltacat.compute.compactor import ( + RoundCompletionInfo, + SortKey, +) +from deltacat.types.media import ContentType +from deltacat.compute.compactor import DeltaAnnotated +from deltacat.utils.ray_utils.concurrency import ( + invoke_parallel, + round_robin_options_provider, +) + +from deltacat.compute.compactor.model.repartition_result import RepartitionResult +from deltacat.utils.placement import PlacementGroupConfig +from typing import List, Optional, Dict, Any +from deltacat.utils.ray_utils.runtime import live_node_resource_keys +from deltacat.compute.compactor.utils import io +from deltacat.compute.compactor.utils import round_completion_file as rcf +from deltacat.compute.compactor.steps import repartition as repar +from deltacat.compute.compactor.steps.repartition import RepartitionType +from deltacat.storage import ( + Delta, + DeltaLocator, + PartitionLocator, + interface as unimplemented_deltacat_storage, +) +from deltacat.utils.metrics import MetricsConfig + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def repartition( + source_partition_locator: PartitionLocator, + destination_partition_locator: PartitionLocator, + repartition_args: Any, + compaction_artifact_s3_bucket: str, + last_stream_position_to_compact: int, + repartition_type: RepartitionType = RepartitionType.RANGE, + rebase_source_partition_locator: Optional[PartitionLocator] = None, + rebase_source_partition_high_watermark: Optional[int] = None, + sort_keys: List[SortKey] = None, + min_file_count: int = 1000, + min_delta_bytes: int = 200 * 2**20, + repartitioned_file_content_type: ContentType = ContentType.PARQUET, + metrics_config: Optional[MetricsConfig] = None, + pg_config: Optional[PlacementGroupConfig] = None, + list_deltas_kwargs: Optional[Dict[str, Any]] = None, + read_kwargs_provider: Optional[ReadKwargsProvider] = None, + deltacat_storage=unimplemented_deltacat_storage, + **kwargs, +) -> Optional[str]: + + node_resource_keys = None + if pg_config: # use resource in each placement group + cluster_resources = pg_config.resource + cluster_cpus = cluster_resources["CPU"] + else: # use all cluster resource + cluster_resources = ray.cluster_resources() + logger.info(f"Total cluster resources: {cluster_resources}") + logger.info(f"Available cluster resources: {ray.available_resources()}") + cluster_cpus = int(cluster_resources["CPU"]) + logger.info(f"Total cluster CPUs: {cluster_cpus}") + node_resource_keys = live_node_resource_keys() + logger.info( + f"Found {len(node_resource_keys)} live cluster nodes: " + f"{node_resource_keys}" + ) + + # create a remote options provider to round-robin tasks across all nodes or allocated bundles + logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}") + 
round_robin_opt_provider = functools.partial( + round_robin_options_provider, + resource_keys=node_resource_keys, + pg_config=pg_config.opts if pg_config else None, + ) + + (deltas, _,) = io.discover_deltas( + source_partition_locator, + None, + last_stream_position_to_compact, + destination_partition_locator, + rebase_source_partition_locator, + rebase_source_partition_high_watermark, + deltacat_storage, + **list_deltas_kwargs, + ) + + uniform_deltas = [] + for delta in deltas: + uniform_deltas_part = DeltaAnnotated.rebatch( + [DeltaAnnotated.of(delta)], + min_delta_bytes=min_delta_bytes, + min_file_counts=min_file_count, + ) + uniform_deltas.extend(uniform_deltas_part) + + logger.info(f"Retrieved a total of {len(uniform_deltas)} uniform deltas.") + + max_parallelism = cluster_cpus + # create a new stream for this round + compacted_stream_locator = destination_partition_locator.stream_locator + stream = deltacat_storage.get_stream( + compacted_stream_locator.namespace, + compacted_stream_locator.table_name, + compacted_stream_locator.table_version, + ) + partition = deltacat_storage.stage_partition( + stream, + destination_partition_locator.partition_values, + ) + new_compacted_partition_locator = partition.locator + repar_start = time.time() + repar_tasks_pending = invoke_parallel( + items=uniform_deltas, + ray_task=repar.repartition, + max_parallelism=max_parallelism, + options_provider=round_robin_opt_provider, + repartition_type=repartition_type, + repartition_args=repartition_args, + destination_partition=partition, + enable_profiler=False, + metrics_config=metrics_config, + read_kwargs_provider=read_kwargs_provider, + repartitioned_file_content_type=repartitioned_file_content_type, + deltacat_storage=deltacat_storage, + ) + logger.info(f"Getting {len(repar_tasks_pending)} task results...") + repar_results: List[RepartitionResult] = ray.get(repar_tasks_pending) + repar_results: List[Delta] = [rp.range_deltas for rp in repar_results] + transposed = list(itertools.zip_longest(*repar_results, fillvalue=None)) + ordered_deltas: List[Delta] = [ + i for sublist in transposed for i in sublist if i is not None + ] + repar_end = time.time() + logger.info(f"repartition {repar_end - repar_start} seconds") + logger.info(f"Got {len(ordered_deltas)} task results.") + # ordered_deltas are ordered as [cold1, cold2, coldN, hot1, hot2, hotN] + merged_delta = Delta.merge_deltas(ordered_deltas) + compacted_delta = deltacat_storage.commit_delta( + merged_delta, properties=kwargs.get("properties", {}) + ) + deltacat_storage.commit_partition(partition) + logger.info(f"Committed final delta: {compacted_delta}") + logger.info(f"Job run completed successfully!") + new_compacted_delta_locator = DeltaLocator.of( + new_compacted_partition_locator, + compacted_delta.stream_position, + ) + bit_width_of_sort_keys = SortKey.validate_sort_keys( + source_partition_locator, + sort_keys, + deltacat_storage, + ) + new_round_completion_info = RoundCompletionInfo.of( + last_stream_position_to_compact, + new_compacted_delta_locator, + None, + bit_width_of_sort_keys, + None, + ) + rcf_source_partition_locator = source_partition_locator + round_completion_file_s3_url = None + round_completion_file_s3_url = rcf.write_round_completion_file( + compaction_artifact_s3_bucket, + rcf_source_partition_locator, + new_round_completion_info, + ) + return round_completion_file_s3_url From 48263428ecfb53b72ee35866ab05a99b2f29b872 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 12:11:10 -0700 Subject: [PATCH 13/19] add 
support to normalize file size in repartition --- .../compute/compactor/repartition_session.py | 2 + .../compute/compactor/steps/repartition.py | 58 ++++++++++--------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/deltacat/compute/compactor/repartition_session.py b/deltacat/compute/compactor/repartition_session.py index 51051fd5..e74205fd 100644 --- a/deltacat/compute/compactor/repartition_session.py +++ b/deltacat/compute/compactor/repartition_session.py @@ -45,6 +45,7 @@ def repartition( rebase_source_partition_locator: Optional[PartitionLocator] = None, rebase_source_partition_high_watermark: Optional[int] = None, sort_keys: List[SortKey] = None, + records_per_repartitioned_file: int = 4_000_000, min_file_count: int = 1000, min_delta_bytes: int = 200 * 2**20, repartitioned_file_content_type: ContentType = ContentType.PARQUET, @@ -123,6 +124,7 @@ def repartition( options_provider=round_robin_opt_provider, repartition_type=repartition_type, repartition_args=repartition_args, + max_records_per_output_file=records_per_repartitioned_file, destination_partition=partition, enable_profiler=False, metrics_config=metrics_config, diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 95a5d8c9..655b13e2 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -26,14 +26,16 @@ logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) -# Similar to Spark (https://sparkbyexamples.com/spark/spark-partitioning-understanding/), where -# partition helps in localizing the data and reduce the data shuffling across the network nodes reducing network latency -# which is a major component of the transformation operation thereby reducing the time of completion. -# Deltacat with Ray can support different partitioning strategies to reduce the data movement either across network or between compute and storage -# Note that the term partition here is different from the term used in catalog -# Type of Partition: -# Range Partition: It assigns rows to partitions based on column values falling within a given range, e.g., repartition(column="last_updated", ranges=['2023-01-01', '2023-02-01', '2023-03-01']), data will be split into 4 files -# Hash Partition: Hash Partitioning attempts to spread the data evenly across various partitions based on the key, e.g., repartition(column="last_updated", num_partitions=10), data will be split into 10 files evenly +""" +Similar to Spark (https://sparkbyexamples.com/spark/spark-partitioning-understanding/), where +partition helps in localizing the data and reduce the data shuffling across the network nodes reducing network latency +which is a major component of the transformation operation thereby reducing the time of completion. 
+Deltacat with Ray can support different partitioning strategies to reduce the data movement either across network or between compute and storage +Note that the term partition here is different from the term used in catalog +Type of Partition: +Range Partition: It assigns rows to partitions based on column values falling within a given range, e.g., repartition(column="last_updated", ranges=['2023-01-01', '2023-02-01', '2023-03-01']), data will be split into 4 files +Hash Partition: Hash Partitioning attempts to spread the data evenly across various partitions based on the key, e.g., repartition(column="last_updated", num_partitions=10), data will be split into 10 files evenly +""" class RepartitionType(str, Enum): @@ -41,11 +43,19 @@ class RepartitionType(str, Enum): HASH = "hash" +def generate_unique_name(base_name, existing_names): + counter = 1 + while base_name + str(counter) in existing_names: + counter += 1 + return base_name + str(counter) + + def _timed_repartition( annotated_delta: DeltaAnnotated, + destination_partition: Partition, repartition_type: RepartitionType, repartition_args: dict, - destination_partition: Partition, + max_records_per_output_file: int, enable_profiler: bool, read_kwargs_provider: Optional[ReadKwargsProvider], repartitioned_file_content_type: ContentType = ContentType.PARQUET, @@ -69,9 +79,13 @@ def _timed_repartition( if not all(column in table.column_names for table in tables): raise ValueError(f"Column {column} does not exist in the table") # given a range [x, y, z], we need to split the table into 4 files, i.e., (-inf, x], (x, y], (y, z], (z, inf) + partition_ranges.sort() partitioned_tables_list = [[] for _ in range(len(partition_ranges) + 1)] total_record_count = 0 col_name_int64 = f"{column}_int64" + col_name_int64 = generate_unique_name( + col_name_int64, tables[0].schema.names + ) for table in tables: total_record_count += len(table) table_new = table.add_column( @@ -79,31 +93,20 @@ def _timed_repartition( pa.field(col_name_int64, pa.int64()), pc.cast(table[column], pa.int64()), ) - # handle the partition for values less than or equal to the smallest value - partitioned_tables_list[0].append( - table_new.filter( - pc.field(col_name_int64) <= pc.scalar(partition_ranges[0]) - ) - ) + # Adjust the partition ranges to include -Inf and +Inf + partition_ranges = [-float("Inf")] + partition_ranges + [float("Inf")] + # Iterate over pairs of values in partition_ranges for i, (lower_limit, upper_limit) in enumerate( - zip(partition_ranges[:-1], partition_ranges[1:]), start=1 + zip(partition_ranges[:-1], partition_ranges[1:]), start=0 ): - # Add the table filtered by the lower and upper limits to partitioned_tables + # Add the table filtered by the lower and upper limits to partitioned_tables_list partitioned_tables_list[i].append( table_new.filter( (pc.field(col_name_int64) > pc.scalar(lower_limit)) & (pc.field(col_name_int64) <= pc.scalar(upper_limit)) ) ) - # handle the partition for values greater than the largest value - partitioned_tables_list[-1].append( - table_new.filter( - pc.field(col_name_int64) > pc.scalar(partition_ranges[-1]) - ) - ) - - # TODO(rootliu) set optimal or max number of records per file to defer the performance degradation due to too many small files partition_table_length = 0 # After re-grouping the tables by specified ranges, for each group, we need concat and stage the tables partition_deltas: List[Delta] = [] @@ -115,6 +118,7 @@ def _timed_repartition( partition_delta = deltacat_storage.stage_delta( partition_table, 
destination_partition, + max_records_per_entry=max_records_per_output_file, content_type=repartitioned_file_content_type, ) partition_deltas.append(partition_delta) @@ -137,6 +141,7 @@ def repartition( destination_partition: Partition, repartition_type: RepartitionType, repartition_args: dict, + max_records_per_output_file: int, enable_profiler: bool, metrics_config: Optional[MetricsConfig], read_kwargs_provider: Optional[ReadKwargsProvider], @@ -147,9 +152,10 @@ def repartition( repartition_result, duration = timed_invocation( func=_timed_repartition, annotated_delta=annotated_delta, + destination_partition=destination_partition, repartition_type=repartition_type, repartition_args=repartition_args, - destination_partition=destination_partition, + max_records_per_output_file=max_records_per_output_file, enable_profiler=enable_profiler, read_kwargs_provider=read_kwargs_provider, repartitioned_file_content_type=repartitioned_file_content_type, From 8fed394afbdc15feca28dd0a13e930430939220c Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 14:08:07 -0700 Subject: [PATCH 14/19] fix generic type issue, i.e., Dataset is not a generic type --- deltacat/storage/model/types.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deltacat/storage/model/types.py b/deltacat/storage/model/types.py index b877b7f1..72fa2cb2 100644 --- a/deltacat/storage/model/types.py +++ b/deltacat/storage/model/types.py @@ -9,7 +9,8 @@ LocalTable = Union[pa.Table, pd.DataFrame, np.ndarray] LocalDataset = List[LocalTable] -DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]] +# DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]] +DistributedDataset = Union[Dataset[ArrowRow], Dataset[np.ndarray], Dataset[Any]] class DeltaType(str, Enum): From 7a6d90f4a5f71ae6a24201a2976c13f609b48146 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 14:47:05 -0700 Subject: [PATCH 15/19] use ray dataset directly for distributeddataset --- deltacat/storage/model/types.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/deltacat/storage/model/types.py b/deltacat/storage/model/types.py index 72fa2cb2..d863823a 100644 --- a/deltacat/storage/model/types.py +++ b/deltacat/storage/model/types.py @@ -1,16 +1,17 @@ from enum import Enum -from typing import Any, List, Union +from typing import List, Union import numpy as np import pandas as pd import pyarrow as pa -from ray.data._internal.arrow_block import ArrowRow + +# from ray.data._internal.arrow_block import ArrowRow from ray.data.dataset import Dataset LocalTable = Union[pa.Table, pd.DataFrame, np.ndarray] LocalDataset = List[LocalTable] # DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]] -DistributedDataset = Union[Dataset[ArrowRow], Dataset[np.ndarray], Dataset[Any]] +DistributedDataset = Dataset class DeltaType(str, Enum): From 7d6fbd1474e863df9077443c29487b55cdcd2c02 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 14:55:31 -0700 Subject: [PATCH 16/19] add ray version checker to determine how to use Dataset class --- deltacat/storage/model/types.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/deltacat/storage/model/types.py b/deltacat/storage/model/types.py index d863823a..db5f7dca 100644 --- a/deltacat/storage/model/types.py +++ b/deltacat/storage/model/types.py @@ -1,17 +1,23 @@ from enum import Enum -from typing import List, Union +from typing import List, Union, Any import numpy as np import pandas as pd import pyarrow as pa - -# 
from ray.data._internal.arrow_block import ArrowRow +import pkg_resources +from ray.data._internal.arrow_block import ArrowRow from ray.data.dataset import Dataset LocalTable = Union[pa.Table, pd.DataFrame, np.ndarray] LocalDataset = List[LocalTable] -# DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]] -DistributedDataset = Dataset +# Starting Ray 2.5.0, Dataset follows a strict mode (https://docs.ray.io/en/latest/data/faq.html#migrating-to-strict-mode), +# and generic annotation is removed. So add a version checker to determine whether to use the old or new definition. +ray_version = pkg_resources.parse_version(pkg_resources.get_distribution("ray").version) +change_version = pkg_resources.parse_version("2.5.0") +if ray_version < change_version: + DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]] +else: # This is the new definition + DistributedDataset = Dataset # Adjust this based on how you want to use Dataset class DeltaType(str, Enum): From f1e1adb009a2772162f9908505288815837a6e5f Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 14:55:42 -0700 Subject: [PATCH 17/19] add ray version checker to determine how to use Dataset class --- deltacat/storage/model/types.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deltacat/storage/model/types.py b/deltacat/storage/model/types.py index db5f7dca..9e0db24a 100644 --- a/deltacat/storage/model/types.py +++ b/deltacat/storage/model/types.py @@ -16,8 +16,8 @@ change_version = pkg_resources.parse_version("2.5.0") if ray_version < change_version: DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]] -else: # This is the new definition - DistributedDataset = Dataset # Adjust this based on how you want to use Dataset +else: + DistributedDataset = Dataset class DeltaType(str, Enum): From a407dc9a599a1f85ee5bad67bbbdd48776cac661 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 16:47:17 -0700 Subject: [PATCH 18/19] added todo, removed hard coded arg --- deltacat/compute/compactor/repartition_session.py | 4 +++- deltacat/compute/compactor/steps/repartition.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/deltacat/compute/compactor/repartition_session.py b/deltacat/compute/compactor/repartition_session.py index e74205fd..cd5061e9 100644 --- a/deltacat/compute/compactor/repartition_session.py +++ b/deltacat/compute/compactor/repartition_session.py @@ -35,6 +35,7 @@ logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) +# TODO:(rootliu) move this repartition function to a separate module under compute def repartition( source_partition_locator: PartitionLocator, destination_partition_locator: PartitionLocator, @@ -49,6 +50,7 @@ def repartition( min_file_count: int = 1000, min_delta_bytes: int = 200 * 2**20, repartitioned_file_content_type: ContentType = ContentType.PARQUET, + enable_profiler: bool = False, metrics_config: Optional[MetricsConfig] = None, pg_config: Optional[PlacementGroupConfig] = None, list_deltas_kwargs: Optional[Dict[str, Any]] = None, @@ -126,7 +128,7 @@ def repartition( repartition_args=repartition_args, max_records_per_output_file=records_per_repartitioned_file, destination_partition=partition, - enable_profiler=False, + enable_profiler=enable_profiler, metrics_config=metrics_config, read_kwargs_provider=read_kwargs_provider, repartitioned_file_content_type=repartitioned_file_content_type, diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 
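
Read outside the diff context, the version-gated alias that the DistributedDataset patches above converge on is roughly the following. This is a sketch: the in-tree module keeps all imports at module top level, so details differ slightly.

```python
from typing import Any, Union

import numpy as np
import pkg_resources
from ray.data.dataset import Dataset

_RAY_VERSION = pkg_resources.parse_version(pkg_resources.get_distribution("ray").version)

if _RAY_VERSION < pkg_resources.parse_version("2.5.0"):
    # Older Ray still exposes a generic Dataset[T] parameterized by row type.
    from ray.data._internal.arrow_block import ArrowRow

    DistributedDataset = Dataset[Union[ArrowRow, np.ndarray, Any]]
else:
    # Ray >= 2.5.0 strict mode drops the generic parameter, so the bare class is the alias.
    DistributedDataset = Dataset
```

`importlib.metadata.version("ray")` would be a lighter-weight way to read the installed version, but the patch standardizes on `pkg_resources`, which works across the Python versions deltacat supports.
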
655b13e2..1b7f9b5b 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -75,6 +75,7 @@ def _timed_repartition( storage_type=StorageType.LOCAL, file_reader_kwargs_provider=read_kwargs_provider, ) + # TODO: (rootliu) design a better way to handle the case when the column does not exist in the table, e.g., backfill + repartition by stream position # check if the column exists in the table if not all(column in table.column_names for table in tables): raise ValueError(f"Column {column} does not exist in the table") From fb6fd7dc50bd4041a7e050ad3c002db41aa73803 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Mon, 12 Jun 2023 17:11:01 -0700 Subject: [PATCH 19/19] fix bug in partition ranges --- deltacat/compute/compactor/steps/repartition.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/deltacat/compute/compactor/steps/repartition.py b/deltacat/compute/compactor/steps/repartition.py index 1b7f9b5b..86b6ad40 100644 --- a/deltacat/compute/compactor/steps/repartition.py +++ b/deltacat/compute/compactor/steps/repartition.py @@ -81,7 +81,8 @@ def _timed_repartition( raise ValueError(f"Column {column} does not exist in the table") # given a range [x, y, z], we need to split the table into 4 files, i.e., (-inf, x], (x, y], (y, z], (z, inf) partition_ranges.sort() - partitioned_tables_list = [[] for _ in range(len(partition_ranges) + 1)] + partition_ranges = [-float("Inf")] + partition_ranges + [float("Inf")] + partitioned_tables_list = [[] for _ in range(len(partition_ranges) - 1)] total_record_count = 0 col_name_int64 = f"{column}_int64" col_name_int64 = generate_unique_name( @@ -94,9 +95,6 @@ def _timed_repartition( pa.field(col_name_int64, pa.int64()), pc.cast(table[column], pa.int64()), ) - # Adjust the partition ranges to include -Inf and +Inf - partition_ranges = [-float("Inf")] + partition_ranges + [float("Inf")] - # Iterate over pairs of values in partition_ranges for i, (lower_limit, upper_limit) in enumerate( zip(partition_ranges[:-1], partition_ranges[1:]), start=0