Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions paimon-python/pypaimon/read/reader/data_file_batch_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
for field in fields
if hasattr(field.type, 'type') and field.type.type == 'BLOB'
}
self.descriptor_blob_fields = {
field_name
for field_name in self.blob_descriptor_fields
if field_name in self.blob_field_names
}
if self.blob_as_descriptor:
# blob-as-descriptor=true means ALL blob fields store descriptors in parquet
self.descriptor_blob_fields = self.blob_field_names.copy()
else:
# Only specific fields listed in blob-descriptor-field store descriptors
self.descriptor_blob_fields = {
field_name
for field_name in self.blob_descriptor_fields
if field_name in self.blob_field_names
}

def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
if isinstance(self.format_reader, FormatBlobReader):
Expand Down Expand Up @@ -154,10 +159,7 @@ def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) ->
field_idx = record_batch.schema.get_field_index(field_name)
values = record_batch.column(field_idx).to_pylist()

if self.blob_as_descriptor:
converted = [self._normalize_blob_cell(v) for v in values]
else:
converted = [self._blob_cell_to_data(v) for v in values]
converted = [self._blob_cell_to_data(v) for v in values]
arrays[field_idx] = pa.array(converted, type=pa.large_binary())

return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema)
Expand Down
5 changes: 1 addition & 4 deletions paimon-python/pypaimon/read/reader/format_blob_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@
class FormatBlobReader(RecordBatchReader):

def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool = False,
batch_size: int = 1024):
self._file_io = file_io
self._file_path = file_path
self._push_down_predicate = push_down_predicate
self._blob_as_descriptor = blob_as_descriptor
self._batch_size = batch_size
# Get file size
self._file_size = file_io.get_file_size(file_path)
Expand Down Expand Up @@ -97,8 +96,6 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch
for field_name in self._fields:
if blob is None:
pydict_data[field_name].append(None)
elif self._blob_as_descriptor:
pydict_data[field_name].append(blob.to_descriptor().serialize())
else:
pydict_data[field_name].append(blob.to_data())

Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/read/split_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,7 @@ def create_reader(self) -> RecordReader:
else:
reader = merge_reader

if (not CoreOptions.blob_as_descriptor(self.table.options)
and CoreOptions.blob_descriptor_fields(self.table.options)):
if CoreOptions.blob_descriptor_fields(self.table.options):
reader = BlobDescriptorConvertReader(reader, self.table)

return reader
Expand Down
55 changes: 9 additions & 46 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,14 +1107,11 @@ def test_blob_write_read_partition(self):
f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501

def test_blob_write_read_end_to_end_with_descriptor(self):
"""Test end-to-end blob functionality using blob descriptors."""
"""Test that blob-as-descriptor stores descriptors but read always resolves to actual data."""
import random
from pypaimon import Schema
from pypaimon.table.row.blob import BlobDescriptor, Blob
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.common.options.config import CatalogOptions
from pypaimon.table.row.blob import BlobDescriptor

# Create schema with blob column
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
Expand All @@ -1130,81 +1127,47 @@ def test_blob_write_read_end_to_end_with_descriptor(self):
}
)

# Create table
self.catalog.create_table('test_db.blob_descriptor_test', schema, False)
table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test')

# Create test blob data (1MB)
blob_data = bytearray(1024 * 1024)
random.seed(42) # For reproducible tests
random.seed(42)
for i in range(len(blob_data)):
blob_data[i] = random.randint(0, 255)
blob_data = bytes(blob_data)

# Create external blob file
external_blob_path = os.path.join(self.temp_dir, 'external_blob')
with open(external_blob_path, 'wb') as f:
f.write(blob_data)

# Create blob descriptor pointing to external file
blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data))

# Create test data with blob descriptor
test_data = pa.Table.from_pydict({
'id': [1],
'name': ['paimon'],
'picture': [blob_descriptor.serialize()]
}, schema=pa_schema)

# Write data using table API
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_arrow(test_data)

# Commit the data
commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)

# Read data back
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
result = table_read.to_arrow(table_scan.plan().splits())

# Verify the data was written and read correctly
self.assertEqual(result.num_rows, 1, "Should have 1 row")
self.assertEqual(result.column('id').to_pylist(), [1], "ID should match")
self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name should match")
self.assertEqual(result.num_rows, 1)
self.assertEqual(result.column('id').to_pylist(), [1])
self.assertEqual(result.column('name').to_pylist(), ['paimon'])

# Get the blob descriptor bytes from the result
# Read always resolves descriptor to actual blob data (aligned with Java getBlob)
picture_bytes = result.column('picture').to_pylist()[0]
self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes")

