From b19497f59868105ac6c77b7c1536766bd95896ed Mon Sep 17 00:00:00 2001 From: Stephen Buck Date: Mon, 1 Jun 2026 18:02:58 -0500 Subject: [PATCH 1/2] Implement write.parquet.row-group-size-bytes in the pyarrow writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pyiceberg writer has historically ignored write.parquet.row-group-size-bytes (logging 'not implemented') and used only write.parquet.row-group-limit (rows). For wide tables that means a single row group ends up at gigabytes — e.g. 337 cols × 1,048,576 default rows ≈ 1.7 GiB uncompressed per row group — which drives the polars / pyarrow reader's decode peak into the tens of GiB on production reads. Now write_file resolves row_group_size as min(row_group_limit, row_group_size_bytes / bytes_per_row), where bytes_per_row is approximated from the in-memory arrow_table's nbytes. This matches Spark / parquet-mr 'whichever limit fires first' semantics and lets the existing PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT (128 MiB) actually take effect. --- pyiceberg/io/pyarrow.py | 19 ++++- tests/integration/test_writes/test_writes.py | 1 - tests/io/test_pyarrow.py | 86 ++++++++++++++++++++ 3 files changed, 103 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 522af0f344..6e3751ce5d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2404,15 +2404,30 @@ def data_file_statistics_from_parquet_metadata( ) +def _resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int | None, row_group_size_bytes: int | None) -> int | None: + if not row_group_size_bytes or arrow_table.num_rows == 0: + return row_group_limit + bytes_per_row = max(1, arrow_table.nbytes // arrow_table.num_rows) + rows_for_byte_budget = max(1, row_group_size_bytes // bytes_per_row) + if row_group_limit is None: + return rows_for_byte_budget + return min(row_group_limit, rows_for_byte_budget) + + def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) - row_group_size = property_as_int( + row_group_limit = property_as_int( properties=table_metadata.properties, property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT, default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT, ) + row_group_size_bytes = property_as_int( + properties=table_metadata.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) def write_parquet(task: WriteTask) -> DataFile: @@ -2436,6 +2451,7 @@ def write_parquet(task: WriteTask) -> DataFile: for batch in task.record_batches ] arrow_table = pa.Table.from_batches(batches) + row_group_size = _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes) file_path = location_provider.new_data_location( data_file_name=task.generate_data_file_filename("parquet"), partition_key=task.partition_key, @@ -2564,7 +2580,6 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: from pyiceberg.table import TableProperties for key_pattern in [ - TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*", ]: diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 46d54f0491..f82f3dc39b 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -680,7 +680,6 @@ def test_write_parquet_other_properties( @pytest.mark.parametrize( "properties", [ - {"write.parquet.row-group-size-bytes": "42"}, {"write.parquet.bloom-filter-enabled.column.bool": "42"}, {"write.parquet.bloom-filter-max-bytes": "42"}, ], diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e90f3a46fc..1d3aa02736 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -20,6 +20,7 @@ import tempfile import uuid from datetime import date +from pathlib import Path from typing import Any, List, Optional from unittest.mock import MagicMock, patch from uuid import uuid4 @@ -67,6 +68,7 @@ _determine_partitions, _primitive_to_physical, _read_deletes, + _resolve_row_group_size, _to_requested_schema, bin_pack_arrow_table, compute_statistics_plan, @@ -74,6 +76,7 @@ expression_to_pyarrow, parquet_path_to_id_mapping, schema_to_pyarrow, + write_file, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionField, PartitionSpec @@ -2319,3 +2322,86 @@ def test_pyarrow_io_multi_fs() -> None: # Same PyArrowFileIO instance resolves local file input to LocalFileSystem assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem) + + +@pytest.mark.parametrize( + "arrow_table,row_group_limit,row_group_size_bytes,expected", + [ + # Byte limit tighter than row limit — 2 int64 cols => 16 bytes/row, + # 1024-byte budget => 64 rows/group. + (pa.table({"a": list(range(1000)), "b": list(range(1000))}), 10_000, 1024, 64), + # Row limit tighter than byte limit. + (pa.table({"a": list(range(1000))}), 10, 10**9, 10), + # Byte limit disabled (0) falls back to the row limit. + (pa.table({"a": list(range(1000))}), 500, 0, 500), + # Empty input falls back to the row limit. + (pa.table({"a": pa.array([], type=pa.int64())}), 500, 1024, 500), + ], +) +def test__resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int, row_group_size_bytes: int, expected: int) -> None: + """Pick min(row_group_limit, bytes/(bytes_per_row)) when byte limit is set.""" + assert _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes) == expected + + +def test_write_file_byte_limit_produces_more_row_groups_than_row_limit_alone(tmp_path: Path) -> None: + """A tight byte limit splits a single arrow table across multiple row groups.""" + from pyiceberg.table import WriteTask + + table_schema = Schema( + NestedField(1, "a", LongType(), required=False), + NestedField(2, "b", LongType(), required=False), + ) + arrow_data = pa.table({"a": list(range(10_000)), "b": list(range(10_000))}) + + def _write(properties: dict[str, str], subdir: str) -> Path: + table_metadata = TableMetadataV2( + location=f"file://{tmp_path}/{subdir}", + last_column_id=2, + format_version=2, + schemas=[table_schema], + partition_specs=[PartitionSpec()], + properties=properties, + ) + task = WriteTask( + write_uuid=uuid.uuid4(), + task_id=0, + record_batches=arrow_data.to_batches(), + schema=table_schema, + ) + data_files = list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task]))) + return Path(data_files[0].file_path.removeprefix("file://")) + + default_groups = pq.ParquetFile(_write({}, "default")).num_row_groups + constrained_groups = pq.ParquetFile( + _write({TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: "1024"}, "constrained") + ).num_row_groups + assert default_groups == 1 + assert constrained_groups > 1 + + +def test_write_file_byte_limit_respects_row_limit_upper_bound(tmp_path: Path) -> None: + """With an effectively infinite byte target, the row limit caps row groups.""" + from pyiceberg.table import WriteTask + + table_schema = Schema(NestedField(1, "a", LongType(), required=False)) + arrow_data = pa.table({"a": list(range(10_000))}) + table_metadata = TableMetadataV2( + location=f"file://{tmp_path}", + last_column_id=1, + format_version=2, + schemas=[table_schema], + partition_specs=[PartitionSpec()], + properties={ + TableProperties.PARQUET_ROW_GROUP_LIMIT: "1000", + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: str(10**12), + }, + ) + task = WriteTask( + write_uuid=uuid.uuid4(), + task_id=0, + record_batches=arrow_data.to_batches(), + schema=table_schema, + ) + data_files = list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task]))) + pf = pq.ParquetFile(data_files[0].file_path.removeprefix("file://")) + assert pf.num_row_groups == 10 From 3a1e3d41795de3ed921a57ee45ac8ea15d962f5d Mon Sep 17 00:00:00 2001 From: Stephen Buck <77749012+stephrb@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:11:10 -0500 Subject: [PATCH 2/2] Revert "Implement write.parquet.row-group-size-bytes in the pyarrow writer" --- pyiceberg/io/pyarrow.py | 19 +---- tests/integration/test_writes/test_writes.py | 1 + tests/io/test_pyarrow.py | 86 -------------------- 3 files changed, 3 insertions(+), 103 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 6e3751ce5d..522af0f344 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2404,30 +2404,15 @@ def data_file_statistics_from_parquet_metadata( ) -def _resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int | None, row_group_size_bytes: int | None) -> int | None: - if not row_group_size_bytes or arrow_table.num_rows == 0: - return row_group_limit - bytes_per_row = max(1, arrow_table.nbytes // arrow_table.num_rows) - rows_for_byte_budget = max(1, row_group_size_bytes // bytes_per_row) - if row_group_limit is None: - return rows_for_byte_budget - return min(row_group_limit, rows_for_byte_budget) - - def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) - row_group_limit = property_as_int( + row_group_size = property_as_int( properties=table_metadata.properties, property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT, default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT, ) - row_group_size_bytes = property_as_int( - properties=table_metadata.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) def write_parquet(task: WriteTask) -> DataFile: @@ -2451,7 +2436,6 @@ def write_parquet(task: WriteTask) -> DataFile: for batch in task.record_batches ] arrow_table = pa.Table.from_batches(batches) - row_group_size = _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes) file_path = location_provider.new_data_location( data_file_name=task.generate_data_file_filename("parquet"), partition_key=task.partition_key, @@ -2580,6 +2564,7 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: from pyiceberg.table import TableProperties for key_pattern in [ + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*", ]: diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index f82f3dc39b..46d54f0491 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -680,6 +680,7 @@ def test_write_parquet_other_properties( @pytest.mark.parametrize( "properties", [ + {"write.parquet.row-group-size-bytes": "42"}, {"write.parquet.bloom-filter-enabled.column.bool": "42"}, {"write.parquet.bloom-filter-max-bytes": "42"}, ], diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 1d3aa02736..e90f3a46fc 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -20,7 +20,6 @@ import tempfile import uuid from datetime import date -from pathlib import Path from typing import Any, List, Optional from unittest.mock import MagicMock, patch from uuid import uuid4 @@ -68,7 +67,6 @@ _determine_partitions, _primitive_to_physical, _read_deletes, - _resolve_row_group_size, _to_requested_schema, bin_pack_arrow_table, compute_statistics_plan, @@ -76,7 +74,6 @@ expression_to_pyarrow, parquet_path_to_id_mapping, schema_to_pyarrow, - write_file, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionField, PartitionSpec @@ -2322,86 +2319,3 @@ def test_pyarrow_io_multi_fs() -> None: # Same PyArrowFileIO instance resolves local file input to LocalFileSystem assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem) - - -@pytest.mark.parametrize( - "arrow_table,row_group_limit,row_group_size_bytes,expected", - [ - # Byte limit tighter than row limit — 2 int64 cols => 16 bytes/row, - # 1024-byte budget => 64 rows/group. - (pa.table({"a": list(range(1000)), "b": list(range(1000))}), 10_000, 1024, 64), - # Row limit tighter than byte limit. - (pa.table({"a": list(range(1000))}), 10, 10**9, 10), - # Byte limit disabled (0) falls back to the row limit. - (pa.table({"a": list(range(1000))}), 500, 0, 500), - # Empty input falls back to the row limit. - (pa.table({"a": pa.array([], type=pa.int64())}), 500, 1024, 500), - ], -) -def test__resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int, row_group_size_bytes: int, expected: int) -> None: - """Pick min(row_group_limit, bytes/(bytes_per_row)) when byte limit is set.""" - assert _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes) == expected - - -def test_write_file_byte_limit_produces_more_row_groups_than_row_limit_alone(tmp_path: Path) -> None: - """A tight byte limit splits a single arrow table across multiple row groups.""" - from pyiceberg.table import WriteTask - - table_schema = Schema( - NestedField(1, "a", LongType(), required=False), - NestedField(2, "b", LongType(), required=False), - ) - arrow_data = pa.table({"a": list(range(10_000)), "b": list(range(10_000))}) - - def _write(properties: dict[str, str], subdir: str) -> Path: - table_metadata = TableMetadataV2( - location=f"file://{tmp_path}/{subdir}", - last_column_id=2, - format_version=2, - schemas=[table_schema], - partition_specs=[PartitionSpec()], - properties=properties, - ) - task = WriteTask( - write_uuid=uuid.uuid4(), - task_id=0, - record_batches=arrow_data.to_batches(), - schema=table_schema, - ) - data_files = list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task]))) - return Path(data_files[0].file_path.removeprefix("file://")) - - default_groups = pq.ParquetFile(_write({}, "default")).num_row_groups - constrained_groups = pq.ParquetFile( - _write({TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: "1024"}, "constrained") - ).num_row_groups - assert default_groups == 1 - assert constrained_groups > 1 - - -def test_write_file_byte_limit_respects_row_limit_upper_bound(tmp_path: Path) -> None: - """With an effectively infinite byte target, the row limit caps row groups.""" - from pyiceberg.table import WriteTask - - table_schema = Schema(NestedField(1, "a", LongType(), required=False)) - arrow_data = pa.table({"a": list(range(10_000))}) - table_metadata = TableMetadataV2( - location=f"file://{tmp_path}", - last_column_id=1, - format_version=2, - schemas=[table_schema], - partition_specs=[PartitionSpec()], - properties={ - TableProperties.PARQUET_ROW_GROUP_LIMIT: "1000", - TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: str(10**12), - }, - ) - task = WriteTask( - write_uuid=uuid.uuid4(), - task_id=0, - record_batches=arrow_data.to_batches(), - schema=table_schema, - ) - data_files = list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task]))) - pf = pq.ParquetFile(data_files[0].file_path.removeprefix("file://")) - assert pf.num_row_groups == 10