From 7b5b15e82e74c1989e4d8d434f79fc2d47baee53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Mon, 18 May 2026 19:13:17 +0800 Subject: [PATCH 1/6] [core] introduce Placeholder for Blob File Format --- .../java/org/apache/paimon/data/Blob.java | 25 + .../operation/BlobFallbackRecordReader.java | 326 +++++++++++++ .../operation/DataEvolutionSplitRead.java | 258 ++++++---- .../apache/paimon/append/BlobTableTest.java | 118 +++++ .../operation/DataEvolutionReadTest.java | 457 +++++++++++++----- .../paimon/format/blob/BlobFileMeta.java | 10 +- .../paimon/format/blob/BlobFormatReader.java | 2 + .../paimon/format/blob/BlobFormatWriter.java | 10 +- .../format/blob/BlobFileFormatTest.java | 104 +++- 9 files changed, 1081 insertions(+), 229 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java index f11a056efbcf..c3ba17513bf0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java @@ -38,6 +38,31 @@ @Public public interface Blob { + /** + * The placeholder blob, mainly for blob update in data-evolution. It should never be exposed to + * users. 
+ */ + Blob PLACE_HOLDER = + new Blob() { + @Override + public byte[] toData() { + throw new UnsupportedOperationException( + "Should never call this method for placeholder blob."); + } + + @Override + public BlobDescriptor toDescriptor() { + throw new UnsupportedOperationException( + "Should never call this method for placeholder blob."); + } + + @Override + public SeekableInputStream newInputStream() throws IOException { + throw new UnsupportedOperationException( + "Should never call this method for placeholder blob."); + } + }; + byte[] toData(); BlobDescriptor toDescriptor(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java new file mode 100644 index 000000000000..246eced12920 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation; + +import org.apache.paimon.append.ForceSingleBatchReader; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReader.RecordIterator; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Range; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static java.util.Collections.reverseOrder; +import static java.util.Comparator.comparingLong; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Resolves blob placeholder rows by falling back through older sequence groups. The read logic is + * as below: + * + *
<ol> + *
<li>Group files by max-seq; files with a higher max-seq hold newer records. + *
<li>Sort files by range within each group, and create sequential readers for them. Note that + * absent ranges will be read as all-placeholder rows. + *
<li>Sort the groups by max-seq in descending order and merge-read all readers; the record at each + * index will be the first non-placeholder blob. + * </ol>
+ */ +public class BlobFallbackRecordReader implements RecordReader { + + private final List> groupReaders = new ArrayList<>(); + private final int blobIndex; + private boolean returned; + + BlobFallbackRecordReader( + List files, + long rowCount, + BlobFileReaderFactory readerFactory, + List rowRanges, + RowType readRowType, + int blobIndex) { + this.blobIndex = blobIndex; + + checkArgument(!files.isEmpty(), "Blob bunch should not be empty."); + long firstRowId = + files.stream().mapToLong(DataFileMeta::nonNullFirstRowId).min().getAsLong(); + long lastRowId = firstRowId + rowCount - 1; + + // sort descendent group readers in descending order + Map> sequenceGroups = new TreeMap<>(reverseOrder()); + for (DataFileMeta file : files) { + sequenceGroups + .computeIfAbsent(file.maxSequenceNumber(), ignored -> new ArrayList<>()) + .add(file); + } + + for (Map.Entry> entry : sequenceGroups.entrySet()) { + // within each group, sort by first row id + List groupFiles = entry.getValue(); + groupFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId)); + groupReaders.add( + new ForceSingleBatchReader( + new BlobSequenceGroupRecordReader( + groupFiles, + readerFactory, + rowRanges, + readRowType, + blobIndex, + firstRowId, + lastRowId, + entry.getKey()))); + } + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + if (returned) { + return null; + } + returned = true; + + RecordIterator[] iterators = new RecordIterator[groupReaders.size()]; + for (int i = 0; i < groupReaders.size(); i++) { + RecordIterator iterator = groupReaders.get(i).readBatch(); + if (iterator == null) { + return null; + } + iterators[i] = iterator; + } + + return new RecordIterator() { + @Nullable + @Override + public InternalRow next() throws IOException { + InternalRow result = null; + // we should always move each iterator forward + for (RecordIterator iterator : iterators) { + InternalRow row = iterator.next(); + if (row == null) { + return null; + } + // result is 
the first non-placeholder record + if (result == null && !isPlaceHolder(row)) { + result = row; + } + } + if (result == null) { + throw new IllegalStateException( + "Invalid state: all blob files at the same row id store a placeholder, it's a bug."); + } + return result; + } + + @Override + public void releaseBatch() { + for (RecordIterator iterator : iterators) { + iterator.releaseBatch(); + } + } + }; + } + + private boolean isPlaceHolder(InternalRow row) { + return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == Blob.PLACE_HOLDER; + } + + @Override + public void close() throws IOException { + IOException exception = null; + for (RecordReader reader : groupReaders) { + try { + reader.close(); + } catch (IOException e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + } + if (exception != null) { + throw exception; + } + } + + /** + * Reads one blob sequence group (all blob files with the same max_seq_num) and emits + * placeholder rows for row id gaps. For example, if the full row range is [0, 100], but there's + * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100] + * will be emitted as placeholder rows (and also maybe enriched with special fields). 
+ */ + public static class BlobSequenceGroupRecordReader implements RecordReader { + + private final List files; + private final BlobFileReaderFactory readerFactory; + // pushed row ranges + private final List rowRanges; + private final RowType readRowType; + private final int blobIndex; + private final long lastRowId; + private final long sequenceNumber; + + private RecordReader currentReader; + private DataFileMeta currentFile; + private int fileIndex; + private long nextRowId; + + BlobSequenceGroupRecordReader( + List files, + BlobFileReaderFactory readerFactory, + List rowRanges, + RowType readRowType, + int blobIndex, + long firstRowId, + long lastRowId, + long sequenceNumber) { + this.files = files; + this.readerFactory = readerFactory; + this.rowRanges = rowRanges == null ? null : Range.sortAndMergeOverlap(rowRanges); + this.readRowType = readRowType; + this.blobIndex = blobIndex; + this.lastRowId = lastRowId; + this.sequenceNumber = sequenceNumber; + this.nextRowId = firstRowId; + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + while (true) { + nextRowId = nextSelectedRowId(nextRowId); + if (nextRowId > lastRowId) { + return null; + } + + if (currentReader != null) { + RecordIterator batch = currentReader.readBatch(); + if (batch != null) { + return batch; + } + long afterFile = lastRowId(currentFile) + 1; + closeCurrentFileReader(); + nextRowId = afterFile; + continue; + } + + while (fileIndex < files.size() && lastRowId(files.get(fileIndex)) < nextRowId) { + fileIndex++; + } + if (fileIndex >= files.size()) { + return placeHolderBatch(lastRowId); + } + + DataFileMeta nextFile = files.get(fileIndex); + if (nextFile.nonNullFirstRowId() > nextRowId) { + return placeHolderBatch(nextFile.nonNullFirstRowId() - 1); + } + + currentFile = nextFile; + currentReader = readerFactory.create(nextFile); + fileIndex++; + } + } + + private long nextSelectedRowId(long rowId) { + if (rowRanges == null) { + return rowId; + } + for 
(Range range : rowRanges) { + if (rowId < range.from) { + return range.from; + } + if (rowId <= range.to) { + return rowId; + } + } + return lastRowId + 1; + } + + private RecordIterator placeHolderBatch(long endRowId) { + long startRowId = nextRowId; + nextRowId = endRowId + 1; + return new RecordIterator() { + + long rowId = startRowId; + + @Nullable + @Override + public InternalRow next() { + rowId = nextSelectedRowId(rowId); + if (rowId > endRowId) { + return null; + } + return placeHolderRow(rowId++); + } + + @Override + public void releaseBatch() {} + }; + } + + private GenericRow placeHolderRow(long rowId) { + GenericRow row = new GenericRow(readRowType.getFieldCount()); + row.setField(blobIndex, Blob.PLACE_HOLDER); + for (int i = 0; i < readRowType.getFieldCount(); i++) { + String fieldName = readRowType.getFieldNames().get(i); + if (SpecialFields.ROW_ID.name().equals(fieldName)) { + row.setField(i, rowId); + } else if (SpecialFields.SEQUENCE_NUMBER.name().equals(fieldName)) { + row.setField(i, sequenceNumber); + } + } + return row; + } + + private long lastRowId(DataFileMeta file) { + return file.nonNullFirstRowId() + file.rowCount() - 1; + } + + private void closeCurrentFileReader() throws IOException { + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + currentFile = null; + } + + @Override + public void close() throws IOException { + closeCurrentFileReader(); + } + } + + interface BlobFileReaderFactory { + + RecordReader create(DataFileMeta file) throws IOException; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index c2f10b9a2b0c..261e3e7f8a12 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -46,10 +46,12 @@ import 
org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FormatReaderMapping; import org.apache.paimon.utils.FormatReaderMapping.Builder; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; import org.apache.paimon.utils.RoaringBitmap32; @@ -302,15 +304,16 @@ private DataEvolutionFileReader createUnionReader( dataSchema, readFields, false)); + RowType partialReadRowType = new RowType(readFields); fileRecordReaders[i] = new ForceSingleBatchReader( - createFileReader( + createFieldBunchReader( partition, bunch, dataFilePathFactory, formatReaderMapping, rowRanges, - readRowType)); + partialReadRowType)); } } @@ -327,6 +330,84 @@ private DataEvolutionFileReader createUnionReader( return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders); } + private RecordReader createFieldBunchReader( + BinaryRow partition, + FieldBunch bunch, + DataFilePathFactory dataFilePathFactory, + FormatReaderMapping formatReaderMapping, + List rowRanges, + RowType readRowType) + throws IOException { + if (bunch instanceof DataBunch) { + // for data bunch, directly read the single file + return createFileReader( + partition, + bunch.files().get(0), + dataFilePathFactory, + formatReaderMapping, + rowRanges, + readRowType); + } else if (bunch instanceof VectorFileBunch) { + // for vector bunch, sequential read all data files and concat them + List> readerSuppliers = new ArrayList<>(); + for (DataFileMeta file : bunch.files()) { + RoaringBitmap32 selection = file.toFileSelection(rowRanges); + FormatReaderContext formatReaderContext = + new FormatReaderContext( + fileIO, + dataFilePathFactory.toPath(file), + file.fileSize(), + selection); + readerSuppliers.add( + 
() -> + new DataFileRecordReader( + readRowType, + formatReaderMapping.getReaderFactory(), + formatReaderContext, + coreOptions.scanIgnoreCorruptFile(), + coreOptions.scanIgnoreLostFile(), + formatReaderMapping.getIndexMapping(), + formatReaderMapping.getCastMapping(), + PartitionUtils.create( + formatReaderMapping.getPartitionPair(), partition), + true, + file.firstRowId(), + file.maxSequenceNumber(), + formatReaderMapping.getSystemFields())); + } + return ConcatRecordReader.create(readerSuppliers); + } else if (bunch instanceof BlobFileBunch) { + // for blob funch, fallback on placeholders + int blobIndex = findBlobFieldIndex(readRowType); + checkArgument(blobIndex >= 0, "Blob bunch read type should contain a blob field."); + return new BlobFallbackRecordReader( + bunch.files(), + bunch.rowCount(), + file -> + createFileReader( + partition, + file, + dataFilePathFactory, + formatReaderMapping, + rowRanges, + readRowType), + rowRanges, + readRowType, + blobIndex); + } else { + throw new UnsupportedOperationException("Unsupported bunch type: " + bunch); + } + } + + private static int findBlobFieldIndex(RowType rowType) { + for (int i = 0; i < rowType.getFieldCount(); i++) { + if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) { + return i; + } + } + return -1; + } + private FileRecordReader createFileReader( BinaryRow partition, DataFilePathFactory dataFilePathFactory, @@ -351,49 +432,6 @@ private FileRecordReader createFileReader( partition, file, dataFilePathFactory, formatReaderMapping, rowRanges, readRowType); } - private RecordReader createFileReader( - BinaryRow partition, - FieldBunch bunch, - DataFilePathFactory dataFilePathFactory, - FormatReaderMapping formatReaderMapping, - List rowRanges, - RowType readRowType) - throws IOException { - if (bunch.files().size() == 1) { - return createFileReader( - partition, - bunch.files().get(0), - dataFilePathFactory, - formatReaderMapping, - rowRanges, - readRowType); - } - List> readerSuppliers = new 
ArrayList<>(); - for (DataFileMeta file : bunch.files()) { - RoaringBitmap32 selection = file.toFileSelection(rowRanges); - FormatReaderContext formatReaderContext = - new FormatReaderContext( - fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection); - readerSuppliers.add( - () -> - new DataFileRecordReader( - readRowType, - formatReaderMapping.getReaderFactory(), - formatReaderContext, - coreOptions.scanIgnoreCorruptFile(), - coreOptions.scanIgnoreLostFile(), - formatReaderMapping.getIndexMapping(), - formatReaderMapping.getCastMapping(), - PartitionUtils.create( - formatReaderMapping.getPartitionPair(), partition), - true, - file.firstRowId(), - file.maxSequenceNumber(), - formatReaderMapping.getSystemFields())); - } - return ConcatRecordReader.create(readerSuppliers); - } - private FileRecordReader createFileReader( BinaryRow partition, DataFileMeta file, @@ -433,8 +471,8 @@ public static List splitFieldBunches( Function fileToRowType, boolean rowIdPushDown) { List fieldsFiles = new ArrayList<>(); - Map blobBunchMap = new HashMap<>(); - Map vectorStoreBunchMap = new TreeMap<>(); + Map blobBunchMap = new HashMap<>(); + Map vectorStoreBunchMap = new TreeMap<>(); long rowCount = -1; for (DataFileMeta file : needMergeFiles) { if (isBlobFile(file.fileName())) { @@ -442,9 +480,7 @@ public static List splitFieldBunches( int fieldId = rowType.getField(file.writeCols().get(0)).id(); final long expectedRowCount = rowCount; blobBunchMap - .computeIfAbsent( - fieldId, - key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown)) + .computeIfAbsent(fieldId, key -> new BlobFileBunch(expectedRowCount)) .add(file); } else if (isVectorStoreFile(file.fileName())) { RowType rowType = fileToRowType.apply(file); @@ -456,7 +492,7 @@ public static List splitFieldBunches( vectorStoreBunchMap .computeIfAbsent( vectorStoreKey, - key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown)) + key -> new VectorFileBunch(expectedRowCount, rowIdPushDown)) .add(file); } 
else { // Normal file, just add it to the current merge split @@ -496,8 +532,61 @@ public List files() { } } + /** + * The {@link FieldBunch} for blobs. Compared to {@link VectorFileBunch} which only contains + * data files of the max max_seq number, {@link BlobFileBunch} contains all blob files. + */ + @VisibleForTesting + static class BlobFileBunch implements FieldBunch { + + final List files; + final List ranges; + final long expectedRowCount; + + BlobFileBunch(long expectedRowCount) { + this.files = new ArrayList<>(); + this.expectedRowCount = expectedRowCount; + this.ranges = new ArrayList<>(); + } + + void add(DataFileMeta file) { + if (!isBlobFile(file.fileName())) { + throw new IllegalArgumentException("Only blob file can be added to this bunch."); + } + if (!files.isEmpty()) { + checkArgument( + file.writeCols().equals(files.get(0).writeCols()), + "All files in this bunch should have the same write columns."); + } + + files.add(file); + ranges.add(file.nonNullRowIdRange()); + } + + @Override + public long rowCount() { + List merged = Range.sortAndMergeOverlap(ranges, true); + Preconditions.checkState( + merged.size() == 1, + "Blob file bunch should always contain a contiguous row range."); + + long rowCount = merged.get(0).count(); + Preconditions.checkState( + rowCount == expectedRowCount, + "The merged rowCount of blob file bunch should be aligned with normal files."); + + return rowCount; + } + + @Override + public List files() { + return files; + } + } + + /** {@link FieldBunch} for vector-store files. 
*/ @VisibleForTesting - static class SpecialFieldBunch implements FieldBunch { + static class VectorFileBunch implements FieldBunch { final List files; final long expectedRowCount; @@ -508,70 +597,65 @@ static class SpecialFieldBunch implements FieldBunch { long latestMaxSequenceNumber = -1; long rowCount; - SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) { + VectorFileBunch(long expectedRowCount, boolean rowIdPushDown) { this.files = new ArrayList<>(); - this.rowCount = 0; this.expectedRowCount = expectedRowCount; this.rowIdPushDown = rowIdPushDown; } void add(DataFileMeta file) { - if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) { + if (!isVectorStoreFile(file.fileName())) { throw new IllegalArgumentException( - "Only blob/vector-store file can be added to this bunch."); + "Only vector-store file can be added to this bunch."); } - if (file.nonNullFirstRowId() == latestFistRowId) { if (file.maxSequenceNumber() >= latestMaxSequenceNumber) { throw new IllegalArgumentException( - "Blob/vector-store file with same first row id should have decreasing sequence number."); + "Vector file with same first row id should have decreasing sequence number."); } return; } + if (!files.isEmpty()) { long firstRowId = file.nonNullFirstRowId(); - if (rowIdPushDown) { - if (firstRowId < expectedNextFirstRowId) { - if (file.maxSequenceNumber() > latestMaxSequenceNumber) { - DataFileMeta lastFile = files.remove(files.size() - 1); - rowCount -= lastFile.rowCount(); - } else { - return; - } - } - } else { - if (firstRowId < expectedNextFirstRowId) { - checkArgument( - file.maxSequenceNumber() < latestMaxSequenceNumber, - "Blob/vector-store file with overlapping row id should have decreasing sequence number."); + if (rowIdPushDown && firstRowId < expectedNextFirstRowId) { + if (file.maxSequenceNumber() > latestMaxSequenceNumber) { + DataFileMeta lastFile = files.remove(files.size() - 1); + rowCount -= lastFile.rowCount(); + } else { return; - } else 
if (firstRowId > expectedNextFirstRowId) { - throw new IllegalArgumentException( - "Blob/vector-store file first row id should be continuous, expect " - + expectedNextFirstRowId - + " but got " - + firstRowId); } + } else if (firstRowId < expectedNextFirstRowId) { + checkArgument( + file.maxSequenceNumber() < latestMaxSequenceNumber, + "Vector file with overlapping row id should have decreasing sequence number."); + return; + } else if (!rowIdPushDown && firstRowId > expectedNextFirstRowId) { + throw new IllegalArgumentException( + "Vector file first row id should be continuous, expect " + + expectedNextFirstRowId + + " but got " + + firstRowId); } + if (!files.isEmpty()) { - if (!isBlobFile(file.fileName())) { - checkArgument( - file.schemaId() == files.get(0).schemaId(), - "All files in this bunch should have the same schema id."); - } + checkArgument( + file.schemaId() == files.get(0).schemaId(), + "All files in this bunch should have the same schema id."); checkArgument( file.writeCols().equals(files.get(0).writeCols()), "All files in this bunch should have the same write columns."); } } + files.add(file); rowCount += file.rowCount(); checkArgument( rowCount <= expectedRowCount, - "Blob/vector-store files row count exceed the expect " + expectedRowCount); - this.latestMaxSequenceNumber = file.maxSequenceNumber(); - this.latestFistRowId = file.nonNullFirstRowId(); - this.expectedNextFirstRowId = latestFistRowId + file.rowCount(); + "Vector files row count exceed the expect " + expectedRowCount); + latestMaxSequenceNumber = file.maxSequenceNumber(); + latestFistRowId = file.nonNullFirstRowId(); + expectedNextFirstRowId = latestFistRowId + file.rowCount(); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index 38aabeda6a7b..8a3e0e5ff98b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; @@ -30,15 +31,22 @@ import org.apache.paimon.data.BlobViewStruct; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.blob.BlobFileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; @@ -48,6 +56,7 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.system.RowTrackingTable; import org.apache.paimon.types.DataField; @@ -150,6 +159,73 @@ public void testBasic() throws Exception { assertThat(integer.get()).isEqualTo(1000); } + @Test + public void 
testReadBlobPlaceHolderFallback() throws Exception { + createTableDefault(); + writeDataDefault( + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("first"), new BlobData(blobBytes)), + GenericRow.of( + 2, BinaryString.fromString("second"), new BlobData(blobBytes)), + GenericRow.of( + 3, BinaryString.fromString("third"), new BlobData(blobBytes)))); + + FileStoreTable table = getTableDefault(); + List files = + table.store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + DataFileMeta dataFile = + files.stream() + .filter(file -> !BlobFileFormat.isBlobFile(file.fileName())) + .findFirst() + .get(); + DataFileMeta oldBlobFile = + files.stream() + .filter(file -> BlobFileFormat.isBlobFile(file.fileName())) + .findFirst() + .get(); + + byte[] updatedBytes = "updated-blob".getBytes(); + DataFilePathFactory pathFactory = + table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0); + DataFileMeta newBlobFile = + writeBlobFile( + table.fileIO(), + pathFactory.newBlobPath(), + Arrays.asList(Blob.PLACE_HOLDER, new BlobData(updatedBytes)), + dataFile.nonNullFirstRowId(), + oldBlobFile.maxSequenceNumber() + 1, + oldBlobFile.schemaId(), + oldBlobFile.writeCols()); + + DataSplit split = + DataSplit.builder() + .withSnapshot(1L) + .withPartition(BinaryRow.EMPTY_ROW) + .withBucket(0) + .withBucketPath(pathFactory.parent().toString()) + .withDataFiles(Arrays.asList(dataFile, newBlobFile, oldBlobFile)) + .build(); + + DataEvolutionSplitRead read = + new DataEvolutionSplitRead( + table.fileIO(), + table.schemaManager(), + table.schema(), + table.rowType(), + table.coreOptions(), + table.store().pathFactory()); + + List actual = new ArrayList<>(); + read.createReader(split).forEachRemaining(row -> actual.add(row.getBlob(2).toData())); + + assertThat(actual.size()).isEqualTo(3); + assertThat(actual.get(0)).isEqualTo(blobBytes); + assertThat(actual.get(1)).isEqualTo(updatedBytes); + 
assertThat(actual.get(2)).isEqualTo(blobBytes); + } + @Test public void testWriteByInputStream() throws Exception { createTableDefault(); @@ -971,6 +1047,48 @@ private static void writeFile(FileIO fileIO, Path path, byte[] bytes) throws IOE } } + private static DataFileMeta writeBlobFile( + FileIO fileIO, + Path path, + List blobs, + long firstRowId, + long maxSequenceNumber, + long schemaId, + List writeCols) + throws IOException { + try (PositionOutputStream out = fileIO.newOutputStream(path, false)) { + FormatWriter writer = + new BlobFileFormat() + .createWriterFactory(RowType.of(DataTypes.BLOB())) + .create(out, "none"); + for (Blob blob : blobs) { + writer.addElement(GenericRow.of(blob)); + } + writer.close(); + } + return DataFileMeta.create( + path.getName(), + fileIO.getFileSize(path), + blobs.size(), + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + SimpleStats.EMPTY_STATS, + SimpleStats.EMPTY_STATS, + 0, + maxSequenceNumber, + schemaId, + DataFileMeta.DUMMY_LEVEL, + Collections.emptyList(), + Timestamp.fromEpochMillis(System.currentTimeMillis()), + 0L, + null, + FileSource.APPEND, + null, + null, + firstRowId, + writeCols); + } + private static long countFilesWithSuffix(FileIO fileIO, Path root, String suffix) throws IOException { long count = 0; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java index 9bb9a7274ed1..fb5cc101274b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java @@ -18,15 +18,25 @@ package org.apache.paimon.operation; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; import 
org.apache.paimon.manifest.FileSource; +import org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader; +import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch; import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch; -import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch; +import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReader.RecordIterator; import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Range; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,150 +52,149 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link SpecialFieldBunch}. */ +/** Tests for blob and vector field bunches. 
*/ public class DataEvolutionReadTest { - private SpecialFieldBunch blobBunch; + private VectorFileBunch vectorBunch; @BeforeEach public void setUp() { - blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false); + vectorBunch = new VectorFileBunch(Long.MAX_VALUE, false); } @Test - public void testAddSingleBlobEntry() { - DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L); + public void testAddSingleVectorEntry() { + DataFileMeta vectorEntry = createVectorFile("vector1", 0L, 100L, 1L); - blobBunch.add(blobEntry); + vectorBunch.add(vectorEntry); - assertThat(blobBunch.files).hasSize(1); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry); - assertThat(blobBunch.rowCount()).isEqualTo(100); - assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0); - assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col")); + assertThat(vectorBunch.files).hasSize(1); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry); + assertThat(vectorBunch.rowCount()).isEqualTo(100); + assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0); + assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col")); } @Test - public void testAddBlobEntryAndTail() { - DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1); - - blobBunch.add(blobEntry); - blobBunch.add(blobTail); - - assertThat(blobBunch.files).hasSize(2); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry); - assertThat(blobBunch.files.get(1)).isEqualTo(blobTail); - assertThat(blobBunch.rowCount()).isEqualTo(300); - assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0); - assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col")); - assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L); + public void testAddVectorEntryAndTail() { + DataFileMeta vectorEntry = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorTail = 
createVectorFile("vector2", 100, 200, 1); + + vectorBunch.add(vectorEntry); + vectorBunch.add(vectorTail); + + assertThat(vectorBunch.files).hasSize(2); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry); + assertThat(vectorBunch.files.get(1)).isEqualTo(vectorTail); + assertThat(vectorBunch.rowCount()).isEqualTo(300); + assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0); + assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col")); + assertThat(vectorBunch.files.get(0).schemaId()).isEqualTo(0L); } @Test - public void testAddNonBlobFileThrowsException() { + public void testAddNonVectorFileThrowsException() { DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100, 1, 0L); - assertThatThrownBy(() -> blobBunch.add(normalFile)) + assertThatThrownBy(() -> vectorBunch.add(normalFile)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Only blob/vector-store file can be added to this bunch."); + .hasMessage("Only vector-store file can be added to this bunch."); } @Test - public void testAddBlobFileWithSameFirstRowId() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2); + public void testAddVectorFileWithSameFirstRowId() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 2); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with same firstRowId but higher sequence number should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob/vector-store file with same first row id should have decreasing sequence number."); + "Vector file with same first row id should have decreasing sequence number."); } @Test - public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() { - 
DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2); - DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1); + public void testAddVectorFileWithSameFirstRowIdAndLowerSequenceNumber() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 1); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with same firstRowId and lower sequence number should be ignored - blobBunch.add(blobEntry2); + vectorBunch.add(vectorEntry2); - assertThat(blobBunch.files).hasSize(1); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1); + assertThat(vectorBunch.files).hasSize(1); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1); } @Test - public void testAddBlobFileWithOverlappingRowId() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2); - DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1); + public void testAddVectorFileWithOverlappingRowId() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 1); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with overlapping row id and lower sequence number should be ignored - blobBunch.add(blobEntry2); + vectorBunch.add(vectorEntry2); - assertThat(blobBunch.files).hasSize(1); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1); + assertThat(vectorBunch.files).hasSize(1); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1); } @Test - public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2); + public void testAddVectorFileWithOverlappingRowIdAndHigherSequenceNumber() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 2); - 
blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with overlapping row id and higher sequence number should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob/vector-store file with overlapping row id should have decreasing sequence number."); + "Vector file with overlapping row id should have decreasing sequence number."); } @Test - public void testAddBlobFileWithNonContinuousRowId() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1); + public void testAddVectorFileWithNonContinuousRowId() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with non-continuous row id should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob/vector-store file first row id should be continuous, expect 100 but got 200"); + "Vector file first row id should be continuous, expect 100 but got 200"); } @Test - public void testAddBlobFileWithDifferentWriteCols() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = - createBlobFileWithCols("blob2", 100, 200, 1, Arrays.asList("different_col")); + public void testAddVectorFileWithDifferentWriteCols() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = + createVectorFileWithCols("vector2", 100, 200, 1, Arrays.asList("different_col")); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with different write columns should throw exception - assertThatThrownBy(() -> 
blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("All files in this bunch should have the same write columns."); } @Test - public void testComplexBlobBunchScenario() { - // Create a complex scenario with multiple blob entries and a tail - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1); - DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1); - DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1); - - blobBunch.add(blobEntry1); - blobBunch.add(blobEntry2); - blobBunch.add(blobEntry3); - blobBunch.add(blobTail); - - assertThat(blobBunch.files).hasSize(4); - assertThat(blobBunch.rowCount()).isEqualTo(1000); - assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0); - assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col")); + public void testComplexVectorBunchScenario() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 100, 200, 1); + DataFileMeta vectorEntry3 = createVectorFile("vector3", 300, 300, 1); + DataFileMeta vectorTail = createVectorFile("vector4", 600, 400, 1); + + vectorBunch.add(vectorEntry1); + vectorBunch.add(vectorEntry2); + vectorBunch.add(vectorEntry3); + vectorBunch.add(vectorTail); + + assertThat(vectorBunch.files).hasSize(4); + assertThat(vectorBunch.rowCount()).isEqualTo(1000); + assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0); + assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col")); } @Test @@ -209,26 +218,31 @@ public void testComplexBlobBunchScenario2() { List batch = batches.get(0); - assertThat(batch.get(1).fileName()).contains("blob5"); // pick - assertThat(batch.get(2).fileName()).contains("blob2"); // skip - assertThat(batch.get(3).fileName()).contains("blob1"); // skip - 
assertThat(batch.get(4).fileName()).contains("blob9"); // pick - assertThat(batch.get(5).fileName()).contains("blob6"); // skip - assertThat(batch.get(6).fileName()).contains("blob3"); // skip - assertThat(batch.get(7).fileName()).contains("blob7"); // pick - assertThat(batch.get(8).fileName()).contains("blob4"); // skip - assertThat(batch.get(9).fileName()).contains("blob8"); // pick + assertThat(batch.get(1).fileName()).contains("blob5"); + assertThat(batch.get(2).fileName()).contains("blob2"); + assertThat(batch.get(3).fileName()).contains("blob1"); + assertThat(batch.get(4).fileName()).contains("blob9"); + assertThat(batch.get(5).fileName()).contains("blob6"); + assertThat(batch.get(6).fileName()).contains("blob3"); + assertThat(batch.get(7).fileName()).contains("blob7"); + assertThat(batch.get(8).fileName()).contains("blob4"); + assertThat(batch.get(9).fileName()).contains("blob8"); List fieldBunches = splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); assertThat(fieldBunches.size()).isEqualTo(2); - SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1); - assertThat(blobBunch.files).hasSize(4); + BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1); + assertThat(blobBunch.files).hasSize(9); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); - assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); - assertThat(blobBunch.files.get(2).fileName()).contains("blob7"); - assertThat(blobBunch.files.get(3).fileName()).contains("blob8"); + assertThat(blobBunch.files.get(1).fileName()).contains("blob2"); + assertThat(blobBunch.files.get(2).fileName()).contains("blob1"); + assertThat(blobBunch.files.get(3).fileName()).contains("blob9"); + assertThat(blobBunch.files.get(4).fileName()).contains("blob6"); + assertThat(blobBunch.files.get(5).fileName()).contains("blob3"); + assertThat(blobBunch.files.get(6).fileName()).contains("blob7"); + 
assertThat(blobBunch.files.get(7).fileName()).contains("blob4"); + assertThat(blobBunch.files.get(8).fileName()).contains("blob8"); } @Test @@ -275,19 +289,29 @@ public void testComplexBlobBunchScenario3() { batch, file -> makeBlobRowType(file.writeCols(), String::hashCode)); assertThat(fieldBunches.size()).isEqualTo(3); - SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1); - assertThat(blobBunch.files).hasSize(4); + BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1); + assertThat(blobBunch.files).hasSize(9); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); - assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); - assertThat(blobBunch.files.get(2).fileName()).contains("blob7"); - assertThat(blobBunch.files.get(3).fileName()).contains("blob8"); - - blobBunch = (SpecialFieldBunch) fieldBunches.get(2); - assertThat(blobBunch.files).hasSize(4); + assertThat(blobBunch.files.get(1).fileName()).contains("blob2"); + assertThat(blobBunch.files.get(2).fileName()).contains("blob1"); + assertThat(blobBunch.files.get(3).fileName()).contains("blob9"); + assertThat(blobBunch.files.get(4).fileName()).contains("blob6"); + assertThat(blobBunch.files.get(5).fileName()).contains("blob3"); + assertThat(blobBunch.files.get(6).fileName()).contains("blob7"); + assertThat(blobBunch.files.get(7).fileName()).contains("blob4"); + assertThat(blobBunch.files.get(8).fileName()).contains("blob8"); + + blobBunch = (BlobFileBunch) fieldBunches.get(2); + assertThat(blobBunch.files).hasSize(9); assertThat(blobBunch.files.get(0).fileName()).contains("blob15"); - assertThat(blobBunch.files.get(1).fileName()).contains("blob19"); - assertThat(blobBunch.files.get(2).fileName()).contains("blob17"); - assertThat(blobBunch.files.get(3).fileName()).contains("blob18"); + assertThat(blobBunch.files.get(1).fileName()).contains("blob12"); + assertThat(blobBunch.files.get(2).fileName()).contains("blob11"); + 
assertThat(blobBunch.files.get(3).fileName()).contains("blob19"); + assertThat(blobBunch.files.get(4).fileName()).contains("blob16"); + assertThat(blobBunch.files.get(5).fileName()).contains("blob13"); + assertThat(blobBunch.files.get(6).fileName()).contains("blob17"); + assertThat(blobBunch.files.get(7).fileName()).contains("blob14"); + assertThat(blobBunch.files.get(8).fileName()).contains("blob18"); } /** Creates a blob file with the specified parameters. */ @@ -357,8 +381,146 @@ private DataFileMeta createBlobFileWithCols( writeCols); } + private DataFileMeta createVectorFile( + String fileName, long firstRowId, long rowCount, long maxSequenceNumber) { + return createVectorFileWithCols( + fileName, firstRowId, rowCount, maxSequenceNumber, Arrays.asList("vector_col")); + } + + private DataFileMeta createVectorFileWithSchema( + String fileName, + long firstRowId, + long rowCount, + long maxSequenceNumber, + long schemaId) { + return createFile( + fileName + ".vector.avro", + firstRowId, + rowCount, + maxSequenceNumber, + schemaId, + Arrays.asList("vector_col")); + } + + private DataFileMeta createVectorFileWithCols( + String fileName, + long firstRowId, + long rowCount, + long maxSequenceNumber, + List writeCols) { + return createFile( + fileName + ".vector.avro", firstRowId, rowCount, maxSequenceNumber, 0L, writeCols); + } + + private DataFileMeta createFile( + String fileName, + long firstRowId, + long rowCount, + long maxSequenceNumber, + long schemaId, + List writeCols) { + return DataFileMeta.create( + fileName, + rowCount, + rowCount, + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + SimpleStats.EMPTY_STATS, + SimpleStats.EMPTY_STATS, + 0, + maxSequenceNumber, + schemaId, + DataFileMeta.DUMMY_LEVEL, + Collections.emptyList(), + Timestamp.fromEpochMillis(System.currentTimeMillis()), + rowCount, + null, + FileSource.APPEND, + null, + null, + firstRowId, + writeCols); + } + + private static List fileRows(DataFileMeta file, List rowRanges) { + List 
rows = new ArrayList<>(); + long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1; + for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) { + if (selected(rowId, rowRanges)) { + rows.add(blobRow(rowId, file.maxSequenceNumber(), false)); + } + } + return rows; + } + + private static boolean selected(long rowId, List rowRanges) { + for (Range range : rowRanges) { + if (rowId < range.from) { + return false; + } + if (rowId <= range.to) { + return true; + } + } + return false; + } + + private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) { + GenericRow row = new GenericRow(3); + row.setField(0, placeholder ? Blob.PLACE_HOLDER : new BlobData(new byte[] {(byte) rowId})); + row.setField(1, rowId); + row.setField(2, sequenceNumber); + return row; + } + + private static RecordReader oneRowPerBatchReader(List rows) { + return new RecordReader() { + + int index; + + @Override + public RecordIterator readBatch() { + if (index >= rows.size()) { + return null; + } + InternalRow row = rows.get(index++); + return new RecordIterator() { + + boolean returned; + + @Override + public InternalRow next() { + if (returned) { + return null; + } + returned = true; + return row; + } + + @Override + public void releaseBatch() {} + }; + } + + @Override + public void close() {} + }; + } + + @Test + void testAddVectorFilesWithDifferentSchemaId() { + DataFileMeta vectorEntry1 = createVectorFileWithSchema("vector1", 0, 100, 1, 0L); + DataFileMeta vectorEntry2 = createVectorFileWithSchema("vector2", 100, 200, 1, 1L); + + vectorBunch.add(vectorEntry1); + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All files in this bunch should have the same schema id."); + } + @Test void testAddBlobFilesWithDifferentSchemaId() { + BlobFileBunch blobBunch = new BlobFileBunch(300); DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L); DataFileMeta 
blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L); @@ -373,24 +535,71 @@ void testAddBlobFilesWithDifferentSchemaId() { @Test public void testRowIdPushDown() { - SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true); - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1); - blobBunch.add(blobEntry1); - SpecialFieldBunch finalBlobBunch = blobBunch; - DataFileMeta finalBlobEntry = blobEntry2; - assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException(); + VectorFileBunch vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true); + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1); + vectorBunch.add(vectorEntry1); + VectorFileBunch finalVectorBunch = vectorBunch; + DataFileMeta finalVectorEntry = vectorEntry2; + assertThatCode(() -> finalVectorBunch.add(finalVectorEntry)).doesNotThrowAnyException(); + + vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true); + vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + vectorEntry2 = createVectorFile("vector2", 50, 200, 2); + vectorBunch.add(vectorEntry1); + vectorBunch.add(vectorEntry2); + assertThat(vectorBunch.files).containsExactlyInAnyOrder(vectorEntry2); + + VectorFileBunch finalVectorBunch2 = vectorBunch; + DataFileMeta vectorEntry3 = createVectorFile("vector2", 250, 100, 2); + assertThatCode(() -> finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException(); + } - blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true); - blobEntry1 = createBlobFile("blob1", 0, 100, 1); - blobEntry2 = createBlobFile("blob2", 50, 200, 2); - blobBunch.add(blobEntry1); - blobBunch.add(blobEntry2); - assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2); + @Test + public void testBlobSequenceGroupReaderWithRowRanges() throws Exception { + List files = + Arrays.asList(createBlobFile("blob1", 0, 3, 10), 
createBlobFile("blob2", 5, 2, 10)); + List rowRanges = Arrays.asList(new Range(1, 1), new Range(3, 5)); + RowType readRowType = + new RowType( + Arrays.asList( + new DataField(0, "blob_col", DataTypes.BLOB()), + new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()), + new DataField( + 2, + SpecialFields.SEQUENCE_NUMBER.name(), + DataTypes.BIGINT()))); + + BlobSequenceGroupRecordReader reader = + new BlobSequenceGroupRecordReader( + files, + file -> oneRowPerBatchReader(fileRows(file, rowRanges)), + rowRanges, + readRowType, + 0, + 0, + 6, + 10); + + List rowIds = new ArrayList<>(); + List placeholders = new ArrayList<>(); + List batchSizes = new ArrayList<>(); + RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + int batchSize = 0; + InternalRow row; + while ((row = batch.next()) != null) { + rowIds.add(row.getLong(1)); + placeholders.add(row.getBlob(0) == Blob.PLACE_HOLDER); + batchSize++; + } + batch.releaseBatch(); + batchSizes.add(batchSize); + } - SpecialFieldBunch finalBlobBunch2 = blobBunch; - DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2); - assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException(); + assertThat(rowIds).containsExactly(1L, 3L, 4L, 5L); + assertThat(placeholders).containsExactly(false, true, true, false); + assertThat(batchSizes).containsExactly(1, 2, 1); } /** Creates a normal (non-blob) file for testing. 
*/ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java index 27a1ed56170a..d6d87542315f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java @@ -42,7 +42,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm byte[] header = new byte[5]; IOUtils.readFully(in, header); byte version = header[4]; - if (version != 1) { + if (version != 1 && version != 2) { throw new IOException("Unsupported version: " + version); } int indexLength = BytesUtils.getInt(header, 0); @@ -55,7 +55,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm long[] blobOffsets = new long[blobLengths.length]; long offset = 0; for (int i = 0; i < blobLengths.length; i++) { - if (blobLengths[i] == -1) { + if (blobLengths[i] < 0) { blobOffsets[i] = -1; } else { blobOffsets[i] = offset; @@ -86,7 +86,11 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm } public boolean isNull(int i) { - return blobLengths[i] == -1; + return blobLengths[i] == BlobFormatWriter.NULL_LENGTH; + } + + public boolean isPlaceHolder(int i) { + return blobLengths[i] == BlobFormatWriter.PLACE_HOLDER_LENGTH; } public long blobLength(int i) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java index 801964b86239..489e1d7c619b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java @@ -94,6 +94,8 @@ public InternalRow next() { Blob blob; if (fileMeta.isNull(currentPosition)) { blob = null; + } else if (fileMeta.isPlaceHolder(currentPosition)) { + blob = 
Blob.PLACE_HOLDER; } else { long offset = fileMeta.blobOffset(currentPosition) + 4; long length = fileMeta.blobLength(currentPosition) - 16; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java index 83d6c0eb915d..d032e1b043ea 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java @@ -43,9 +43,11 @@ /** {@link FormatWriter} for blob file. */ public class BlobFormatWriter implements FileAwareFormatWriter { - public static final byte VERSION = 1; + public static final byte VERSION = 2; public static final int MAGIC_NUMBER = 1481511375; public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER); + public static final long NULL_LENGTH = -1L; + public static final long PLACE_HOLDER_LENGTH = -2L; private final PositionOutputStream out; @Nullable private final BlobConsumer writeConsumer; @@ -81,13 +83,17 @@ public boolean deleteFileUponAbort() { public void addElement(InternalRow element) throws IOException { checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only support one field."); if (element.isNullAt(0)) { - lengths.add(-1L); + lengths.add(NULL_LENGTH); if (writeConsumer != null) { writeConsumer.accept(blobFieldName, null); } return; } Blob blob = element.getBlob(0); + if (blob == Blob.PLACE_HOLDER) { + lengths.add(PLACE_HOLDER_LENGTH); + return; + } long previousPos = out.getPos(); crc32.reset(); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java index 0b66e15c2b88..028cc023d1dd 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java @@ -33,6 
+33,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DeltaVarintCompressor; import org.apache.paimon.utils.RoaringBitmap32; import org.junit.jupiter.api.BeforeEach; @@ -44,7 +45,10 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.zip.CRC32; +import static org.apache.paimon.utils.StreamUtils.intToLittleEndian; +import static org.apache.paimon.utils.StreamUtils.longToLittleEndian; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link BlobFileFormat}. */ @@ -73,22 +77,53 @@ public void testReadBlobInlineBytes() throws IOException { innerTest(false); } + @Test + public void testReadLegacyVersionOneBlobFile() throws IOException { + BlobFileFormat format = new BlobFileFormat(false); + RowType rowType = RowType.of(DataTypes.BLOB()); + List blobs = Arrays.asList("hello".getBytes(), null, "world".getBytes()); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { + writeLegacyVersionOneBlobFile(out, blobs); + } + + FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null); + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + List result = new ArrayList<>(); + readerFactory + .createReader(context) + .forEachRemaining( + row -> result.add(row.isNullAt(0) ? 
null : row.getBlob(0).toData())); + + assertThat(result).hasSize(blobs.size()); + assertThat((byte[]) result.get(0)).isEqualTo(blobs.get(0)); + assertThat(result.get(1)).isNull(); + assertThat((byte[]) result.get(2)).isEqualTo(blobs.get(2)); + } + private void innerTest(boolean blobAsDescriptor) throws IOException { BlobFileFormat format = new BlobFileFormat(blobAsDescriptor); RowType rowType = RowType.of(DataTypes.BLOB()); // write FormatWriterFactory writerFactory = format.createWriterFactory(rowType); - List blobs = - Arrays.asList("hello".getBytes(), null, "world".getBytes(), new byte[0]); + List blobs = + Arrays.asList( + "hello".getBytes(), + null, + Blob.PLACE_HOLDER, + "world".getBytes(), + new byte[0]); try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { FormatWriter formatWriter = writerFactory.create(out, null); - for (byte[] bytes : blobs) { - if (bytes == null) { + for (Object blob : blobs) { + if (blob == null) { formatWriter.addElement(GenericRow.of((Object) null)); - continue; + } else if (blob == Blob.PLACE_HOLDER) { + formatWriter.addElement(GenericRow.of(Blob.PLACE_HOLDER)); } else { - formatWriter.addElement(GenericRow.of(new BlobData(bytes))); + formatWriter.addElement(GenericRow.of(new BlobData((byte[]) blob))); } } formatWriter.close(); @@ -98,7 +133,7 @@ private void innerTest(boolean blobAsDescriptor) throws IOException { FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null); FormatReaderContext context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); - List result = new ArrayList<>(); + List result = new ArrayList<>(); readerFactory .createReader(context) .forEachRemaining( @@ -107,7 +142,10 @@ private void innerTest(boolean blobAsDescriptor) throws IOException { result.add(null); } else { Blob blob = row.getBlob(0); - if (blobAsDescriptor) { + if (blob == Blob.PLACE_HOLDER) { + result.add(Blob.PLACE_HOLDER); + return; + } else if (blobAsDescriptor) { 
assertThat(blob).isInstanceOf(BlobRef.class); } else { assertThat(blob).isInstanceOf(BlobData.class); @@ -117,19 +155,59 @@ private void innerTest(boolean blobAsDescriptor) throws IOException { }); // assert - assertThat(result).containsExactlyElementsOf(blobs); + assertThat(result).hasSize(blobs.size()); + assertThat((byte[]) result.get(0)).isEqualTo((byte[]) blobs.get(0)); + assertThat(result.get(1)).isNull(); + assertThat(result.get(2)).isSameAs(Blob.PLACE_HOLDER); + assertThat((byte[]) result.get(3)).isEqualTo((byte[]) blobs.get(3)); + assertThat((byte[]) result.get(4)).isEqualTo((byte[]) blobs.get(4)); // read with selection RoaringBitmap32 selection = new RoaringBitmap32(); selection.add(2); context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file), selection); result.clear(); - readerFactory - .createReader(context) - .forEachRemaining(row -> result.add(row.getBlob(0).toData())); + readerFactory.createReader(context).forEachRemaining(row -> result.add(row.getBlob(0))); // assert - assertThat(result).containsOnly(blobs.get(2)); + assertThat(result).hasSize(1); + assertThat(result.get(0)).isSameAs(Blob.PLACE_HOLDER); + } + + private void writeLegacyVersionOneBlobFile(PositionOutputStream out, List blobs) + throws IOException { + CRC32 crc32 = new CRC32(); + long[] lengths = new long[blobs.size()]; + for (int i = 0; i < blobs.size(); i++) { + byte[] blob = blobs.get(i); + if (blob == null) { + lengths[i] = BlobFormatWriter.NULL_LENGTH; + continue; + } + + long previousPos = out.getPos(); + crc32.reset(); + + crc32.update( + BlobFormatWriter.MAGIC_NUMBER_BYTES, + 0, + BlobFormatWriter.MAGIC_NUMBER_BYTES.length); + out.write(BlobFormatWriter.MAGIC_NUMBER_BYTES); + crc32.update(blob, 0, blob.length); + out.write(blob); + + long binLength = out.getPos() - previousPos + 12; + lengths[i] = binLength; + byte[] lengthBytes = longToLittleEndian(binLength); + crc32.update(lengthBytes, 0, lengthBytes.length); + out.write(lengthBytes); + 
out.write(intToLittleEndian((int) crc32.getValue())); + } + + byte[] indexBytes = DeltaVarintCompressor.compress(lengths); + out.write(indexBytes); + out.write(intToLittleEndian(indexBytes.length)); + out.write(1); } @Test From a14fe7a7c5188fc4d0fe0676f44b6fe193bd712b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Tue, 19 May 2026 12:12:54 +0800 Subject: [PATCH 2/6] minor optim --- .../operation/BlobFallbackRecordReader.java | 129 +++--- .../operation/DataEvolutionSplitRead.java | 32 +- .../BlobFallbackRecordReaderTest.java | 390 ++++++++++++++++++ .../operation/DataEvolutionReadTest.java | 123 +----- 4 files changed, 486 insertions(+), 188 deletions(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java index 246eced12920..a1e6542baf13 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -24,8 +24,6 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.RecordReader.RecordIterator; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Range; @@ -94,8 +92,7 @@ public class BlobFallbackRecordReader implements RecordReader { readRowType, blobIndex, firstRowId, - lastRowId, - entry.getKey()))); + lastRowId))); } } @@ -175,7 +172,7 @@ public void close() throws IOException { * Reads one blob sequence group (all blob files with the same max_seq_num) and emits * placeholder rows for row id gaps. 
For example, if the full row range is [0, 100], but there's * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100] - * will be emitted as placeholder rows (and also maybe enriched with special fields). + * will be emitted as placeholder rows. */ public static class BlobSequenceGroupRecordReader implements RecordReader { @@ -186,13 +183,16 @@ public static class BlobSequenceGroupRecordReader implements RecordReader currentReader; private DataFileMeta currentFile; - private int fileIndex; + private int nextFileIndex; + private int nextRowRangeIndex; + // expected next row id private long nextRowId; + private InternalRow placeholderRow; + BlobSequenceGroupRecordReader( List files, BlobFileReaderFactory readerFactory, @@ -200,105 +200,120 @@ public static class BlobSequenceGroupRecordReader implements RecordReader readBatch() throws IOException { while (true) { - nextRowId = nextSelectedRowId(nextRowId); - if (nextRowId > lastRowId) { - return null; - } - if (currentReader != null) { RecordIterator batch = currentReader.readBatch(); if (batch != null) { return batch; } - long afterFile = lastRowId(currentFile) + 1; + // row ranges have been pushed to readers + // directly set nextRowId as the lastRowId + 1 + setNextRowId(lastRowId(currentFile) + 1); closeCurrentFileReader(); - nextRowId = afterFile; continue; } - while (fileIndex < files.size() && lastRowId(files.get(fileIndex)) < nextRowId) { - fileIndex++; + if (nextRowId > lastRowId) { + return null; + } + + // skip files whose ranges are before nextRowId + while (nextFileIndex < files.size() + && lastRowId(files.get(nextFileIndex)) < nextRowId) { + nextFileIndex++; } - if (fileIndex >= files.size()) { + if (nextFileIndex >= files.size()) { return placeHolderBatch(lastRowId); } - DataFileMeta nextFile = files.get(fileIndex); + DataFileMeta nextFile = files.get(nextFileIndex); if (nextFile.nonNullFirstRowId() > nextRowId) { return placeHolderBatch(nextFile.nonNullFirstRowId() - 
1); } - currentFile = nextFile; - currentReader = readerFactory.create(nextFile); - fileIndex++; + createReader(nextFile); } } - private long nextSelectedRowId(long rowId) { - if (rowRanges == null) { - return rowId; + /** + * Set nextRowId and try to move to the next selected row id. So the final nextRowId may be + * greater than the input value. + */ + private void setNextRowId(long nextRowId) { + this.nextRowId = nextRowId; + tryMoveToSelectedRow(); + } + + private void tryMoveToSelectedRow() { + if (nextRowId > lastRowId || rowRanges == null) { + return; } - for (Range range : rowRanges) { - if (rowId < range.from) { - return range.from; - } - if (rowId <= range.to) { - return rowId; + + while (nextRowRangeIndex < rowRanges.size()) { + Range range = rowRanges.get(nextRowRangeIndex); + if (nextRowId >= range.from && nextRowId <= range.to) { + // if nextRowId is within the range, do not need to move + return; + } else if (nextRowId < range.from) { + // else if nextRowId < next range, move to next range's `from` + nextRowId = range.from; + return; } + // else nextRowId > range.to, try next range + nextRowRangeIndex++; } - return lastRowId + 1; + + // all ranges consumed, no need to read + nextRowId = lastRowId + 1; } private RecordIterator placeHolderBatch(long endRowId) { - long startRowId = nextRowId; - nextRowId = endRowId + 1; return new RecordIterator() { - - long rowId = startRowId; + long rowId; @Nullable @Override public InternalRow next() { - rowId = nextSelectedRowId(rowId); + rowId = nextRowId; if (rowId > endRowId) { return null; } - return placeHolderRow(rowId++); + setNextRowId(rowId + 1); + return placeHolderRow(); } @Override - public void releaseBatch() {} + public void releaseBatch() { + // nothing to release + } }; } - private GenericRow placeHolderRow(long rowId) { - GenericRow row = new GenericRow(readRowType.getFieldCount()); - row.setField(blobIndex, Blob.PLACE_HOLDER); - for (int i = 0; i < readRowType.getFieldCount(); i++) { - String 
fieldName = readRowType.getFieldNames().get(i); - if (SpecialFields.ROW_ID.name().equals(fieldName)) { - row.setField(i, rowId); - } else if (SpecialFields.SEQUENCE_NUMBER.name().equals(fieldName)) { - row.setField(i, sequenceNumber); - } + private InternalRow placeHolderRow() { + if (placeholderRow == null) { + GenericRow row = new GenericRow(readRowType.getFieldCount()); + row.setField(blobIndex, Blob.PLACE_HOLDER); + placeholderRow = row; } - return row; + return placeholderRow; } private long lastRowId(DataFileMeta file) { @@ -313,14 +328,20 @@ private void closeCurrentFileReader() throws IOException { currentFile = null; } + private void createReader(DataFileMeta nextFile) throws IOException { + currentFile = nextFile; + currentReader = readerFactory.create(nextFile); + nextFileIndex++; + } + @Override public void close() throws IOException { closeCurrentFileReader(); } } + /** Factory to create readers. */ interface BlobFileReaderFactory { - RecordReader create(DataFileMeta file) throws IOException; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 261e3e7f8a12..0d2cd1015662 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -230,7 +230,8 @@ private DataEvolutionFileReader createUnionReader( || isVectorStoreFile(file.fileName()), "Only blob/vector-store files need to call this method."); return schemaFetcher.apply(file.schemaId()).logicalRowType(); - }); + }, + rowRanges != null); long rowCount = fieldsFiles.get(0).rowCount(); long firstRowId = fieldsFiles.get(0).files().get(0).nonNullFirstRowId(); @@ -480,7 +481,8 @@ public static List splitFieldBunches( int fieldId = rowType.getField(file.writeCols().get(0)).id(); final long expectedRowCount = rowCount; blobBunchMap - 
.computeIfAbsent(fieldId, key -> new BlobFileBunch(expectedRowCount)) + .computeIfAbsent( + fieldId, key -> new BlobFileBunch(expectedRowCount, rowIdPushDown)) .add(file); } else if (isVectorStoreFile(file.fileName())) { RowType rowType = fileToRowType.apply(file); @@ -542,11 +544,13 @@ static class BlobFileBunch implements FieldBunch { final List files; final List ranges; final long expectedRowCount; + final boolean rowIdPushdown; - BlobFileBunch(long expectedRowCount) { + BlobFileBunch(long expectedRowCount, boolean rowIdPushdown) { this.files = new ArrayList<>(); this.expectedRowCount = expectedRowCount; this.ranges = new ArrayList<>(); + this.rowIdPushdown = rowIdPushdown; } void add(DataFileMeta file) { @@ -566,16 +570,20 @@ void add(DataFileMeta file) { @Override public long rowCount() { List merged = Range.sortAndMergeOverlap(ranges, true); - Preconditions.checkState( - merged.size() == 1, - "Blob file bunch should always contain a contiguous row range."); - - long rowCount = merged.get(0).count(); - Preconditions.checkState( - rowCount == expectedRowCount, - "The merged rowCount of blob file bunch should be aligned with normal files."); + if (!rowIdPushdown) { + Preconditions.checkState( + merged.size() == 1, + "Blob file bunch should always contain a contiguous row range."); + + long rowCount = merged.get(0).count(); + Preconditions.checkState( + rowCount == expectedRowCount, + "The merged rowCount %s of blob file bunch should be aligned with normal files %s.", + rowCount, + expectedRowCount); + } - return rowCount; + return expectedRowCount; } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java new file mode 100644 index 000000000000..47e2ac80415b --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReader.RecordIterator; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link BlobFallbackRecordReader}. 
*/ +public class BlobFallbackRecordReaderTest { + + private static final int BLOB_INDEX = 0; + private static final String BLOB_FIELD = "blob_col"; + private static final RowType READ_ROW_TYPE = + new RowType( + Arrays.asList( + new DataField(BLOB_INDEX, BLOB_FIELD, DataTypes.BLOB()), + new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()), + new DataField( + 2, SpecialFields.SEQUENCE_NUMBER.name(), DataTypes.BIGINT()))); + + @Test + public void testBlobSequenceGroupReaderWithRowRanges() throws Exception { + List files = + Arrays.asList(blobFile("blob1", 0, 3, 10), blobFile("blob2", 5, 2, 10)); + List rowRanges = ranges(1, 1, 3, 5); + + ReadResult rows = readSequenceGroup(files, rowRanges, 0, 6, 10); + + assertThat(rows.rowIds).containsExactly(1L, 5L); + assertThat(rows.placeholderRowCount).isEqualTo(2); + assertThat(rows.batchSizes).containsExactly(1, 2, 1); + } + + @Test + public void testBlobSequenceGroupReaderWithMultipleRangesInFileAndGap() throws Exception { + DataFileMeta wideFile = blobFile("wide-file", 20, 31, 10); + List wideFileRanges = ranges(9, 10, 19, 20, 25, 30, 35, 40, 43, 45, 48, 52, 55, 56); + + ReadResult wideFileRows = + readSequenceGroup(Collections.singletonList(wideFile), wideFileRanges, 10, 60, 10); + + List wideFileActualRowIds = rowIdsInRanges(20, 50, wideFileRanges); + assertThat(wideFileRows.rowIds).containsExactlyElementsOf(wideFileActualRowIds); + assertThat(wideFileRows.placeholderRowCount) + .isEqualTo( + rowIdsInRanges(10, 19, wideFileRanges).size() + + rowIdsInRanges(51, 60, wideFileRanges).size()); + assertThat(wideFileRows.batchSizes) + .containsExactlyElementsOf(batchSizes(2, wideFileActualRowIds.size(), 1, 4)); + + DataFileMeta firstFile = blobFile("first-file", 0, 11, 10); + DataFileMeta secondFile = blobFile("second-file", 50, 11, 10); + List gapRanges = ranges(0, 0, 9, 12, 25, 30, 35, 40, 43, 45, 48, 52, 58, 62); + + ReadResult gapRows = + readSequenceGroup(Arrays.asList(firstFile, secondFile), gapRanges, 0, 62, 
10); + + assertThat(gapRows.rowIds) + .containsExactlyElementsOf( + concat( + rowIdsInRanges(0, 10, gapRanges), + rowIdsInRanges(50, 60, gapRanges))); + assertThat(gapRows.placeholderRowCount) + .isEqualTo( + rowIdsInRanges(11, 49, gapRanges).size() + + rowIdsInRanges(61, 62, gapRanges).size()); + assertThat(gapRows.batchSizes).containsExactly(1, 1, 1, 19, 1, 1, 1, 1, 1, 1, 2); + } + + @Test + public void testBlobFallbackRecordReader() throws Exception { + DataFileMeta newFile = blobFile("new-file", 0, 3, 2); + DataFileMeta oldFile = blobFile("old-file", 0, 5, 1); + + ReadResult rows = + readFallback(Arrays.asList(newFile, oldFile), 5, null, placeholderRows(newFile, 1)); + + assertThat(rows.rowIds).containsExactly(0L, 1L, 2L, 3L, 4L); + assertThat(rows.sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L); + } + + @Test + public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() { + DataFileMeta newFile = blobFile("new-placeholder-file", 0, 1, 2); + DataFileMeta oldFile = blobFile("old-placeholder-file", 0, 1, 1); + + assertThatThrownBy( + () -> + readFallback( + Arrays.asList(newFile, oldFile), + 1, + null, + placeholderRows(newFile, 0, oldFile, 0))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("all blob files at the same row id store a placeholder"); + } + + @Test + public void testBlobFallbackRecordReaderWithRowRanges() throws Exception { + DataFileMeta oldFile = blobFile("old-file", 20, 26, 1); + DataFileMeta newFile1 = blobFile("new-file-1", 20, 11, 2); + DataFileMeta newFile2 = blobFile("new-file-2", 40, 6, 2); + List rowRanges = ranges(15, 20, 25, 26, 29, 33, 35, 41); + + ReadResult rows = + readFallback( + Arrays.asList(newFile2, oldFile, newFile1), + 26, + rowRanges, + Collections.emptySet()); + + assertThat(rows.rowIds) + .containsExactly( + 20L, 25L, 26L, 29L, 30L, 31L, 32L, 33L, 35L, 36L, 37L, 38L, 39L, 40L, 41L); + assertThat(rows.sequenceNumbers) + .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 
1L, 2L, 2L); + } + + private static ReadResult readFallback( + List files, + long rowCount, + List rowRanges, + Set placeholderRows) + throws Exception { + return ReadResult.read( + new BlobFallbackRecordReader( + files, + rowCount, + file -> oneRowPerBatchReader(fileRows(file, rowRanges, placeholderRows)), + rowRanges, + READ_ROW_TYPE, + BLOB_INDEX)); + } + + private static ReadResult readSequenceGroup( + List files, + List rowRanges, + long firstRowId, + long lastRowId, + long sequenceNumber) + throws Exception { + return ReadResult.read( + new BlobSequenceGroupRecordReader( + files, + file -> oneRowPerBatchReader(fileRows(file, rowRanges)), + rowRanges, + READ_ROW_TYPE, + BLOB_INDEX, + firstRowId, + lastRowId)); + } + + private static DataFileMeta blobFile( + String fileName, long firstRowId, long rowCount, long maxSequenceNumber) { + return DataFileMeta.create( + fileName + ".blob", + rowCount, + rowCount, + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + SimpleStats.EMPTY_STATS, + SimpleStats.EMPTY_STATS, + 0, + maxSequenceNumber, + 0L, + DataFileMeta.DUMMY_LEVEL, + Collections.emptyList(), + Timestamp.fromEpochMillis(System.currentTimeMillis()), + rowCount, + null, + FileSource.APPEND, + null, + null, + firstRowId, + Arrays.asList(BLOB_FIELD)); + } + + private static List ranges(long... 
bounds) { + if (bounds.length % 2 != 0) { + throw new IllegalArgumentException("Range bounds should be paired."); + } + + List ranges = new ArrayList<>(); + for (int i = 0; i < bounds.length; i += 2) { + ranges.add(new Range(bounds[i], bounds[i + 1])); + } + return ranges; + } + + private static List fileRows(DataFileMeta file, List rowRanges) { + return fileRows(file, rowRanges, Collections.emptySet()); + } + + private static List fileRows( + DataFileMeta file, List rowRanges, Set placeholderRows) { + List rows = new ArrayList<>(); + long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1; + for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) { + if (selected(rowId, rowRanges)) { + rows.add( + blobRow( + rowId, + file.maxSequenceNumber(), + placeholderRows.contains(rowKey(file, rowId)))); + } + } + return rows; + } + + private static boolean selected(long rowId, List rowRanges) { + if (rowRanges == null) { + return true; + } + for (Range range : rowRanges) { + if (rowId < range.from) { + return false; + } + if (rowId <= range.to) { + return true; + } + } + return false; + } + + private static List rowIdsInRanges(long firstRowId, long lastRowId, List ranges) { + List rowIds = new ArrayList<>(); + for (long rowId = firstRowId; rowId <= lastRowId; rowId++) { + if (selected(rowId, ranges)) { + rowIds.add(rowId); + } + } + return rowIds; + } + + private static List concat(List first, List second) { + List all = new ArrayList<>(first); + all.addAll(second); + return all; + } + + private static List batchSizes( + int firstBatchSize, int repeatedBatchCount, int repeatedBatchSize, int lastBatchSize) { + List batchSizes = new ArrayList<>(); + batchSizes.add(firstBatchSize); + for (int i = 0; i < repeatedBatchCount; i++) { + batchSizes.add(repeatedBatchSize); + } + batchSizes.add(lastBatchSize); + return batchSizes; + } + + private static Set placeholderRows(DataFileMeta file, long rowId) { + Set keys = new HashSet<>(); + keys.add(rowKey(file, 
rowId)); + return keys; + } + + private static Set placeholderRows( + DataFileMeta firstFile, long firstRowId, DataFileMeta secondFile, long secondRowId) { + Set keys = placeholderRows(firstFile, firstRowId); + keys.add(rowKey(secondFile, secondRowId)); + return keys; + } + + private static String rowKey(DataFileMeta file, long rowId) { + return file.fileName() + "#" + rowId; + } + + private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) { + GenericRow row = new GenericRow(3); + row.setField( + BLOB_INDEX, + placeholder ? Blob.PLACE_HOLDER : new BlobData(new byte[] {(byte) rowId})); + row.setField(1, rowId); + row.setField(2, sequenceNumber); + return row; + } + + private static RecordReader oneRowPerBatchReader(List rows) { + return new RecordReader() { + + int index; + + @Override + public RecordIterator readBatch() { + if (index >= rows.size()) { + return null; + } + InternalRow row = rows.get(index++); + return new RecordIterator() { + + boolean returned; + + @Override + public InternalRow next() { + if (returned) { + return null; + } + returned = true; + return row; + } + + @Override + public void releaseBatch() {} + }; + } + + @Override + public void close() {} + }; + } + + private static class ReadResult { + final List rowIds = new ArrayList<>(); + final List sequenceNumbers = new ArrayList<>(); + final List batchSizes = new ArrayList<>(); + int placeholderRowCount; + + static ReadResult read(RecordReader reader) throws Exception { + try { + ReadResult result = new ReadResult(); + RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + int batchSize = 0; + InternalRow row; + while ((row = batch.next()) != null) { + result.add(row); + batchSize++; + } + batch.releaseBatch(); + result.batchSizes.add(batchSize); + } + return result; + } finally { + reader.close(); + } + } + + private void add(InternalRow row) { + if (row.getBlob(BLOB_INDEX) == Blob.PLACE_HOLDER) { + placeholderRowCount++; + } else { + 
rowIds.add(row.getLong(1)); + sequenceNumbers.add(row.getLong(2)); + } + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java index fb5cc101274b..e240b7fb422c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java @@ -18,25 +18,16 @@ package org.apache.paimon.operation; -import org.apache.paimon.data.Blob; -import org.apache.paimon.data.BlobData; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileSource; -import org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader; import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch; import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch; import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch; -import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.RecordReader.RecordIterator; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Range; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -442,71 +433,6 @@ private DataFileMeta createFile( writeCols); } - private static List fileRows(DataFileMeta file, List rowRanges) { - List rows = new ArrayList<>(); - long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1; - for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) { - if (selected(rowId, rowRanges)) { - rows.add(blobRow(rowId, file.maxSequenceNumber(), false)); - } - } - return 
rows; - } - - private static boolean selected(long rowId, List rowRanges) { - for (Range range : rowRanges) { - if (rowId < range.from) { - return false; - } - if (rowId <= range.to) { - return true; - } - } - return false; - } - - private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) { - GenericRow row = new GenericRow(3); - row.setField(0, placeholder ? Blob.PLACE_HOLDER : new BlobData(new byte[] {(byte) rowId})); - row.setField(1, rowId); - row.setField(2, sequenceNumber); - return row; - } - - private static RecordReader oneRowPerBatchReader(List rows) { - return new RecordReader() { - - int index; - - @Override - public RecordIterator readBatch() { - if (index >= rows.size()) { - return null; - } - InternalRow row = rows.get(index++); - return new RecordIterator() { - - boolean returned; - - @Override - public InternalRow next() { - if (returned) { - return null; - } - returned = true; - return row; - } - - @Override - public void releaseBatch() {} - }; - } - - @Override - public void close() {} - }; - } - @Test void testAddVectorFilesWithDifferentSchemaId() { DataFileMeta vectorEntry1 = createVectorFileWithSchema("vector1", 0, 100, 1, 0L); @@ -520,7 +446,7 @@ void testAddVectorFilesWithDifferentSchemaId() { @Test void testAddBlobFilesWithDifferentSchemaId() { - BlobFileBunch blobBunch = new BlobFileBunch(300); + BlobFileBunch blobBunch = new BlobFileBunch(300, false); DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L); DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L); @@ -555,53 +481,6 @@ public void testRowIdPushDown() { assertThatCode(() -> finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException(); } - @Test - public void testBlobSequenceGroupReaderWithRowRanges() throws Exception { - List files = - Arrays.asList(createBlobFile("blob1", 0, 3, 10), createBlobFile("blob2", 5, 2, 10)); - List rowRanges = Arrays.asList(new Range(1, 1), new Range(3, 5)); - RowType 
readRowType = - new RowType( - Arrays.asList( - new DataField(0, "blob_col", DataTypes.BLOB()), - new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()), - new DataField( - 2, - SpecialFields.SEQUENCE_NUMBER.name(), - DataTypes.BIGINT()))); - - BlobSequenceGroupRecordReader reader = - new BlobSequenceGroupRecordReader( - files, - file -> oneRowPerBatchReader(fileRows(file, rowRanges)), - rowRanges, - readRowType, - 0, - 0, - 6, - 10); - - List rowIds = new ArrayList<>(); - List placeholders = new ArrayList<>(); - List batchSizes = new ArrayList<>(); - RecordIterator batch; - while ((batch = reader.readBatch()) != null) { - int batchSize = 0; - InternalRow row; - while ((row = batch.next()) != null) { - rowIds.add(row.getLong(1)); - placeholders.add(row.getBlob(0) == Blob.PLACE_HOLDER); - batchSize++; - } - batch.releaseBatch(); - batchSizes.add(batchSize); - } - - assertThat(rowIds).containsExactly(1L, 3L, 4L, 5L); - assertThat(placeholders).containsExactly(false, true, true, false); - assertThat(batchSizes).containsExactly(1, 2, 1); - } - /** Creates a normal (non-blob) file for testing. 
*/ private DataFileMeta createNormalFile( String fileName, From 064d136e40b4ff9dd093d544d4a76f1c411dd8e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Tue, 19 May 2026 14:18:47 +0800 Subject: [PATCH 3/6] add some robustness check --- .../operation/BlobFallbackRecordReader.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java index a1e6542baf13..fd54bb6881b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.Range; import javax.annotation.Nullable; @@ -83,6 +84,19 @@ public class BlobFallbackRecordReader implements RecordReader { // within each group, sort by first row id List groupFiles = entry.getValue(); groupFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId)); + + DataFileMeta current, next; + for (int i = 0; i < groupFiles.size() - 1; i++) { + current = groupFiles.get(i); + next = groupFiles.get(i + 1); + + Preconditions.checkState( + !current.nonNullRowIdRange().hasIntersection(next.nonNullRowIdRange()), + "Blob files within a same max_seq_num should not overlap. 
Find: %s, %s", + current, + next); + } + groupReaders.add( new ForceSingleBatchReader( new BlobSequenceGroupRecordReader( @@ -104,6 +118,7 @@ public RecordIterator readBatch() throws IOException { } returned = true; + // all readers are forced returning single batch RecordIterator[] iterators = new RecordIterator[groupReaders.size()]; for (int i = 0; i < groupReaders.size(); i++) { RecordIterator iterator = groupReaders.get(i).readBatch(); @@ -173,6 +188,8 @@ public void close() throws IOException { * placeholder rows for row id gaps. For example, if the full row range is [0, 100], but there's * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100] * will be emitted as placeholder rows. + * + *

This reader should always be fully consumed, or the internal states may be broken. */ public static class BlobSequenceGroupRecordReader implements RecordReader { From 2722c239b3b5ef3b63458dd78a6b47101416f439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Tue, 19 May 2026 14:57:05 +0800 Subject: [PATCH 4/6] fix style --- .../org/apache/paimon/operation/BlobFallbackRecordReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java index fd54bb6881b6..e497dea6c069 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -189,7 +189,7 @@ public void close() throws IOException { * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100] * will be emitted as placeholder rows. * - *

This reader should always be fully consumed, or the internal states may be broken. + *

This reader should always be fully consumed, or the internal states may be broken. */ public static class BlobSequenceGroupRecordReader implements RecordReader { From 7316698c57e67f9af681b9adf69620df878e29ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Tue, 19 May 2026 16:30:17 +0800 Subject: [PATCH 5/6] add python module --- .../read/reader/format_blob_reader.py | 20 ++++- paimon-python/pypaimon/table/row/blob.py | 15 ++++ paimon-python/pypaimon/tests/blob_test.py | 75 ++++++++++++++++++- .../pypaimon/write/blob_format_writer.py | 12 ++- 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py index e7e65394aa6d..8dd4e2fbd6bf 100644 --- a/paimon-python/pypaimon/read/reader/format_blob_reader.py +++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py @@ -32,6 +32,8 @@ class FormatBlobReader(RecordBatchReader): + NULL_LENGTH = -1 + PLACE_HOLDER_LENGTH = -2 def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool, @@ -97,6 +99,10 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch for field_name in self._fields: if blob is None: pydict_data[field_name].append(None) + elif blob is Blob.PLACE_HOLDER: + raise RuntimeError( + "Blob placeholder is not supported by FormatBlobReader yet." 
+ ) elif self._blob_as_descriptor: pydict_data[field_name].append(blob.to_descriptor().serialize()) else: @@ -148,7 +154,7 @@ def _read_index(self) -> None: index_length = struct.unpack(' None: blob_offsets = [] offset = 0 for length in blob_lengths: - if length == -1: + if length < 0: blob_offsets.append(-1) else: blob_offsets.append(offset) @@ -175,6 +181,8 @@ def _read_index(self) -> None: class BlobRecordIterator: MAGIC_NUMBER_SIZE = 4 METADATA_OVERHEAD = 16 + NULL_LENGTH = -1 + PLACE_HOLDER_LENGTH = -2 def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str): @@ -192,13 +200,17 @@ def __next__(self) -> GenericRow: if self.current_position >= len(self.blob_lengths): raise StopIteration fields = [DataField(0, self.field_name, AtomicType("BLOB"))] - if self.blob_lengths[self.current_position] == -1: + length = self.blob_lengths[self.current_position] + if length == self.NULL_LENGTH: self.current_position += 1 return GenericRow([None], fields, RowKind.INSERT) + if length == self.PLACE_HOLDER_LENGTH: + self.current_position += 1 + return GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT) # Create blob reference for the current blob # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number - blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD + blob_length = length - self.METADATA_OVERHEAD blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length) self.current_position += 1 return GenericRow([blob], fields, RowKind.INSERT) diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index b619b6a76aec..73974d73e8e4 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -277,6 +277,21 @@ def from_descriptor(uri_reader: UriReader, descriptor: 
BlobDescriptor) -> 'Blob' return BlobRef(uri_reader, descriptor) +class _PlaceholderBlob(Blob): + + def to_data(self) -> bytes: + raise RuntimeError("Should never call this method for placeholder blob.") + + def to_descriptor(self) -> BlobDescriptor: + raise RuntimeError("Should never call this method for placeholder blob.") + + def new_input_stream(self) -> BinaryIO: + raise RuntimeError("Should never call this method for placeholder blob.") + + +Blob.PLACE_HOLDER = _PlaceholderBlob() + + class BlobData(Blob): def __init__(self, data: Optional[Union[bytes, bytearray]] = None): diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index d9cf64210d61..1883e9dbbb3c 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -29,7 +29,7 @@ from pypaimon.common.file_io import FileIO from pypaimon.filesystem.local_file_io import LocalFileIO from pypaimon.common.options import Options -from pypaimon.read.reader.format_blob_reader import FormatBlobReader +from pypaimon.read.reader.format_blob_reader import BlobRecordIterator, FormatBlobReader from pypaimon.schema.data_types import AtomicType, DataField from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow @@ -1264,6 +1264,79 @@ def test_null_blob_read_as_descriptor(self): self.assertEqual(desc2.uri, blob_file_path) reader.close() + def test_placeholder_blob_write_read(self): + from pypaimon.write.blob_format_writer import BlobFormatWriter + + file_io = LocalFileIO(self.temp_dir, Options({})) + blob_file_path = os.path.join(self.temp_dir, "placeholder_blob.blob") + + output = open(blob_file_path, 'wb') + writer = BlobFormatWriter(output) + fields = [DataField(0, "blob_field", AtomicType("BLOB"))] + writer.add_element(GenericRow([BlobData(b"hello")], fields, RowKind.INSERT)) + 
writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT)) + writer.add_element(GenericRow([None], fields, RowKind.INSERT)) + writer.add_element(GenericRow([BlobData(b"world")], fields, RowKind.INSERT)) + self.assertEqual( + writer.lengths[1:3], + [BlobFormatWriter.PLACE_HOLDER_LENGTH, BlobFormatWriter.NULL_LENGTH]) + writer.close() + + with open(blob_file_path, 'rb') as blob_file: + blob_file.seek(-1, os.SEEK_END) + self.assertEqual(blob_file.read(1), struct.pack(' None: blob_value = row.values[0] if blob_value is None: - self.lengths.append(-1) + self.lengths.append(self.NULL_LENGTH) return if not isinstance(blob_value, Blob): raise ValueError("Field must be Blob/BlobData instance") + if blob_value is Blob.PLACE_HOLDER: + self.lengths.append(self.PLACE_HOLDER_LENGTH) + return + previous_pos = self.position crc32 = 0 # Initialize CRC32 @@ -97,7 +103,7 @@ def write_value(self, col_data, fields, uri_reader_factory=None) -> None: if col_data is None: if not is_blob: raise RuntimeError("Null values are only supported for BLOB type fields") - self.lengths.append(-1) + self.lengths.append(self.NULL_LENGTH) return if is_blob: From e804bb22e39119fd58b9938e1a5300c4812355ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 20 May 2026 11:13:20 +0800 Subject: [PATCH 6/6] minor fix --- .../org/apache/paimon/operation/BlobFallbackRecordReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java index e497dea6c069..d8ff74d013aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -72,7 +72,7 @@ public class BlobFallbackRecordReader implements RecordReader { 
files.stream().mapToLong(DataFileMeta::nonNullFirstRowId).min().getAsLong(); long lastRowId = firstRowId + rowCount - 1; - // sort descendent group readers in descending order + // sort group readers in descending order Map> sequenceGroups = new TreeMap<>(reverseOrder()); for (DataFileMeta file : files) { sequenceGroups