[FEAT] Add support for pyiceberg v0.7 (#2594)
PyIceberg v0.7.0 was [just released](https://github.com/apache/iceberg-python/releases/tag/pyiceberg-0.7.0). Among its changes is the new public Transaction API, which replaces some of the private functions we had been using. This PR adds support for those changes.
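
For reference, the shape of this migration looks roughly like the sketch below. It is a minimal illustration rather than the exact Daft code: `commit_appended_files`, `table`, and `data_files` are hypothetical stand-ins for a pyiceberg table and the `DataFile` objects produced by a write.

```python
# pyiceberg < 0.7 committed through the private _MergingSnapshotProducer;
# pyiceberg >= 0.7 exposes a public Transaction API for the same job.

def commit_appended_files(table, data_files):  # hypothetical helper for illustration
    tx = table.transaction()  # stage all changes in one transaction
    with tx.update_snapshot().fast_append() as append_files:
        for data_file in data_files:
            append_files.append_data_file(data_file)  # register each written file
    tx.commit_transaction()  # atomically publish the new snapshot
```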
kevinzwang authored Aug 1, 2024
1 parent 75b011d commit 73138c9
Showing 6 changed files with 89 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -22,7 +22,7 @@ jobs:
       matrix:
         python-version: ['3.8', '3.10']
         daft-runner: [py, ray]
-        pyarrow-version: [7.0.0, 13.0.0]
+        pyarrow-version: [7.0.0, 15.0.0]
         enable-aqe: [0, 1]
         os: [ubuntu-20.04, windows-latest]
         exclude:
2 changes: 1 addition & 1 deletion benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
 pytest==7.4.0
 pytest-benchmark==4.0.0
 pytest-memray==1.4.1
-pyarrow==13.0.0
+pyarrow==15.0.0
 boto3==1.28.3
57 changes: 41 additions & 16 deletions daft/dataframe/dataframe.py
@@ -521,25 +521,14 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
                 f"Write Iceberg is only supported on pyarrow>=12.0.1, found {pa.__version__}. See this issue for more information: https://github.com/apache/arrow/issues/37054#issuecomment-1668644887"
             )
 
-        from pyiceberg.table import _MergingSnapshotProducer
-        from pyiceberg.table.snapshots import Operation
+        if mode not in ["append", "overwrite"]:
+            raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")
 
         operations = []
         path = []
         rows = []
         size = []
 
-        if mode == "append":
-            operation = Operation.APPEND
-        elif mode == "overwrite":
-            operation = Operation.OVERWRITE
-        else:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")
-
-        # We perform the merge here since table is not pickle-able
-        # We should be able to move to a transaction API for iceberg 0.7.0
-        merge = _MergingSnapshotProducer(operation=operation, table=table)
-
         builder = self._builder.write_iceberg(table)
         write_df = DataFrame(builder)
         write_df.collect()
@@ -548,13 +537,12 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
         assert "data_file" in write_result
         data_files = write_result["data_file"]
 
-        if operation == Operation.OVERWRITE:
+        if mode == "overwrite":
             deleted_files = table.scan().plan_files()
         else:
             deleted_files = []
 
         for data_file in data_files:
-            merge.append_data_file(data_file)
             operations.append("ADD")
             path.append(data_file.file_path)
             rows.append(data_file.record_count)
@@ -567,7 +555,44 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
             rows.append(data_file.record_count)
             size.append(data_file.file_size_in_bytes)
 
-        merge.commit()
+        if parse(pyiceberg.__version__) >= parse("0.7.0"):
+            from pyiceberg.table import ALWAYS_TRUE, PropertyUtil, TableProperties
+
+            tx = table.transaction()
+
+            if mode == "overwrite":
+                tx.delete(delete_filter=ALWAYS_TRUE)
+
+            update_snapshot = tx.update_snapshot()
+
+            manifest_merge_enabled = mode == "append" and PropertyUtil.property_as_bool(
+                tx.table_metadata.properties,
+                TableProperties.MANIFEST_MERGE_ENABLED,
+                TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
+            )
+
+            append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
+
+            with append_method() as append_files:
+                for data_file in data_files:
+                    append_files.append_data_file(data_file)
+
+            tx.commit_transaction()
+        else:
+            from pyiceberg.table import _MergingSnapshotProducer
+            from pyiceberg.table.snapshots import Operation
+
+            operations_map = {
+                "append": Operation.APPEND,
+                "overwrite": Operation.OVERWRITE,
+            }
+
+            merge = _MergingSnapshotProducer(operation=operations_map[mode], table=table)
+
+            for data_file in data_files:
+                merge.append_data_file(data_file)
+
+            merge.commit()
 
         from daft import from_pydict
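
For a sense of the user-facing call path this file serves, a hedged usage sketch (the catalog name and table identifier below are hypothetical, and `load_catalog` assumes a pyiceberg catalog is already configured):

```python
import daft
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")          # assumes a configured catalog
table = catalog.load_table("demo.events")  # hypothetical namespace.table

df = daft.from_pydict({"id": [1, 2, 3]})
df.write_iceberg(table, mode="append")     # only "append" and "overwrite" are accepted
```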
63 changes: 43 additions & 20 deletions daft/table/table_io.py
@@ -557,9 +557,10 @@ def write_iceberg(
     spec_id: int | None,
     io_config: IOConfig | None = None,
 ):
+    import pyiceberg
+    from packaging.version import parse
     from pyiceberg.io.pyarrow import (
         compute_statistics_plan,
-        fill_parquet_file_metadata,
         parquet_path_to_id_mapping,
         schema_to_pyarrow,
     )
@@ -582,28 +583,50 @@ def file_visitor(written_file, protocol=protocol):
         file_path = f"{protocol}://{written_file.path}"
         size = written_file.size
         metadata = written_file.metadata
-        # TODO Version guard pyarrow version
-        data_file = DataFile(
-            content=DataFileContent.DATA,
-            file_path=file_path,
-            file_format=IcebergFileFormat.PARQUET,
-            partition=Record(),
-            file_size_in_bytes=size,
+
+        kwargs = {
+            "content": DataFileContent.DATA,
+            "file_path": file_path,
+            "file_format": IcebergFileFormat.PARQUET,
+            "partition": Record(),
+            "file_size_in_bytes": size,
             # After this has been fixed:
             # https://github.com/apache/iceberg-python/issues/271
-            # sort_order_id=task.sort_order_id,
-            sort_order_id=None,
+            # "sort_order_id": task.sort_order_id,
+            "sort_order_id": None,
             # Just copy these from the table for now
-            spec_id=spec_id,
-            equality_ids=None,
-            key_metadata=None,
-        )
-        fill_parquet_file_metadata(
-            data_file=data_file,
-            parquet_metadata=metadata,
-            stats_columns=compute_statistics_plan(schema, properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
-        )
+            "spec_id": spec_id,
+            "equality_ids": None,
+            "key_metadata": None,
+        }
+
+        if parse(pyiceberg.__version__) >= parse("0.7.0"):
+            from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata
+
+            statistics = data_file_statistics_from_parquet_metadata(
+                parquet_metadata=metadata,
+                stats_columns=compute_statistics_plan(schema, properties),
+                parquet_column_mapping=parquet_path_to_id_mapping(schema),
+            )
+
+            data_file = DataFile(
+                **{
+                    **kwargs,
+                    **statistics.to_serialized_dict(),
+                }
+            )
+        else:
+            from pyiceberg.io.pyarrow import fill_parquet_file_metadata
+
+            data_file = DataFile(**kwargs)
+
+            fill_parquet_file_metadata(
+                data_file=data_file,
+                parquet_metadata=metadata,
+                stats_columns=compute_statistics_plan(schema, properties),
+                parquet_column_mapping=parquet_path_to_id_mapping(schema),
+            )
 
         data_files.append(data_file)
 
         is_local_fs = canonicalized_protocol == "file"
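
One detail in the 0.7 branch above: `DataFile(**{**kwargs, **statistics.to_serialized_dict()})` merges two dicts before unpacking, and on any overlapping key the right-hand dict wins, so the computed statistics take precedence over the static kwargs. A minimal illustration with made-up values:

```python
base = {"file_path": "s3://bucket/part-0.parquet", "record_count": None}
stats = {"record_count": 1024, "column_sizes": {1: 4096}}

merged = {**base, **stats}  # later unpacking wins on key collisions
assert merged["record_count"] == 1024
assert merged["file_path"] == "s3://bucket/part-0.parquet"
```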
4 changes: 2 additions & 2 deletions requirements-dev.txt
@@ -35,7 +35,7 @@ opencv-python==4.8.1.78
 tiktoken==0.7.0
 
 # Pyarrow
-pyarrow==13.0.0
+pyarrow==15.0.0
 # Ray
 ray[data, client]==2.7.1; python_version < '3.8'
 ray[data, client]==2.10.0; python_version >= '3.8'
@@ -44,7 +44,7 @@ ray[data, client]==2.10.0; python_version >= '3.8'
 lancedb>=0.6.10; python_version >= '3.8'
 
 #Iceberg
-pyiceberg==0.6.0; python_version >= '3.8'
+pyiceberg==0.7.0; python_version >= '3.8'
 tenacity==8.2.3; python_version >= '3.8'
 
 # Delta Lake
2 changes: 1 addition & 1 deletion (file name not shown)
@@ -17,7 +17,7 @@ uvicorn==0.23.2
 uvloop==0.17.0
 watchfiles==0.19.0
 websockets==11.0.3
-pyarrow==13.0.0
+pyarrow==15.0.0
 slowapi==0.1.8
 
 # Pin numpy version otherwise pyarrow doesn't work
