From 73138c9145116367531af4df1189c9481b5a6e23 Mon Sep 17 00:00:00 2001
From: Kev Wang
Date: Thu, 1 Aug 2024 14:28:04 -0700
Subject: [PATCH] [FEAT] Add support for pyiceberg v0.7 (#2594)

PyIceberg v0.7.0 was [just released](https://github.com/apache/iceberg-python/releases/tag/pyiceberg-0.7.0). One of the new changes is the Transaction API, which replaces some of the private functions that we have been using. This PR adds support for those changes.
---
 .github/workflows/python-package.yml | 2 +-
 .../parquet/benchmark-requirements.txt | 2 +-
 daft/dataframe/dataframe.py | 57 ++++++++++++-----
 daft/table/table_io.py | 63 +++++++++++++------
 requirements-dev.txt | 4 +-
 .../retry-server-requirements.txt | 2 +-
 6 files changed, 89 insertions(+), 41 deletions(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index de22145ae5..2adbcb86e5 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -22,7 +22,7 @@ jobs:
       matrix:
         python-version: ['3.8', '3.10']
         daft-runner: [py, ray]
-        pyarrow-version: [7.0.0, 13.0.0]
+        pyarrow-version: [7.0.0, 15.0.0]
         enable-aqe: [0, 1]
         os: [ubuntu-20.04, windows-latest]
         exclude:
diff --git a/benchmarking/parquet/benchmark-requirements.txt b/benchmarking/parquet/benchmark-requirements.txt
index a308fcd5f5..be36e198de 100644
--- a/benchmarking/parquet/benchmark-requirements.txt
+++ b/benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
 pytest==7.4.0
 pytest-benchmark==4.0.0
 pytest-memray==1.4.1
-pyarrow==13.0.0
+pyarrow==15.0.0
 boto3==1.28.3
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index c5a1948bf0..d90460de3e 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -521,25 +521,14 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
                 f"Write Iceberg is only supported on pyarrow>=12.0.1, found {pa.__version__}. See this issue for more information: https://github.com/apache/arrow/issues/37054#issuecomment-1668644887"
             )

-        from pyiceberg.table import _MergingSnapshotProducer
-        from pyiceberg.table.snapshots import Operation
+        if mode not in ["append", "overwrite"]:
+            raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")

         operations = []
         path = []
         rows = []
         size = []

-        if mode == "append":
-            operation = Operation.APPEND
-        elif mode == "overwrite":
-            operation = Operation.OVERWRITE
-        else:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")
-
-        # We perform the merge here since table is not pickle-able
-        # We should be able to move to a transaction API for iceberg 0.7.0
-        merge = _MergingSnapshotProducer(operation=operation, table=table)
-
         builder = self._builder.write_iceberg(table)
         write_df = DataFrame(builder)
         write_df.collect()
@@ -548,13 +537,12 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
         assert "data_file" in write_result
         data_files = write_result["data_file"]

-        if operation == Operation.OVERWRITE:
+        if mode == "overwrite":
             deleted_files = table.scan().plan_files()
         else:
             deleted_files = []

         for data_file in data_files:
-            merge.append_data_file(data_file)
             operations.append("ADD")
             path.append(data_file.file_path)
             rows.append(data_file.record_count)
@@ -567,7 +555,44 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
             rows.append(data_file.record_count)
             size.append(data_file.file_size_in_bytes)

-        merge.commit()
+        if parse(pyiceberg.__version__) >= parse("0.7.0"):
+            from pyiceberg.table import ALWAYS_TRUE, PropertyUtil, TableProperties
+
+            tx = table.transaction()
+
+            if mode == "overwrite":
+                tx.delete(delete_filter=ALWAYS_TRUE)
+
+            update_snapshot = tx.update_snapshot()
+
+            manifest_merge_enabled = mode == "append" and PropertyUtil.property_as_bool(
+                tx.table_metadata.properties,
+                TableProperties.MANIFEST_MERGE_ENABLED,
+                TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
+            )
+
+            append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
+
+            with append_method() as append_files:
+                for data_file in data_files:
+                    append_files.append_data_file(data_file)
+
+            tx.commit_transaction()
+        else:
+            from pyiceberg.table import _MergingSnapshotProducer
+            from pyiceberg.table.snapshots import Operation
+
+            operations_map = {
+                "append": Operation.APPEND,
+                "overwrite": Operation.OVERWRITE,
+            }
+
+            merge = _MergingSnapshotProducer(operation=operations_map[mode], table=table)
+
+            for data_file in data_files:
+                merge.append_data_file(data_file)
+
+            merge.commit()

         from daft import from_pydict

diff --git a/daft/table/table_io.py b/daft/table/table_io.py
index a677997a72..e0282709d7 100644
--- a/daft/table/table_io.py
+++ b/daft/table/table_io.py
@@ -557,9 +557,10 @@ def write_iceberg(
     spec_id: int | None,
     io_config: IOConfig | None = None,
 ):
+    import pyiceberg
+    from packaging.version import parse
     from pyiceberg.io.pyarrow import (
         compute_statistics_plan,
-        fill_parquet_file_metadata,
         parquet_path_to_id_mapping,
         schema_to_pyarrow,
     )
@@ -582,28 +583,50 @@ def file_visitor(written_file, protocol=protocol):
         file_path = f"{protocol}://{written_file.path}"
         size = written_file.size
         metadata = written_file.metadata
-        # TODO Version guard pyarrow version
-        data_file = DataFile(
-            content=DataFileContent.DATA,
-            file_path=file_path,
-            file_format=IcebergFileFormat.PARQUET,
-            partition=Record(),
-            file_size_in_bytes=size,
+
+        kwargs = {
+            "content": DataFileContent.DATA,
+            "file_path": file_path,
+            "file_format": IcebergFileFormat.PARQUET,
+            "partition": Record(),
+            "file_size_in_bytes": size,
             # After this has been fixed:
             # https://github.com/apache/iceberg-python/issues/271
-            # sort_order_id=task.sort_order_id,
-            sort_order_id=None,
+            # "sort_order_id": task.sort_order_id,
+            "sort_order_id": None,
             # Just copy these from the table for now
-            spec_id=spec_id,
-            equality_ids=None,
-            key_metadata=None,
-        )
-        fill_parquet_file_metadata(
-            data_file=data_file,
-            parquet_metadata=metadata,
-            stats_columns=compute_statistics_plan(schema, properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
-        )
+            "spec_id": spec_id,
+            "equality_ids": None,
+            "key_metadata": None,
+        }
+
+        if parse(pyiceberg.__version__) >= parse("0.7.0"):
+            from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata
+
+            statistics = data_file_statistics_from_parquet_metadata(
+                parquet_metadata=metadata,
+                stats_columns=compute_statistics_plan(schema, properties),
+                parquet_column_mapping=parquet_path_to_id_mapping(schema),
+            )
+
+            data_file = DataFile(
+                **{
+                    **kwargs,
+                    **statistics.to_serialized_dict(),
+                }
+            )
+        else:
+            from pyiceberg.io.pyarrow import fill_parquet_file_metadata
+
+            data_file = DataFile(**kwargs)
+
+            fill_parquet_file_metadata(
+                data_file=data_file,
+                parquet_metadata=metadata,
+                stats_columns=compute_statistics_plan(schema, properties),
+                parquet_column_mapping=parquet_path_to_id_mapping(schema),
+            )
+
         data_files.append(data_file)

     is_local_fs = canonicalized_protocol == "file"
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 99a65c96fe..bf3febc711 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -35,7 +35,7 @@ opencv-python==4.8.1.78
 tiktoken==0.7.0

 # Pyarrow
-pyarrow==13.0.0
+pyarrow==15.0.0
 # Ray
 ray[data, client]==2.7.1; python_version < '3.8'
 ray[data, client]==2.10.0; python_version >= '3.8'
@@ -44,7 +44,7 @@ ray[data, client]==2.10.0; python_version >= '3.8'
 lancedb>=0.6.10; python_version >= '3.8'

 #Iceberg
-pyiceberg==0.6.0; python_version >= '3.8'
+pyiceberg==0.7.0; python_version >= '3.8'
 tenacity==8.2.3; python_version >= '3.8'

 # Delta Lake
diff --git a/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt b/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt
index fc550f4eb1..d430841e5e 100644
--- a/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt
+++ b/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt
@@ -17,7 +17,7 @@ uvicorn==0.23.2
 uvloop==0.17.0
 watchfiles==0.19.0
 websockets==11.0.3
-pyarrow==13.0.0
+pyarrow==15.0.0
 slowapi==0.1.8
 # Pin numpy version otherwise pyarrow doesn't work
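
For reference, a minimal usage sketch of the write path this patch touches. It assumes pyiceberg>=0.7 with a local SQLite-backed SqlCatalog; the warehouse path, catalog name, and table identifier are illustrative only:

    import os

    import daft
    from pyiceberg.catalog.sql import SqlCatalog

    # Illustrative local catalog; any pyiceberg catalog works the same way.
    os.makedirs("/tmp/iceberg", exist_ok=True)
    catalog = SqlCatalog(
        "local",
        uri="sqlite:////tmp/iceberg/catalog.db",
        warehouse="file:///tmp/iceberg",
    )
    catalog.create_namespace("default")

    df = daft.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    table = catalog.create_table("default.demo", schema=df.to_arrow().schema)

    # `mode` may be "append" or "overwrite"; on pyiceberg>=0.7.0 the commit goes
    # through the Transaction API, otherwise through the legacy
    # _MergingSnapshotProducer path.
    df.write_iceberg(table, mode="append")

Note that on the pyiceberg>=0.7 path, appends honor the table's `commit.manifest-merge.enabled` property (TableProperties.MANIFEST_MERGE_ENABLED), choosing `merge_append` over `fast_append` accordingly.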