From 4cf7b23fadd73fb289223f5e4efdc3db3e37e471 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 18 May 2026 20:52:09 +0800 Subject: [PATCH 1/6] [python] Always resolve blob to actual data on read regardless of blob-as-descriptor setting --- .../read/reader/data_file_batch_reader.py | 20 ++++--- .../read/reader/format_blob_reader.py | 5 +- paimon-python/pypaimon/read/split_read.py | 3 +- .../pypaimon/tests/blob_table_test.py | 55 +++---------------- paimon-python/pypaimon/tests/blob_test.py | 42 ++------------ 5 files changed, 28 insertions(+), 97 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index 4035dcc4f0a9..ee389b7cb7d5 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -60,11 +60,16 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p for field in fields if hasattr(field.type, 'type') and field.type.type == 'BLOB' } - self.descriptor_blob_fields = { - field_name - for field_name in self.blob_descriptor_fields - if field_name in self.blob_field_names - } + if self.blob_as_descriptor: + # blob-as-descriptor=true means ALL blob fields store descriptors in parquet + self.descriptor_blob_fields = self.blob_field_names.copy() + else: + # Only specific fields listed in blob-descriptor-field store descriptors + self.descriptor_blob_fields = { + field_name + for field_name in self.blob_descriptor_fields + if field_name in self.blob_field_names + } def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]: if isinstance(self.format_reader, FormatBlobReader): @@ -154,10 +159,7 @@ def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) -> field_idx = record_batch.schema.get_field_index(field_name) values = record_batch.column(field_idx).to_pylist() - if self.blob_as_descriptor: - converted = [self._normalize_blob_cell(v) for v in values] - else: - converted = [self._blob_cell_to_data(v) for v in values] + converted = [self._blob_cell_to_data(v) for v in values] arrays[field_idx] = pa.array(converted, type=pa.large_binary()) return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema) diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py index e7e65394aa6d..7dd01a0de6be 100644 --- a/paimon-python/pypaimon/read/reader/format_blob_reader.py +++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py @@ -34,12 +34,11 @@ class FormatBlobReader(RecordBatchReader): def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool, + full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool = False, batch_size: int = 1024): self._file_io = file_io self._file_path = file_path self._push_down_predicate = push_down_predicate - self._blob_as_descriptor = blob_as_descriptor self._batch_size = batch_size # Get file size self._file_size = file_io.get_file_size(file_path) @@ -97,8 +96,6 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch for field_name in self._fields: if blob is None: pydict_data[field_name].append(None) - elif self._blob_as_descriptor: - pydict_data[field_name].append(blob.to_descriptor().serialize()) else: pydict_data[field_name].append(blob.to_data()) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 8a203c9f4c0d..588a916e5d60 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -697,8 +697,7 @@ def create_reader(self) -> RecordReader: else: reader = merge_reader - if (not CoreOptions.blob_as_descriptor(self.table.options) - and CoreOptions.blob_descriptor_fields(self.table.options)): + if CoreOptions.blob_descriptor_fields(self.table.options): reader = BlobDescriptorConvertReader(reader, self.table) return reader diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 56359c1d1fac..0a439b1cb0a4 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -1107,14 +1107,11 @@ def test_blob_write_read_partition(self): f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501 def test_blob_write_read_end_to_end_with_descriptor(self): - """Test end-to-end blob functionality using blob descriptors.""" + """Test that blob-as-descriptor stores descriptors but read always resolves to actual data.""" import random from pypaimon import Schema - from pypaimon.table.row.blob import BlobDescriptor, Blob - from pypaimon.common.uri_reader import UriReaderFactory - from pypaimon.common.options.config import CatalogOptions + from pypaimon.table.row.blob import BlobDescriptor - # Create schema with blob column pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -1130,81 +1127,47 @@ def test_blob_write_read_end_to_end_with_descriptor(self): } ) - # Create table self.catalog.create_table('test_db.blob_descriptor_test', schema, False) table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test') - # Create test blob data (1MB) blob_data = bytearray(1024 * 1024) - random.seed(42) # For reproducible tests + random.seed(42) for i in range(len(blob_data)): blob_data[i] = random.randint(0, 255) blob_data = bytes(blob_data) - # Create external blob file external_blob_path = os.path.join(self.temp_dir, 'external_blob') with open(external_blob_path, 'wb') as f: f.write(blob_data) - # Create blob descriptor pointing to external file blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data)) - # Create test data with blob descriptor test_data = pa.Table.from_pydict({ 'id': [1], 'name': ['paimon'], 'picture': [blob_descriptor.serialize()] }, schema=pa_schema) - # Write data using table API write_builder = table.new_batch_write_builder() writer = write_builder.new_write() writer.write_arrow(test_data) - - # Commit the data commit_messages = writer.prepare_commit() commit = write_builder.new_commit() commit.commit(commit_messages) - # Read data back read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() result = table_read.to_arrow(table_scan.plan().splits()) - # Verify the data was written and read correctly - self.assertEqual(result.num_rows, 1, "Should have 1 row") - self.assertEqual(result.column('id').to_pylist(), [1], "ID should match") - self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name should match") + self.assertEqual(result.num_rows, 1) + self.assertEqual(result.column('id').to_pylist(), [1]) + self.assertEqual(result.column('name').to_pylist(), ['paimon']) - # Get the blob descriptor bytes from the result + # Read always resolves descriptor to actual blob data (aligned with Java getBlob) picture_bytes = result.column('picture').to_pylist()[0] - self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes") - - # Deserialize the blob descriptor - new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes) - - # The URI might be different if the blob was stored in the table's data directory - # Let's verify the descriptor properties and try to read the data - # Note: offset might be non-zero due to blob file format overhead - self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should be non-negative") - self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length should match") - - # Create URI reader factory and read the blob data - catalog_options = {CatalogOptions.WAREHOUSE.key(): self.warehouse} - uri_reader_factory = UriReaderFactory(catalog_options) - uri_reader = uri_reader_factory.create(new_blob_descriptor.uri) - blob = Blob.from_descriptor(uri_reader, new_blob_descriptor) - - blob_descriptor_from_blob = blob.to_descriptor() - self.assertEqual( - blob_descriptor_from_blob.uri, - new_blob_descriptor.uri, - f"URI should be preserved. Expected: {new_blob_descriptor.uri}, Got: {blob_descriptor_from_blob.uri}" - ) - - # Verify the blob data matches the original - self.assertEqual(blob.to_data(), blob_data, "Blob data should match original") + self.assertIsInstance(picture_bytes, bytes) + self.assertEqual(picture_bytes, blob_data) print("✅ Blob descriptor end-to-end test passed:") print(" - Created external blob file and descriptor") diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index d9cf64210d61..911a8e3b1dc5 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -1134,7 +1134,7 @@ def test_blob_end_to_end_with_descriptor(self): self.assertGreater(file_size, 0) # ========== Step 3: Read data and check ========== - # Define schema for reading + # Reading always resolves blob to actual content (aligned with Java getBlob behavior) read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))] reader = FormatBlobReader( file_io=file_io, @@ -1142,44 +1142,17 @@ def test_blob_end_to_end_with_descriptor(self): read_fields=[blob_field_name], full_fields=read_fields, push_down_predicate=None, - blob_as_descriptor=True ) - # Read with blob_as_descriptor=True (read output as descriptor bytes) batch = reader.read_arrow_batch() self.assertIsNotNone(batch) self.assertEqual(batch.num_rows, 1) self.assertEqual(batch.num_columns, 1) - read_blob_bytes = batch.column(0)[0].as_py() - self.assertIsInstance(read_blob_bytes, bytes) - - # Deserialize the returned descriptor - returned_descriptor = BlobDescriptor.deserialize(read_blob_bytes) - - # The returned descriptor should point to the blob file (simplified implementation) - # because the current implementation creates a descriptor pointing to the blob file location - self.assertEqual(returned_descriptor.uri, str(blob_file_path)) - self.assertGreater(returned_descriptor.offset, 0) # Should have some offset in the blob file - - reader.close() - - reader_content = FormatBlobReader( - file_io=file_io, - file_path=str(blob_file_path), - read_fields=[blob_field_name], - full_fields=read_fields, - push_down_predicate=None, - blob_as_descriptor=False - ) - batch_content = reader_content.read_arrow_batch() - self.assertIsNotNone(batch_content) - self.assertEqual(batch_content.num_rows, 1) - read_content_bytes = batch_content.column(0)[0].as_py() + read_content_bytes = batch.column(0)[0].as_py() self.assertIsInstance(read_content_bytes, bytes) - # With blob_as_descriptor=False, we should get the actual blob content self.assertEqual(read_content_bytes, test_content) - reader_content.close() + reader.close() def test_null_blob_write(self): from pypaimon.write.blob_format_writer import BlobFormatWriter @@ -1229,7 +1202,7 @@ def test_null_blob_read(self): self.assertEqual(batch.column(0)[2].as_py(), b"world") reader.close() - def test_null_blob_read_as_descriptor(self): + def test_null_blob_read_resolves_content(self): from pypaimon.write.blob_format_writer import BlobFormatWriter file_io = LocalFileIO(self.temp_dir, Options({})) @@ -1251,17 +1224,14 @@ def test_null_blob_read_as_descriptor(self): read_fields=[blob_field_name], full_fields=read_fields, push_down_predicate=None, - blob_as_descriptor=True ) batch = reader.read_arrow_batch() self.assertIsNotNone(batch) self.assertEqual(batch.num_rows, 3) - desc0 = BlobDescriptor.deserialize(batch.column(0)[0].as_py()) - self.assertEqual(desc0.uri, blob_file_path) + self.assertEqual(batch.column(0)[0].as_py(), b"hello") self.assertIsNone(batch.column(0)[1].as_py()) - desc2 = BlobDescriptor.deserialize(batch.column(0)[2].as_py()) - self.assertEqual(desc2.uri, blob_file_path) + self.assertEqual(batch.column(0)[2].as_py(), b"world") reader.close() From 77bc51b52f03158f703d602cb0fb2c2c7ceffb5f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 19 May 2026 16:32:27 +0800 Subject: [PATCH 2/6] [python] Add Blob.from_bytes unified API and revert broken read behavior Revert the read-path changes from 4cf7b23fa that always resolved blob descriptors to data. Restore master behavior where blob-as-descriptor flag is respected. Add Blob.from_bytes(data, file_io) as a unified entry point (equivalent to Java's Blob.fromBytes) that auto-dispatches BlobData vs BlobRef based on the bytes content. --- .../read/reader/data_file_batch_reader.py | 20 +++---- .../read/reader/format_blob_reader.py | 5 +- paimon-python/pypaimon/read/split_read.py | 3 +- paimon-python/pypaimon/table/row/blob.py | 16 ++++++ .../pypaimon/tests/blob_table_test.py | 55 ++++++++++++++++--- paimon-python/pypaimon/tests/blob_test.py | 42 ++++++++++++-- 6 files changed, 113 insertions(+), 28 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index ee389b7cb7d5..4035dcc4f0a9 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -60,16 +60,11 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p for field in fields if hasattr(field.type, 'type') and field.type.type == 'BLOB' } - if self.blob_as_descriptor: - # blob-as-descriptor=true means ALL blob fields store descriptors in parquet - self.descriptor_blob_fields = self.blob_field_names.copy() - else: - # Only specific fields listed in blob-descriptor-field store descriptors - self.descriptor_blob_fields = { - field_name - for field_name in self.blob_descriptor_fields - if field_name in self.blob_field_names - } + self.descriptor_blob_fields = { + field_name + for field_name in self.blob_descriptor_fields + if field_name in self.blob_field_names + } def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]: if isinstance(self.format_reader, FormatBlobReader): @@ -159,7 +154,10 @@ def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) -> field_idx = record_batch.schema.get_field_index(field_name) values = record_batch.column(field_idx).to_pylist() - converted = [self._blob_cell_to_data(v) for v in values] + if self.blob_as_descriptor: + converted = [self._normalize_blob_cell(v) for v in values] + else: + converted = [self._blob_cell_to_data(v) for v in values] arrays[field_idx] = pa.array(converted, type=pa.large_binary()) return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema) diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py index 7dd01a0de6be..e7e65394aa6d 100644 --- a/paimon-python/pypaimon/read/reader/format_blob_reader.py +++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py @@ -34,11 +34,12 @@ class FormatBlobReader(RecordBatchReader): def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool = False, + full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool, batch_size: int = 1024): self._file_io = file_io self._file_path = file_path self._push_down_predicate = push_down_predicate + self._blob_as_descriptor = blob_as_descriptor self._batch_size = batch_size # Get file size self._file_size = file_io.get_file_size(file_path) @@ -96,6 +97,8 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch for field_name in self._fields: if blob is None: pydict_data[field_name].append(None) + elif self._blob_as_descriptor: + pydict_data[field_name].append(blob.to_descriptor().serialize()) else: pydict_data[field_name].append(blob.to_data()) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 588a916e5d60..8a203c9f4c0d 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -697,7 +697,8 @@ def create_reader(self) -> RecordReader: else: reader = merge_reader - if CoreOptions.blob_descriptor_fields(self.table.options): + if (not CoreOptions.blob_as_descriptor(self.table.options) + and CoreOptions.blob_descriptor_fields(self.table.options)): reader = BlobDescriptorConvertReader(reader, self.table) return reader diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index b619b6a76aec..903492e704b0 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -276,6 +276,22 @@ def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob': def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob': return BlobRef(uri_reader, descriptor) + @staticmethod + def from_bytes(data: bytes, file_io=None) -> 'Blob': + if data is None: + return BlobData(b'') + if not isinstance(data, (bytes, bytearray)): + raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}") + data = bytes(data) + if BlobDescriptor.is_blob_descriptor(data): + descriptor = BlobDescriptor.deserialize(data) + if file_io is not None: + uri_reader = file_io.uri_reader_factory.create(descriptor.uri) + else: + uri_reader = FileUriReader(None) + return BlobRef(uri_reader, descriptor) + return BlobData(data) + class BlobData(Blob): diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 0a439b1cb0a4..56359c1d1fac 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -1107,11 +1107,14 @@ def test_blob_write_read_partition(self): f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501 def test_blob_write_read_end_to_end_with_descriptor(self): - """Test that blob-as-descriptor stores descriptors but read always resolves to actual data.""" + """Test end-to-end blob functionality using blob descriptors.""" import random from pypaimon import Schema - from pypaimon.table.row.blob import BlobDescriptor + from pypaimon.table.row.blob import BlobDescriptor, Blob + from pypaimon.common.uri_reader import UriReaderFactory + from pypaimon.common.options.config import CatalogOptions + # Create schema with blob column pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -1127,47 +1130,81 @@ def test_blob_write_read_end_to_end_with_descriptor(self): } ) + # Create table self.catalog.create_table('test_db.blob_descriptor_test', schema, False) table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test') + # Create test blob data (1MB) blob_data = bytearray(1024 * 1024) - random.seed(42) + random.seed(42) # For reproducible tests for i in range(len(blob_data)): blob_data[i] = random.randint(0, 255) blob_data = bytes(blob_data) + # Create external blob file external_blob_path = os.path.join(self.temp_dir, 'external_blob') with open(external_blob_path, 'wb') as f: f.write(blob_data) + # Create blob descriptor pointing to external file blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data)) + # Create test data with blob descriptor test_data = pa.Table.from_pydict({ 'id': [1], 'name': ['paimon'], 'picture': [blob_descriptor.serialize()] }, schema=pa_schema) + # Write data using table API write_builder = table.new_batch_write_builder() writer = write_builder.new_write() writer.write_arrow(test_data) + + # Commit the data commit_messages = writer.prepare_commit() commit = write_builder.new_commit() commit.commit(commit_messages) + # Read data back read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() result = table_read.to_arrow(table_scan.plan().splits()) - self.assertEqual(result.num_rows, 1) - self.assertEqual(result.column('id').to_pylist(), [1]) - self.assertEqual(result.column('name').to_pylist(), ['paimon']) + # Verify the data was written and read correctly + self.assertEqual(result.num_rows, 1, "Should have 1 row") + self.assertEqual(result.column('id').to_pylist(), [1], "ID should match") + self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name should match") - # Read always resolves descriptor to actual blob data (aligned with Java getBlob) + # Get the blob descriptor bytes from the result picture_bytes = result.column('picture').to_pylist()[0] - self.assertIsInstance(picture_bytes, bytes) - self.assertEqual(picture_bytes, blob_data) + self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes") + + # Deserialize the blob descriptor + new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes) + + # The URI might be different if the blob was stored in the table's data directory + # Let's verify the descriptor properties and try to read the data + # Note: offset might be non-zero due to blob file format overhead + self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should be non-negative") + self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length should match") + + # Create URI reader factory and read the blob data + catalog_options = {CatalogOptions.WAREHOUSE.key(): self.warehouse} + uri_reader_factory = UriReaderFactory(catalog_options) + uri_reader = uri_reader_factory.create(new_blob_descriptor.uri) + blob = Blob.from_descriptor(uri_reader, new_blob_descriptor) + + blob_descriptor_from_blob = blob.to_descriptor() + self.assertEqual( + blob_descriptor_from_blob.uri, + new_blob_descriptor.uri, + f"URI should be preserved. Expected: {new_blob_descriptor.uri}, Got: {blob_descriptor_from_blob.uri}" + ) + + # Verify the blob data matches the original + self.assertEqual(blob.to_data(), blob_data, "Blob data should match original") print("✅ Blob descriptor end-to-end test passed:") print(" - Created external blob file and descriptor") diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index 911a8e3b1dc5..d9cf64210d61 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -1134,7 +1134,7 @@ def test_blob_end_to_end_with_descriptor(self): self.assertGreater(file_size, 0) # ========== Step 3: Read data and check ========== - # Reading always resolves blob to actual content (aligned with Java getBlob behavior) + # Define schema for reading read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))] reader = FormatBlobReader( file_io=file_io, @@ -1142,17 +1142,44 @@ def test_blob_end_to_end_with_descriptor(self): read_fields=[blob_field_name], full_fields=read_fields, push_down_predicate=None, + blob_as_descriptor=True ) + # Read with blob_as_descriptor=True (read output as descriptor bytes) batch = reader.read_arrow_batch() self.assertIsNotNone(batch) self.assertEqual(batch.num_rows, 1) self.assertEqual(batch.num_columns, 1) - read_content_bytes = batch.column(0)[0].as_py() + read_blob_bytes = batch.column(0)[0].as_py() + self.assertIsInstance(read_blob_bytes, bytes) + + # Deserialize the returned descriptor + returned_descriptor = BlobDescriptor.deserialize(read_blob_bytes) + + # The returned descriptor should point to the blob file (simplified implementation) + # because the current implementation creates a descriptor pointing to the blob file location + self.assertEqual(returned_descriptor.uri, str(blob_file_path)) + self.assertGreater(returned_descriptor.offset, 0) # Should have some offset in the blob file + + reader.close() + + reader_content = FormatBlobReader( + file_io=file_io, + file_path=str(blob_file_path), + read_fields=[blob_field_name], + full_fields=read_fields, + push_down_predicate=None, + blob_as_descriptor=False + ) + batch_content = reader_content.read_arrow_batch() + self.assertIsNotNone(batch_content) + self.assertEqual(batch_content.num_rows, 1) + read_content_bytes = batch_content.column(0)[0].as_py() self.assertIsInstance(read_content_bytes, bytes) + # With blob_as_descriptor=False, we should get the actual blob content self.assertEqual(read_content_bytes, test_content) - reader.close() + reader_content.close() def test_null_blob_write(self): from pypaimon.write.blob_format_writer import BlobFormatWriter @@ -1202,7 +1229,7 @@ def test_null_blob_read(self): self.assertEqual(batch.column(0)[2].as_py(), b"world") reader.close() - def test_null_blob_read_resolves_content(self): + def test_null_blob_read_as_descriptor(self): from pypaimon.write.blob_format_writer import BlobFormatWriter file_io = LocalFileIO(self.temp_dir, Options({})) @@ -1224,14 +1251,17 @@ def test_null_blob_read_resolves_content(self): read_fields=[blob_field_name], full_fields=read_fields, push_down_predicate=None, + blob_as_descriptor=True ) batch = reader.read_arrow_batch() self.assertIsNotNone(batch) self.assertEqual(batch.num_rows, 3) - self.assertEqual(batch.column(0)[0].as_py(), b"hello") + desc0 = BlobDescriptor.deserialize(batch.column(0)[0].as_py()) + self.assertEqual(desc0.uri, blob_file_path) self.assertIsNone(batch.column(0)[1].as_py()) - self.assertEqual(batch.column(0)[2].as_py(), b"world") + desc2 = BlobDescriptor.deserialize(batch.column(0)[2].as_py()) + self.assertEqual(desc2.uri, blob_file_path) reader.close() From a610d97f86826d3e4f4da5d88af6deecbabdf613 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 19 May 2026 17:09:09 +0800 Subject: [PATCH 3/6] [python] Fix Blob.from_bytes type annotation and add tests - Add Optional[bytes] type annotation for data parameter - Raise ValueError when file_io is None but bytes are BlobDescriptor - Add unit tests for from_bytes covering all branches --- paimon-python/pypaimon/table/row/blob.py | 9 +++--- paimon-python/pypaimon/tests/blob_test.py | 38 +++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index 903492e704b0..019e8368e4a1 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -277,18 +277,17 @@ def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob' return BlobRef(uri_reader, descriptor) @staticmethod - def from_bytes(data: bytes, file_io=None) -> 'Blob': + def from_bytes(data: Optional[bytes], file_io=None) -> 'Blob': if data is None: return BlobData(b'') if not isinstance(data, (bytes, bytearray)): raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}") data = bytes(data) if BlobDescriptor.is_blob_descriptor(data): + if file_io is None: + raise ValueError("file_io is required to resolve BlobDescriptor bytes") descriptor = BlobDescriptor.deserialize(data) - if file_io is not None: - uri_reader = file_io.uri_reader_factory.create(descriptor.uri) - else: - uri_reader = FileUriReader(None) + uri_reader = file_io.uri_reader_factory.create(descriptor.uri) return BlobRef(uri_reader, descriptor) return BlobData(data) diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index d9cf64210d61..913ad135280a 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -134,6 +134,44 @@ def test_from_http(self): self.assertEqual(descriptor.offset, 0) self.assertEqual(descriptor.length, -1) + def test_from_bytes_with_raw_data(self): + raw = b"hello blob" + blob = Blob.from_bytes(raw) + self.assertIsInstance(blob, BlobData) + self.assertEqual(blob.to_data(), raw) + + def test_from_bytes_with_none(self): + blob = Blob.from_bytes(None) + self.assertIsInstance(blob, BlobData) + self.assertEqual(blob.to_data(), b'') + + def test_from_bytes_with_descriptor(self): + import tempfile, os + data = b"actual blob content" + tmp = tempfile.NamedTemporaryFile(delete=False) + tmp.write(data) + tmp.close() + + descriptor = BlobDescriptor(tmp.name, 0, len(data)) + serialized = descriptor.serialize() + + from pypaimon.common.file_io import FileIO + file_io = FileIO.get(f"file://{os.path.dirname(tmp.name)}", {}) + blob = Blob.from_bytes(serialized, file_io) + self.assertIsInstance(blob, BlobRef) + self.assertEqual(blob.to_data(), data) + os.unlink(tmp.name) + + def test_from_bytes_descriptor_without_file_io_raises(self): + descriptor = BlobDescriptor("/tmp/fake", 0, 10) + serialized = descriptor.serialize() + with self.assertRaises(ValueError): + Blob.from_bytes(serialized) + + def test_from_bytes_invalid_type_raises(self): + with self.assertRaises(TypeError): + Blob.from_bytes(12345) + def test_blob_data_interface_compliance(self): """Test that BlobData properly implements Blob interface.""" test_data = b"interface test data" From eb3897dbe2a9317ea1e74521246aa74f9e1b9a77 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 19 May 2026 18:09:27 +0800 Subject: [PATCH 4/6] [python] Align Blob.from_bytes with Java Blob.fromBytes semantics - from_bytes(None) returns None (not empty BlobData), matching Java - Add allow_blob_data parameter (Java's 4th arg): when False, always interpret bytes as descriptor - Integrate into read path: _blob_cell_to_data now delegates to Blob.from_bytes instead of duplicating dispatch logic --- .../read/reader/data_file_batch_reader.py | 22 ++----------------- paimon-python/pypaimon/table/row/blob.py | 6 ++--- paimon-python/pypaimon/tests/blob_test.py | 4 +--- 3 files changed, 6 insertions(+), 26 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index 4035dcc4f0a9..561ef7ac28c2 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -178,28 +178,10 @@ def _blob_cell_to_data(self, value): value = self._normalize_blob_cell(value) if value is None: return None - if not isinstance(value, bytes): return value - - descriptor = self._deserialize_descriptor_or_none(value) - if descriptor is None: - return value - - try: - uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri) - blob = Blob.from_descriptor(uri_reader, descriptor) - return blob.to_data() - except Exception as e: - raise RuntimeError( - "Failed to read blob bytes from descriptor URI while converting blob value." - ) from e - - @staticmethod - def _deserialize_descriptor_or_none(raw: bytes): - if not BlobDescriptor.is_blob_descriptor(raw): - return None - return BlobDescriptor.deserialize(raw) + blob = Blob.from_bytes(value, self.file_io) + return blob.to_data() if blob is not None else None def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch: """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER).""" diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index 019e8368e4a1..8af0c0dc69a3 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -277,13 +277,13 @@ def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob' return BlobRef(uri_reader, descriptor) @staticmethod - def from_bytes(data: Optional[bytes], file_io=None) -> 'Blob': + def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool = True) -> Optional['Blob']: if data is None: - return BlobData(b'') + return None if not isinstance(data, (bytes, bytearray)): raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}") data = bytes(data) - if BlobDescriptor.is_blob_descriptor(data): + if BlobDescriptor.is_blob_descriptor(data) or not allow_blob_data: if file_io is None: raise ValueError("file_io is required to resolve BlobDescriptor bytes") descriptor = BlobDescriptor.deserialize(data) diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index 913ad135280a..643108692796 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -141,9 +141,7 @@ def test_from_bytes_with_raw_data(self): self.assertEqual(blob.to_data(), raw) def test_from_bytes_with_none(self): - blob = Blob.from_bytes(None) - self.assertIsInstance(blob, BlobData) - self.assertEqual(blob.to_data(), b'') + self.assertIsNone(Blob.from_bytes(None)) def test_from_bytes_with_descriptor(self): import tempfile, os From 5a135390d77e36326ef5d6c492f8d8221854a2b3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 19 May 2026 22:09:30 +0800 Subject: [PATCH 5/6] [python] Fix flake8 lint errors --- paimon-python/pypaimon/read/reader/data_file_batch_reader.py | 2 +- paimon-python/pypaimon/tests/blob_test.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index 561ef7ac28c2..da6afb3824f6 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -25,7 +25,7 @@ from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.schema.data_types import DataField, PyarrowFieldParser -from pypaimon.table.row.blob import Blob, BlobDescriptor +from pypaimon.table.row.blob import Blob from pypaimon.table.special_fields import SpecialFields diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index 643108692796..f26d0e5b8369 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -144,7 +144,8 @@ def test_from_bytes_with_none(self): self.assertIsNone(Blob.from_bytes(None)) def test_from_bytes_with_descriptor(self): - import tempfile, os + import tempfile + import os data = b"actual blob content" tmp = tempfile.NamedTemporaryFile(delete=False) tmp.write(data) From 3a2a85d53a1730693dcc68fec01c7a68252402a3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 22:09:37 +0800 Subject: [PATCH 6/6] [python] Support row-level Blob access aligned with Java getBlob Add get_blob(pos) to InternalRow and to_blob_iterator() to TableRead, enabling lazy Blob access with streaming support. --- .../read/reader/iface/record_batch_reader.py | 10 +- paimon-python/pypaimon/read/table_read.py | 49 ++++++++++ .../pypaimon/table/row/generic_row.py | 9 ++ .../pypaimon/table/row/internal_row.py | 11 ++- .../pypaimon/table/row/offset_row.py | 27 +++++- .../pypaimon/tests/blob_table_test.py | 91 +++++++++++++++++++ 6 files changed, 192 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py index 3b29383c2253..da2f2e799475 100644 --- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py @@ -57,17 +57,21 @@ def tuple_iterator(self) -> Optional[Iterator[tuple]]: return None return df.iter_rows() - def read_batch(self) -> Optional[RecordIterator[InternalRow]]: + def read_batch(self, file_io=None, blob_field_indices=None) -> Optional[RecordIterator[InternalRow]]: df = self.read_next_df() if df is None: return None - return InternalRowWrapperIterator(df.iter_rows(), df.width) + return InternalRowWrapperIterator( + df.iter_rows(), df.width, file_io, blob_field_indices) class InternalRowWrapperIterator(RecordIterator[InternalRow]): - def __init__(self, iterator: Iterator[tuple], width: int): + def __init__(self, iterator: Iterator[tuple], width: int, + file_io=None, blob_field_indices=None): self._iterator = iterator self._reused_row = OffsetRow(None, 0, width) + if file_io is not None and blob_field_indices: + self._reused_row.with_blob_context(file_io, blob_field_indices) def next(self) -> Optional[InternalRow]: row_tuple = next(self._iterator, None) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index c45f2a853231..ebc010b501a3 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -74,6 +74,55 @@ def _record_generator(): return _record_generator() + def to_blob_iterator(self, splits: List[Split]) -> Iterator: + """Iterator where blob fields are accessible via row.get_blob(pos). + + Unlike to_iterator() which eagerly resolves blobs to bytes, + this returns rows with lazy Blob access supporting streaming. + """ + from pypaimon.common.options.core_options import CoreOptions + + blob_field_indices = { + i for i, field in enumerate(self.read_type) + if hasattr(field.type, 'type') and field.type.type == 'BLOB' + } + file_io = self.table.file_io + limit = self.limit + + # Force blob-as-descriptor=true so descriptors are preserved + original_value = self.table.options.blob_as_descriptor() + self.table.options.set(CoreOptions.BLOB_AS_DESCRIPTOR, True) + + def _blob_record_generator(): + try: + count = 0 + for split in splits: + if limit is not None and count >= limit: + return + reader = self._create_split_read(split).create_reader() + try: + for batch in iter( + lambda: reader.read_batch(file_io, blob_field_indices), + None + ): + for row in iter(batch.next, None): + yield row + count += 1 + if limit is not None and count >= limit: + return + finally: + reader.close() + finally: + # Restore original option + if original_value is not None: + self.table.options.set( + CoreOptions.BLOB_AS_DESCRIPTOR, original_value) + else: + self.table.options.options.data.pop( + CoreOptions.BLOB_AS_DESCRIPTOR.key(), None) + + return _blob_record_generator() + def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader: schema = PyarrowFieldParser.from_paimon_schema(self.read_type) if self.include_row_kind: diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index e224fb349574..7f10ea2ecb73 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -44,6 +44,15 @@ def get_field(self, pos: int) -> Any: raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}") return self.values[pos] + def get_blob(self, pos: int): + from pypaimon.table.row.blob import Blob + value = self.get_field(pos) + if value is None: + return None + if isinstance(value, Blob): + return value + raise TypeError(f"Cannot get Blob from {type(value)} at position {pos}") + def get_row_kind(self) -> RowKind: return self.row_kind diff --git a/paimon-python/pypaimon/table/row/internal_row.py b/paimon-python/pypaimon/table/row/internal_row.py index ec89a743772a..c5283ddac63e 100644 --- a/paimon-python/pypaimon/table/row/internal_row.py +++ b/paimon-python/pypaimon/table/row/internal_row.py @@ -16,7 +16,7 @@ # under the License. from abc import ABC, abstractmethod -from typing import Any +from typing import Any, Optional from pypaimon.table.row.row_kind import RowKind @@ -45,6 +45,15 @@ def __len__(self) -> int: The number does not include RowKind. It is kept separately. """ + def get_blob(self, pos: int) -> Optional[Any]: + """Returns the Blob at the given position, or None if null. + + Requires a blob-aware row context. Use TableRead.to_blob_iterator(). + """ + raise NotImplementedError( + "get_blob() requires a blob-aware row. Use TableRead.to_blob_iterator()." + ) + def __str__(self) -> str: fields = [] for pos in range(self.__len__()): diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py index a9f02b18678a..c856373dad13 100644 --- a/paimon-python/pypaimon/table/row/offset_row.py +++ b/paimon-python/pypaimon/table/row/offset_row.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import Optional, Set from pypaimon.table.row.internal_row import InternalRow, RowKind @@ -28,6 +28,13 @@ def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int): self.offset = offset self.arity = arity self.row_kind_byte: int = 1 + self._file_io = None + self._blob_field_indices: Optional[Set[int]] = None + + def with_blob_context(self, file_io, blob_field_indices: Set[int]) -> 'OffsetRow': + self._file_io = file_io + self._blob_field_indices = blob_field_indices + return self def replace(self, row_tuple: tuple) -> 'OffsetRow': self.row_tuple = row_tuple @@ -46,6 +53,24 @@ def get_field(self, pos: int): raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}") return self.row_tuple[self.offset + pos] + def get_blob(self, pos: int): + from pypaimon.table.row.blob import Blob, BlobDescriptor + + if self._blob_field_indices is not None and pos not in self._blob_field_indices: + raise TypeError(f"Field at position {pos} is not a BLOB field") + value = self.get_field(pos) + if value is None: + return None + if isinstance(value, (bytes, bytearray)): + value = bytes(value) + if BlobDescriptor.is_blob_descriptor(value): + descriptor = BlobDescriptor.deserialize(value) + uri_reader = self._file_io.uri_reader_factory.create(descriptor.uri) + return Blob.from_descriptor(uri_reader, descriptor) + else: + return Blob.from_data(value) + raise TypeError(f"Cannot convert {type(value)} to Blob") + def get_row_kind(self) -> RowKind: return RowKind(self.row_kind_byte) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 56359c1d1fac..68e86735ada2 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -3184,5 +3184,96 @@ def test_rename_blob_column_should_fail(self): self.assertIn('Cannot rename BLOB column', str(ctx.exception)) +class GetBlobTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.temp_dir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('test_db', False) + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('picture', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + cls.catalog.create_table('test_db.get_blob_test', schema, False) + cls.table = cls.catalog.get_table('test_db.get_blob_test') + + data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'name': ['a', 'b', 'c'], + 'picture': [b'img_data_1', b'img_data_2', b'img_data_3'], + }, schema=pa_schema) + + write_builder = cls.table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(data) + commit_messages = writer.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + def test_get_blob_lazy_access(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + results = [] + for row in read.to_blob_iterator(splits): + blob = row.get_blob(2) + self.assertIsNotNone(blob) + results.append((row.get_field(0), blob.to_data())) + + self.assertEqual(len(results), 3) + results.sort(key=lambda x: x[0]) + self.assertEqual(results[0], (1, b'img_data_1')) + self.assertEqual(results[1], (2, b'img_data_2')) + self.assertEqual(results[2], (3, b'img_data_3')) + + def test_get_blob_streaming(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + for row in read.to_blob_iterator(splits): + blob = row.get_blob(2) + with blob.new_input_stream() as stream: + data = stream.read() + self.assertTrue(data.startswith(b'img_data_')) + break + + def test_get_blob_non_blob_field_raises(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + for row in read.to_blob_iterator(splits): + with self.assertRaises(TypeError): + row.get_blob(0) + break + + def test_to_iterator_unchanged(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + count = 0 + for row in read.to_iterator(splits): + self.assertIsNotNone(row.get_field(0)) + self.assertIsNotNone(row.get_field(1)) + count += 1 + self.assertEqual(count, 3) + + if __name__ == '__main__': unittest.main()