# Deserialize the blob descriptor
new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)

# The URI might be different if the blob was stored in the table's data directory
# Let's verify the descriptor properties and try to read the data
# Note: offset might be non-zero due to blob file format overhead
self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should be non-negative")
self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length should match")

# Create URI reader factory and read the blob data
catalog_options = {CatalogOptions.WAREHOUSE.key(): self.warehouse}
uri_reader_factory = UriReaderFactory(catalog_options)
uri_reader = uri_reader_factory.create(new_blob_descriptor.uri)
blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)

blob_descriptor_from_blob = blob.to_descriptor()
self.assertEqual(
blob_descriptor_from_blob.uri,
new_blob_descriptor.uri,
f"URI should be preserved. Expected: {new_blob_descriptor.uri}, Got: {blob_descriptor_from_blob.uri}"
)

# Verify the blob data matches the original
self.assertEqual(blob.to_data(), blob_data, "Blob data should match original")
self.assertIsInstance(picture_bytes, bytes)
self.assertEqual(picture_bytes, blob_data)

print("✅ Blob descriptor end-to-end test passed:")
print(" - Created external blob file and descriptor")
Expand Down
42 changes: 6 additions & 36 deletions paimon-python/pypaimon/tests/blob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,52 +1134,25 @@ def test_blob_end_to_end_with_descriptor(self):
self.assertGreater(file_size, 0)

# ========== Step 3: Read data and check ==========
# Define schema for reading
# Reading always resolves blob to actual content (aligned with Java getBlob behavior)
read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
reader = FormatBlobReader(
file_io=file_io,
file_path=str(blob_file_path),
read_fields=[blob_field_name],
full_fields=read_fields,
push_down_predicate=None,
blob_as_descriptor=True
)

# Read with blob_as_descriptor=True (read output as descriptor bytes)
batch = reader.read_arrow_batch()
self.assertIsNotNone(batch)
self.assertEqual(batch.num_rows, 1)
self.assertEqual(batch.num_columns, 1)

read_blob_bytes = batch.column(0)[0].as_py()
self.assertIsInstance(read_blob_bytes, bytes)

# Deserialize the returned descriptor
returned_descriptor = BlobDescriptor.deserialize(read_blob_bytes)

# The returned descriptor should point to the blob file (simplified implementation)
# because the current implementation creates a descriptor pointing to the blob file location
self.assertEqual(returned_descriptor.uri, str(blob_file_path))
self.assertGreater(returned_descriptor.offset, 0) # Should have some offset in the blob file

reader.close()

reader_content = FormatBlobReader(
file_io=file_io,
file_path=str(blob_file_path),
read_fields=[blob_field_name],
full_fields=read_fields,
push_down_predicate=None,
blob_as_descriptor=False
)
batch_content = reader_content.read_arrow_batch()
self.assertIsNotNone(batch_content)
self.assertEqual(batch_content.num_rows, 1)
read_content_bytes = batch_content.column(0)[0].as_py()
read_content_bytes = batch.column(0)[0].as_py()
self.assertIsInstance(read_content_bytes, bytes)
# With blob_as_descriptor=False, we should get the actual blob content
self.assertEqual(read_content_bytes, test_content)
reader_content.close()
reader.close()

def test_null_blob_write(self):
from pypaimon.write.blob_format_writer import BlobFormatWriter
Expand Down Expand Up @@ -1229,7 +1202,7 @@ def test_null_blob_read(self):
self.assertEqual(batch.column(0)[2].as_py(), b"world")
reader.close()

def test_null_blob_read_as_descriptor(self):
def test_null_blob_read_resolves_content(self):
from pypaimon.write.blob_format_writer import BlobFormatWriter

file_io = LocalFileIO(self.temp_dir, Options({}))
Expand All @@ -1251,17 +1224,14 @@ def test_null_blob_read_as_descriptor(self):
read_fields=[blob_field_name],
full_fields=read_fields,
push_down_predicate=None,
blob_as_descriptor=True
)

batch = reader.read_arrow_batch()
self.assertIsNotNone(batch)
self.assertEqual(batch.num_rows, 3)
desc0 = BlobDescriptor.deserialize(batch.column(0)[0].as_py())
self.assertEqual(desc0.uri, blob_file_path)
self.assertEqual(batch.column(0)[0].as_py(), b"hello")
self.assertIsNone(batch.column(0)[1].as_py())
desc2 = BlobDescriptor.deserialize(batch.column(0)[2].as_py())
self.assertEqual(desc2.uri, blob_file_path)
self.assertEqual(batch.column(0)[2].as_py(), b"world")
reader.close()


Expand Down
Loading