diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java index f11a056efbcf..c3ba17513bf0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java @@ -38,6 +38,31 @@ @Public public interface Blob { + /** + * The placeholder blob, mainly for blob update in data-evolution. It should never be exposed to + * users. + */ + Blob PLACE_HOLDER = + new Blob() { + @Override + public byte[] toData() { + throw new UnsupportedOperationException( + "Should never call this method for placeholder blob."); + } + + @Override + public BlobDescriptor toDescriptor() { + throw new UnsupportedOperationException( + "Should never call this method for placeholder blob."); + } + + @Override + public SeekableInputStream newInputStream() throws IOException { + throw new UnsupportedOperationException( + "Should never call this method for placeholder blob."); + } + }; + byte[] toData(); BlobDescriptor toDescriptor(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java new file mode 100644 index 000000000000..d8ff74d013aa --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.append.ForceSingleBatchReader; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.Range; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static java.util.Collections.reverseOrder; +import static java.util.Comparator.comparingLong; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Resolves blob placeholder rows by falling back through older sequence groups. The read logic is + * as below: + * + *
    + *
  1. Group files by max-seq, higher-seq files will have newer records. + *
  2. Sort files by range within each group, and create sequential readers for them. Note that + * absent ranges will be read as all-placeholder rows. + *
  3. Sort by max-seq and merge read all readers, records at each index will be the first + * non-placeholder blob. + *
+ */ +public class BlobFallbackRecordReader implements RecordReader { + + private final List> groupReaders = new ArrayList<>(); + private final int blobIndex; + private boolean returned; + + BlobFallbackRecordReader( + List files, + long rowCount, + BlobFileReaderFactory readerFactory, + List rowRanges, + RowType readRowType, + int blobIndex) { + this.blobIndex = blobIndex; + + checkArgument(!files.isEmpty(), "Blob bunch should not be empty."); + long firstRowId = + files.stream().mapToLong(DataFileMeta::nonNullFirstRowId).min().getAsLong(); + long lastRowId = firstRowId + rowCount - 1; + + // sort group readers in descending order + Map> sequenceGroups = new TreeMap<>(reverseOrder()); + for (DataFileMeta file : files) { + sequenceGroups + .computeIfAbsent(file.maxSequenceNumber(), ignored -> new ArrayList<>()) + .add(file); + } + + for (Map.Entry> entry : sequenceGroups.entrySet()) { + // within each group, sort by first row id + List groupFiles = entry.getValue(); + groupFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId)); + + DataFileMeta current, next; + for (int i = 0; i < groupFiles.size() - 1; i++) { + current = groupFiles.get(i); + next = groupFiles.get(i + 1); + + Preconditions.checkState( + !current.nonNullRowIdRange().hasIntersection(next.nonNullRowIdRange()), + "Blob files within a same max_seq_num should not overlap. Find: %s, %s", + current, + next); + } + + groupReaders.add( + new ForceSingleBatchReader( + new BlobSequenceGroupRecordReader( + groupFiles, + readerFactory, + rowRanges, + readRowType, + blobIndex, + firstRowId, + lastRowId))); + } + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + if (returned) { + return null; + } + returned = true; + + // all readers are forced returning single batch + RecordIterator[] iterators = new RecordIterator[groupReaders.size()]; + for (int i = 0; i < groupReaders.size(); i++) { + RecordIterator iterator = groupReaders.get(i).readBatch(); + if (iterator == null) { + return null; + } + iterators[i] = iterator; + } + + return new RecordIterator() { + @Nullable + @Override + public InternalRow next() throws IOException { + InternalRow result = null; + // we should always move each iterator forward + for (RecordIterator iterator : iterators) { + InternalRow row = iterator.next(); + if (row == null) { + return null; + } + // result is the first non-placeholder record + if (result == null && !isPlaceHolder(row)) { + result = row; + } + } + if (result == null) { + throw new IllegalStateException( + "Invalid state: all blob files at the same row id store a placeholder, it's a bug."); + } + return result; + } + + @Override + public void releaseBatch() { + for (RecordIterator iterator : iterators) { + iterator.releaseBatch(); + } + } + }; + } + + private boolean isPlaceHolder(InternalRow row) { + return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == Blob.PLACE_HOLDER; + } + + @Override + public void close() throws IOException { + IOException exception = null; + for (RecordReader reader : groupReaders) { + try { + reader.close(); + } catch (IOException e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + } + if (exception != null) { + throw exception; + } + } + + /** + * Reads one blob sequence group (all blob files with the same max_seq_num) and emits + * placeholder rows for row id gaps. For example, if the full row range is [0, 100], but there's + * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100] + * will be emitted as placeholder rows. + * + *

This reader should always be fully consumed, or the internal states may be broken. + */ + public static class BlobSequenceGroupRecordReader implements RecordReader { + + private final List files; + private final BlobFileReaderFactory readerFactory; + // pushed row ranges + private final List rowRanges; + private final RowType readRowType; + private final int blobIndex; + private final long lastRowId; + + private RecordReader currentReader; + private DataFileMeta currentFile; + private int nextFileIndex; + private int nextRowRangeIndex; + // expected next row id + private long nextRowId; + + private InternalRow placeholderRow; + + BlobSequenceGroupRecordReader( + List files, + BlobFileReaderFactory readerFactory, + List rowRanges, + RowType readRowType, + int blobIndex, + long firstRowId, + long lastRowId) { + this.files = files; + this.readerFactory = readerFactory; + this.rowRanges = rowRanges == null ? null : Range.sortAndMergeOverlap(rowRanges); + this.readRowType = readRowType; + this.blobIndex = blobIndex; + this.lastRowId = lastRowId; + + this.nextFileIndex = 0; + this.nextRowRangeIndex = 0; + setNextRowId(firstRowId); + + this.placeholderRow = null; + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + while (true) { + if (currentReader != null) { + RecordIterator batch = currentReader.readBatch(); + if (batch != null) { + return batch; + } + // row ranges have been pushed to readers + // directly set nextRowId as the lastRowId + 1 + setNextRowId(lastRowId(currentFile) + 1); + closeCurrentFileReader(); + continue; + } + + if (nextRowId > lastRowId) { + return null; + } + + // skip files whose ranges are before nextRowId + while (nextFileIndex < files.size() + && lastRowId(files.get(nextFileIndex)) < nextRowId) { + nextFileIndex++; + } + if (nextFileIndex >= files.size()) { + return placeHolderBatch(lastRowId); + } + + DataFileMeta nextFile = files.get(nextFileIndex); + if (nextFile.nonNullFirstRowId() > nextRowId) { + return placeHolderBatch(nextFile.nonNullFirstRowId() - 1); + } + + createReader(nextFile); + } + } + + /** + * Set nextRowId and try to move to the next selected row id. So the final nextRowId may be + * greater than the input value. + */ + private void setNextRowId(long nextRowId) { + this.nextRowId = nextRowId; + tryMoveToSelectedRow(); + } + + private void tryMoveToSelectedRow() { + if (nextRowId > lastRowId || rowRanges == null) { + return; + } + + while (nextRowRangeIndex < rowRanges.size()) { + Range range = rowRanges.get(nextRowRangeIndex); + if (nextRowId >= range.from && nextRowId <= range.to) { + // if nextRowId is within the range, do not need to move + return; + } else if (nextRowId < range.from) { + // else if nextRowId < next range, move to next range's `from` + nextRowId = range.from; + return; + } + // else nextRowId > range.to, try next range + nextRowRangeIndex++; + } + + // all ranges consumed, no need to read + nextRowId = lastRowId + 1; + } + + private RecordIterator placeHolderBatch(long endRowId) { + return new RecordIterator() { + long rowId; + + @Nullable + @Override + public InternalRow next() { + rowId = nextRowId; + if (rowId > endRowId) { + return null; + } + setNextRowId(rowId + 1); + return placeHolderRow(); + } + + @Override + public void releaseBatch() { + // nothing to release + } + }; + } + + private InternalRow placeHolderRow() { + if (placeholderRow == null) { + GenericRow row = new GenericRow(readRowType.getFieldCount()); + row.setField(blobIndex, Blob.PLACE_HOLDER); + placeholderRow = row; + } + return placeholderRow; + } + + private long lastRowId(DataFileMeta file) { + return file.nonNullFirstRowId() + file.rowCount() - 1; + } + + private void closeCurrentFileReader() throws IOException { + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + currentFile = null; + } + + private void createReader(DataFileMeta nextFile) throws IOException { + currentFile = nextFile; + currentReader = readerFactory.create(nextFile); + nextFileIndex++; + } + + @Override + public void close() throws IOException { + closeCurrentFileReader(); + } + } + + /** Factory to create readers. */ + interface BlobFileReaderFactory { + RecordReader create(DataFileMeta file) throws IOException; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index c2f10b9a2b0c..0d2cd1015662 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -46,10 +46,12 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FormatReaderMapping; import org.apache.paimon.utils.FormatReaderMapping.Builder; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; import org.apache.paimon.utils.RoaringBitmap32; @@ -228,7 +230,8 @@ private DataEvolutionFileReader createUnionReader( || isVectorStoreFile(file.fileName()), "Only blob/vector-store files need to call this method."); return schemaFetcher.apply(file.schemaId()).logicalRowType(); - }); + }, + rowRanges != null); long rowCount = fieldsFiles.get(0).rowCount(); long firstRowId = fieldsFiles.get(0).files().get(0).nonNullFirstRowId(); @@ -302,15 +305,16 @@ private DataEvolutionFileReader createUnionReader( dataSchema, readFields, false)); + RowType partialReadRowType = new RowType(readFields); fileRecordReaders[i] = new ForceSingleBatchReader( - createFileReader( + createFieldBunchReader( partition, bunch, dataFilePathFactory, formatReaderMapping, rowRanges, - readRowType)); + partialReadRowType)); } } @@ -327,6 +331,84 @@ private DataEvolutionFileReader createUnionReader( return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders); } + private RecordReader createFieldBunchReader( + BinaryRow partition, + FieldBunch bunch, + DataFilePathFactory dataFilePathFactory, + FormatReaderMapping formatReaderMapping, + List rowRanges, + RowType readRowType) + throws IOException { + if (bunch instanceof DataBunch) { + // for data bunch, directly read the single file + return createFileReader( + partition, + bunch.files().get(0), + dataFilePathFactory, + formatReaderMapping, + rowRanges, + readRowType); + } else if (bunch instanceof VectorFileBunch) { + // for vector bunch, sequential read all data files and concat them + List> readerSuppliers = new ArrayList<>(); + for (DataFileMeta file : bunch.files()) { + RoaringBitmap32 selection = file.toFileSelection(rowRanges); + FormatReaderContext formatReaderContext = + new FormatReaderContext( + fileIO, + dataFilePathFactory.toPath(file), + file.fileSize(), + selection); + readerSuppliers.add( + () -> + new DataFileRecordReader( + readRowType, + formatReaderMapping.getReaderFactory(), + formatReaderContext, + coreOptions.scanIgnoreCorruptFile(), + coreOptions.scanIgnoreLostFile(), + formatReaderMapping.getIndexMapping(), + formatReaderMapping.getCastMapping(), + PartitionUtils.create( + formatReaderMapping.getPartitionPair(), partition), + true, + file.firstRowId(), + file.maxSequenceNumber(), + formatReaderMapping.getSystemFields())); + } + return ConcatRecordReader.create(readerSuppliers); + } else if (bunch instanceof BlobFileBunch) { + // for blob funch, fallback on placeholders + int blobIndex = findBlobFieldIndex(readRowType); + checkArgument(blobIndex >= 0, "Blob bunch read type should contain a blob field."); + return new BlobFallbackRecordReader( + bunch.files(), + bunch.rowCount(), + file -> + createFileReader( + partition, + file, + dataFilePathFactory, + formatReaderMapping, + rowRanges, + readRowType), + rowRanges, + readRowType, + blobIndex); + } else { + throw new UnsupportedOperationException("Unsupported bunch type: " + bunch); + } + } + + private static int findBlobFieldIndex(RowType rowType) { + for (int i = 0; i < rowType.getFieldCount(); i++) { + if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) { + return i; + } + } + return -1; + } + private FileRecordReader createFileReader( BinaryRow partition, DataFilePathFactory dataFilePathFactory, @@ -351,49 +433,6 @@ private FileRecordReader createFileReader( partition, file, dataFilePathFactory, formatReaderMapping, rowRanges, readRowType); } - private RecordReader createFileReader( - BinaryRow partition, - FieldBunch bunch, - DataFilePathFactory dataFilePathFactory, - FormatReaderMapping formatReaderMapping, - List rowRanges, - RowType readRowType) - throws IOException { - if (bunch.files().size() == 1) { - return createFileReader( - partition, - bunch.files().get(0), - dataFilePathFactory, - formatReaderMapping, - rowRanges, - readRowType); - } - List> readerSuppliers = new ArrayList<>(); - for (DataFileMeta file : bunch.files()) { - RoaringBitmap32 selection = file.toFileSelection(rowRanges); - FormatReaderContext formatReaderContext = - new FormatReaderContext( - fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection); - readerSuppliers.add( - () -> - new DataFileRecordReader( - readRowType, - formatReaderMapping.getReaderFactory(), - formatReaderContext, - coreOptions.scanIgnoreCorruptFile(), - coreOptions.scanIgnoreLostFile(), - formatReaderMapping.getIndexMapping(), - formatReaderMapping.getCastMapping(), - PartitionUtils.create( - formatReaderMapping.getPartitionPair(), partition), - true, - file.firstRowId(), - file.maxSequenceNumber(), - formatReaderMapping.getSystemFields())); - } - return ConcatRecordReader.create(readerSuppliers); - } - private FileRecordReader createFileReader( BinaryRow partition, DataFileMeta file, @@ -433,8 +472,8 @@ public static List splitFieldBunches( Function fileToRowType, boolean rowIdPushDown) { List fieldsFiles = new ArrayList<>(); - Map blobBunchMap = new HashMap<>(); - Map vectorStoreBunchMap = new TreeMap<>(); + Map blobBunchMap = new HashMap<>(); + Map vectorStoreBunchMap = new TreeMap<>(); long rowCount = -1; for (DataFileMeta file : needMergeFiles) { if (isBlobFile(file.fileName())) { @@ -443,8 +482,7 @@ public static List splitFieldBunches( final long expectedRowCount = rowCount; blobBunchMap .computeIfAbsent( - fieldId, - key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown)) + fieldId, key -> new BlobFileBunch(expectedRowCount, rowIdPushDown)) .add(file); } else if (isVectorStoreFile(file.fileName())) { RowType rowType = fileToRowType.apply(file); @@ -456,7 +494,7 @@ public static List splitFieldBunches( vectorStoreBunchMap .computeIfAbsent( vectorStoreKey, - key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown)) + key -> new VectorFileBunch(expectedRowCount, rowIdPushDown)) .add(file); } else { // Normal file, just add it to the current merge split @@ -496,8 +534,67 @@ public List files() { } } + /** + * The {@link FieldBunch} for blobs. Compared to {@link VectorFileBunch} which only contains + * data files of the max max_seq number, {@link BlobFileBunch} contains all blob files. + */ @VisibleForTesting - static class SpecialFieldBunch implements FieldBunch { + static class BlobFileBunch implements FieldBunch { + + final List files; + final List ranges; + final long expectedRowCount; + final boolean rowIdPushdown; + + BlobFileBunch(long expectedRowCount, boolean rowIdPushdown) { + this.files = new ArrayList<>(); + this.expectedRowCount = expectedRowCount; + this.ranges = new ArrayList<>(); + this.rowIdPushdown = rowIdPushdown; + } + + void add(DataFileMeta file) { + if (!isBlobFile(file.fileName())) { + throw new IllegalArgumentException("Only blob file can be added to this bunch."); + } + if (!files.isEmpty()) { + checkArgument( + file.writeCols().equals(files.get(0).writeCols()), + "All files in this bunch should have the same write columns."); + } + + files.add(file); + ranges.add(file.nonNullRowIdRange()); + } + + @Override + public long rowCount() { + List merged = Range.sortAndMergeOverlap(ranges, true); + if (!rowIdPushdown) { + Preconditions.checkState( + merged.size() == 1, + "Blob file bunch should always contain a contiguous row range."); + + long rowCount = merged.get(0).count(); + Preconditions.checkState( + rowCount == expectedRowCount, + "The merged rowCount %s of blob file bunch should be aligned with normal files %s.", + rowCount, + expectedRowCount); + } + + return expectedRowCount; + } + + @Override + public List files() { + return files; + } + } + + /** {@link FieldBunch} for vector-store files. */ + @VisibleForTesting + static class VectorFileBunch implements FieldBunch { final List files; final long expectedRowCount; @@ -508,70 +605,65 @@ static class SpecialFieldBunch implements FieldBunch { long latestMaxSequenceNumber = -1; long rowCount; - SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) { + VectorFileBunch(long expectedRowCount, boolean rowIdPushDown) { this.files = new ArrayList<>(); - this.rowCount = 0; this.expectedRowCount = expectedRowCount; this.rowIdPushDown = rowIdPushDown; } void add(DataFileMeta file) { - if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) { + if (!isVectorStoreFile(file.fileName())) { throw new IllegalArgumentException( - "Only blob/vector-store file can be added to this bunch."); + "Only vector-store file can be added to this bunch."); } - if (file.nonNullFirstRowId() == latestFistRowId) { if (file.maxSequenceNumber() >= latestMaxSequenceNumber) { throw new IllegalArgumentException( - "Blob/vector-store file with same first row id should have decreasing sequence number."); + "Vector file with same first row id should have decreasing sequence number."); } return; } + if (!files.isEmpty()) { long firstRowId = file.nonNullFirstRowId(); - if (rowIdPushDown) { - if (firstRowId < expectedNextFirstRowId) { - if (file.maxSequenceNumber() > latestMaxSequenceNumber) { - DataFileMeta lastFile = files.remove(files.size() - 1); - rowCount -= lastFile.rowCount(); - } else { - return; - } - } - } else { - if (firstRowId < expectedNextFirstRowId) { - checkArgument( - file.maxSequenceNumber() < latestMaxSequenceNumber, - "Blob/vector-store file with overlapping row id should have decreasing sequence number."); + if (rowIdPushDown && firstRowId < expectedNextFirstRowId) { + if (file.maxSequenceNumber() > latestMaxSequenceNumber) { + DataFileMeta lastFile = files.remove(files.size() - 1); + rowCount -= lastFile.rowCount(); + } else { return; - } else if (firstRowId > expectedNextFirstRowId) { - throw new IllegalArgumentException( - "Blob/vector-store file first row id should be continuous, expect " - + expectedNextFirstRowId - + " but got " - + firstRowId); } + } else if (firstRowId < expectedNextFirstRowId) { + checkArgument( + file.maxSequenceNumber() < latestMaxSequenceNumber, + "Vector file with overlapping row id should have decreasing sequence number."); + return; + } else if (!rowIdPushDown && firstRowId > expectedNextFirstRowId) { + throw new IllegalArgumentException( + "Vector file first row id should be continuous, expect " + + expectedNextFirstRowId + + " but got " + + firstRowId); } + if (!files.isEmpty()) { - if (!isBlobFile(file.fileName())) { - checkArgument( - file.schemaId() == files.get(0).schemaId(), - "All files in this bunch should have the same schema id."); - } + checkArgument( + file.schemaId() == files.get(0).schemaId(), + "All files in this bunch should have the same schema id."); checkArgument( file.writeCols().equals(files.get(0).writeCols()), "All files in this bunch should have the same write columns."); } } + files.add(file); rowCount += file.rowCount(); checkArgument( rowCount <= expectedRowCount, - "Blob/vector-store files row count exceed the expect " + expectedRowCount); - this.latestMaxSequenceNumber = file.maxSequenceNumber(); - this.latestFistRowId = file.nonNullFirstRowId(); - this.expectedNextFirstRowId = latestFistRowId + file.rowCount(); + "Vector files row count exceed the expect " + expectedRowCount); + latestMaxSequenceNumber = file.maxSequenceNumber(); + latestFistRowId = file.nonNullFirstRowId(); + expectedNextFirstRowId = latestFistRowId + file.rowCount(); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index 38aabeda6a7b..8a3e0e5ff98b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; @@ -30,15 +31,22 @@ import org.apache.paimon.data.BlobViewStruct; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.blob.BlobFileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; @@ -48,6 +56,7 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.system.RowTrackingTable; import org.apache.paimon.types.DataField; @@ -150,6 +159,73 @@ public void testBasic() throws Exception { assertThat(integer.get()).isEqualTo(1000); } + @Test + public void testReadBlobPlaceHolderFallback() throws Exception { + createTableDefault(); + writeDataDefault( + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("first"), new BlobData(blobBytes)), + GenericRow.of( + 2, BinaryString.fromString("second"), new BlobData(blobBytes)), + GenericRow.of( + 3, BinaryString.fromString("third"), new BlobData(blobBytes)))); + + FileStoreTable table = getTableDefault(); + List files = + table.store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + DataFileMeta dataFile = + files.stream() + .filter(file -> !BlobFileFormat.isBlobFile(file.fileName())) + .findFirst() + .get(); + DataFileMeta oldBlobFile = + files.stream() + .filter(file -> BlobFileFormat.isBlobFile(file.fileName())) + .findFirst() + .get(); + + byte[] updatedBytes = "updated-blob".getBytes(); + DataFilePathFactory pathFactory = + table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0); + DataFileMeta newBlobFile = + writeBlobFile( + table.fileIO(), + pathFactory.newBlobPath(), + Arrays.asList(Blob.PLACE_HOLDER, new BlobData(updatedBytes)), + dataFile.nonNullFirstRowId(), + oldBlobFile.maxSequenceNumber() + 1, + oldBlobFile.schemaId(), + oldBlobFile.writeCols()); + + DataSplit split = + DataSplit.builder() + .withSnapshot(1L) + .withPartition(BinaryRow.EMPTY_ROW) + .withBucket(0) + .withBucketPath(pathFactory.parent().toString()) + .withDataFiles(Arrays.asList(dataFile, newBlobFile, oldBlobFile)) + .build(); + + DataEvolutionSplitRead read = + new DataEvolutionSplitRead( + table.fileIO(), + table.schemaManager(), + table.schema(), + table.rowType(), + table.coreOptions(), + table.store().pathFactory()); + + List actual = new ArrayList<>(); + read.createReader(split).forEachRemaining(row -> actual.add(row.getBlob(2).toData())); + + assertThat(actual.size()).isEqualTo(3); + assertThat(actual.get(0)).isEqualTo(blobBytes); + assertThat(actual.get(1)).isEqualTo(updatedBytes); + assertThat(actual.get(2)).isEqualTo(blobBytes); + } + @Test public void testWriteByInputStream() throws Exception { createTableDefault(); @@ -971,6 +1047,48 @@ private static void writeFile(FileIO fileIO, Path path, byte[] bytes) throws IOE } } + private static DataFileMeta writeBlobFile( + FileIO fileIO, + Path path, + List blobs, + long firstRowId, + long maxSequenceNumber, + long schemaId, + List writeCols) + throws IOException { + try (PositionOutputStream out = fileIO.newOutputStream(path, false)) { + FormatWriter writer = + new BlobFileFormat() + .createWriterFactory(RowType.of(DataTypes.BLOB())) + .create(out, "none"); + for (Blob blob : blobs) { + writer.addElement(GenericRow.of(blob)); + } + writer.close(); + } + return DataFileMeta.create( + path.getName(), + fileIO.getFileSize(path), + blobs.size(), + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + SimpleStats.EMPTY_STATS, + SimpleStats.EMPTY_STATS, + 0, + maxSequenceNumber, + schemaId, + DataFileMeta.DUMMY_LEVEL, + Collections.emptyList(), + Timestamp.fromEpochMillis(System.currentTimeMillis()), + 0L, + null, + FileSource.APPEND, + null, + null, + firstRowId, + writeCols); + } + private static long countFilesWithSuffix(FileIO fileIO, Path root, String suffix) throws IOException { long count = 0; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java new file mode 100644 index 000000000000..47e2ac80415b --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReader.RecordIterator; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link BlobFallbackRecordReader}. */ +public class BlobFallbackRecordReaderTest { + + private static final int BLOB_INDEX = 0; + private static final String BLOB_FIELD = "blob_col"; + private static final RowType READ_ROW_TYPE = + new RowType( + Arrays.asList( + new DataField(BLOB_INDEX, BLOB_FIELD, DataTypes.BLOB()), + new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()), + new DataField( + 2, SpecialFields.SEQUENCE_NUMBER.name(), DataTypes.BIGINT()))); + + @Test + public void testBlobSequenceGroupReaderWithRowRanges() throws Exception { + List files = + Arrays.asList(blobFile("blob1", 0, 3, 10), blobFile("blob2", 5, 2, 10)); + List rowRanges = ranges(1, 1, 3, 5); + + ReadResult rows = readSequenceGroup(files, rowRanges, 0, 6, 10); + + assertThat(rows.rowIds).containsExactly(1L, 5L); + assertThat(rows.placeholderRowCount).isEqualTo(2); + assertThat(rows.batchSizes).containsExactly(1, 2, 1); + } + + @Test + public void testBlobSequenceGroupReaderWithMultipleRangesInFileAndGap() throws Exception { + DataFileMeta wideFile = blobFile("wide-file", 20, 31, 10); + List wideFileRanges = ranges(9, 10, 19, 20, 25, 30, 35, 40, 43, 45, 48, 52, 55, 56); + + ReadResult wideFileRows = + readSequenceGroup(Collections.singletonList(wideFile), wideFileRanges, 10, 60, 10); + + List wideFileActualRowIds = rowIdsInRanges(20, 50, wideFileRanges); + assertThat(wideFileRows.rowIds).containsExactlyElementsOf(wideFileActualRowIds); + assertThat(wideFileRows.placeholderRowCount) + .isEqualTo( + rowIdsInRanges(10, 19, wideFileRanges).size() + + rowIdsInRanges(51, 60, wideFileRanges).size()); + assertThat(wideFileRows.batchSizes) + .containsExactlyElementsOf(batchSizes(2, wideFileActualRowIds.size(), 1, 4)); + + DataFileMeta firstFile = blobFile("first-file", 0, 11, 10); + DataFileMeta secondFile = blobFile("second-file", 50, 11, 10); + List gapRanges = ranges(0, 0, 9, 12, 25, 30, 35, 40, 43, 45, 48, 52, 58, 62); + + ReadResult gapRows = + readSequenceGroup(Arrays.asList(firstFile, secondFile), gapRanges, 0, 62, 10); + + assertThat(gapRows.rowIds) + .containsExactlyElementsOf( + concat( + rowIdsInRanges(0, 10, gapRanges), + rowIdsInRanges(50, 60, gapRanges))); + assertThat(gapRows.placeholderRowCount) + .isEqualTo( + rowIdsInRanges(11, 49, gapRanges).size() + + rowIdsInRanges(61, 62, gapRanges).size()); + assertThat(gapRows.batchSizes).containsExactly(1, 1, 1, 19, 1, 1, 1, 1, 1, 1, 2); + } + + @Test + public void testBlobFallbackRecordReader() throws Exception { + DataFileMeta newFile = blobFile("new-file", 0, 3, 2); + DataFileMeta oldFile = blobFile("old-file", 0, 5, 1); + + ReadResult rows = + readFallback(Arrays.asList(newFile, oldFile), 5, null, placeholderRows(newFile, 1)); + + assertThat(rows.rowIds).containsExactly(0L, 1L, 2L, 3L, 4L); + assertThat(rows.sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L); + } + + @Test + public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() { + DataFileMeta newFile = blobFile("new-placeholder-file", 0, 1, 2); + DataFileMeta oldFile = blobFile("old-placeholder-file", 0, 1, 1); + + assertThatThrownBy( + () -> + readFallback( + Arrays.asList(newFile, oldFile), + 1, + null, + placeholderRows(newFile, 0, oldFile, 0))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("all blob files at the same row id store a placeholder"); + } + + @Test + public void testBlobFallbackRecordReaderWithRowRanges() throws Exception { + DataFileMeta oldFile = blobFile("old-file", 20, 26, 1); + DataFileMeta newFile1 = blobFile("new-file-1", 20, 11, 2); + DataFileMeta newFile2 = blobFile("new-file-2", 40, 6, 2); + List rowRanges = ranges(15, 20, 25, 26, 29, 33, 35, 41); + + ReadResult rows = + readFallback( + Arrays.asList(newFile2, oldFile, newFile1), + 26, + rowRanges, + Collections.emptySet()); + + assertThat(rows.rowIds) + .containsExactly( + 20L, 25L, 26L, 29L, 30L, 31L, 32L, 33L, 35L, 36L, 37L, 38L, 39L, 40L, 41L); + assertThat(rows.sequenceNumbers) + .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 2L); + } + + private static ReadResult readFallback( + List files, + long rowCount, + List rowRanges, + Set placeholderRows) + throws Exception { + return ReadResult.read( + new BlobFallbackRecordReader( + files, + rowCount, + file -> oneRowPerBatchReader(fileRows(file, rowRanges, placeholderRows)), + rowRanges, + READ_ROW_TYPE, + BLOB_INDEX)); + } + + private static ReadResult readSequenceGroup( + List files, + List rowRanges, + long firstRowId, + long lastRowId, + long sequenceNumber) + throws Exception { + return ReadResult.read( + new BlobSequenceGroupRecordReader( + files, + file -> oneRowPerBatchReader(fileRows(file, rowRanges)), + rowRanges, + READ_ROW_TYPE, + BLOB_INDEX, + firstRowId, + lastRowId)); + } + + private static DataFileMeta blobFile( + String fileName, long firstRowId, long rowCount, long maxSequenceNumber) { + return DataFileMeta.create( + fileName + ".blob", + rowCount, + rowCount, + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + SimpleStats.EMPTY_STATS, + SimpleStats.EMPTY_STATS, + 0, + maxSequenceNumber, + 0L, + DataFileMeta.DUMMY_LEVEL, + Collections.emptyList(), + Timestamp.fromEpochMillis(System.currentTimeMillis()), + rowCount, + null, + FileSource.APPEND, + null, + null, + firstRowId, + Arrays.asList(BLOB_FIELD)); + } + + private static List ranges(long... bounds) { + if (bounds.length % 2 != 0) { + throw new IllegalArgumentException("Range bounds should be paired."); + } + + List ranges = new ArrayList<>(); + for (int i = 0; i < bounds.length; i += 2) { + ranges.add(new Range(bounds[i], bounds[i + 1])); + } + return ranges; + } + + private static List fileRows(DataFileMeta file, List rowRanges) { + return fileRows(file, rowRanges, Collections.emptySet()); + } + + private static List fileRows( + DataFileMeta file, List rowRanges, Set placeholderRows) { + List rows = new ArrayList<>(); + long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1; + for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) { + if (selected(rowId, rowRanges)) { + rows.add( + blobRow( + rowId, + file.maxSequenceNumber(), + placeholderRows.contains(rowKey(file, rowId)))); + } + } + return rows; + } + + private static boolean selected(long rowId, List rowRanges) { + if (rowRanges == null) { + return true; + } + for (Range range : rowRanges) { + if (rowId < range.from) { + return false; + } + if (rowId <= range.to) { + return true; + } + } + return false; + } + + private static List rowIdsInRanges(long firstRowId, long lastRowId, List ranges) { + List rowIds = new ArrayList<>(); + for (long rowId = firstRowId; rowId <= lastRowId; rowId++) { + if (selected(rowId, ranges)) { + rowIds.add(rowId); + } + } + return rowIds; + } + + private static List concat(List first, List second) { + List all = new ArrayList<>(first); + all.addAll(second); + return all; + } + + private static List batchSizes( + int firstBatchSize, int repeatedBatchCount, int repeatedBatchSize, int lastBatchSize) { + List batchSizes = new ArrayList<>(); + batchSizes.add(firstBatchSize); + for (int i = 0; i < repeatedBatchCount; i++) { + batchSizes.add(repeatedBatchSize); + } + batchSizes.add(lastBatchSize); + return batchSizes; + } + + private static Set placeholderRows(DataFileMeta file, long rowId) { + Set keys = new HashSet<>(); + keys.add(rowKey(file, rowId)); + return keys; + } + + private static Set placeholderRows( + DataFileMeta firstFile, long firstRowId, DataFileMeta secondFile, long secondRowId) { + Set keys = placeholderRows(firstFile, firstRowId); + keys.add(rowKey(secondFile, secondRowId)); + return keys; + } + + private static String rowKey(DataFileMeta file, long rowId) { + return file.fileName() + "#" + rowId; + } + + private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) { + GenericRow row = new GenericRow(3); + row.setField( + BLOB_INDEX, + placeholder ? Blob.PLACE_HOLDER : new BlobData(new byte[] {(byte) rowId})); + row.setField(1, rowId); + row.setField(2, sequenceNumber); + return row; + } + + private static RecordReader oneRowPerBatchReader(List rows) { + return new RecordReader() { + + int index; + + @Override + public RecordIterator readBatch() { + if (index >= rows.size()) { + return null; + } + InternalRow row = rows.get(index++); + return new RecordIterator() { + + boolean returned; + + @Override + public InternalRow next() { + if (returned) { + return null; + } + returned = true; + return row; + } + + @Override + public void releaseBatch() {} + }; + } + + @Override + public void close() {} + }; + } + + private static class ReadResult { + final List rowIds = new ArrayList<>(); + final List sequenceNumbers = new ArrayList<>(); + final List batchSizes = new ArrayList<>(); + int placeholderRowCount; + + static ReadResult read(RecordReader reader) throws Exception { + try { + ReadResult result = new ReadResult(); + RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + int batchSize = 0; + InternalRow row; + while ((row = batch.next()) != null) { + result.add(row); + batchSize++; + } + batch.releaseBatch(); + result.batchSizes.add(batchSize); + } + return result; + } finally { + reader.close(); + } + } + + private void add(InternalRow row) { + if (row.getBlob(BLOB_INDEX) == Blob.PLACE_HOLDER) { + placeholderRowCount++; + } else { + rowIds.add(row.getLong(1)); + sequenceNumbers.add(row.getLong(2)); + } + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java index 9bb9a7274ed1..e240b7fb422c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java @@ -21,8 +21,9 @@ import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch; import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch; -import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch; +import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -42,150 +43,149 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link SpecialFieldBunch}. */ +/** Tests for blob and vector field bunches. */ public class DataEvolutionReadTest { - private SpecialFieldBunch blobBunch; + private VectorFileBunch vectorBunch; @BeforeEach public void setUp() { - blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false); + vectorBunch = new VectorFileBunch(Long.MAX_VALUE, false); } @Test - public void testAddSingleBlobEntry() { - DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L); + public void testAddSingleVectorEntry() { + DataFileMeta vectorEntry = createVectorFile("vector1", 0L, 100L, 1L); - blobBunch.add(blobEntry); + vectorBunch.add(vectorEntry); - assertThat(blobBunch.files).hasSize(1); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry); - assertThat(blobBunch.rowCount()).isEqualTo(100); - assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0); - assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col")); + assertThat(vectorBunch.files).hasSize(1); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry); + assertThat(vectorBunch.rowCount()).isEqualTo(100); + assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0); + assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col")); } @Test - public void testAddBlobEntryAndTail() { - DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1); - - blobBunch.add(blobEntry); - blobBunch.add(blobTail); - - assertThat(blobBunch.files).hasSize(2); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry); - assertThat(blobBunch.files.get(1)).isEqualTo(blobTail); - assertThat(blobBunch.rowCount()).isEqualTo(300); - assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0); - assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col")); - assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L); + public void testAddVectorEntryAndTail() { + DataFileMeta vectorEntry = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorTail = createVectorFile("vector2", 100, 200, 1); + + vectorBunch.add(vectorEntry); + vectorBunch.add(vectorTail); + + assertThat(vectorBunch.files).hasSize(2); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry); + assertThat(vectorBunch.files.get(1)).isEqualTo(vectorTail); + assertThat(vectorBunch.rowCount()).isEqualTo(300); + assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0); + assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col")); + assertThat(vectorBunch.files.get(0).schemaId()).isEqualTo(0L); } @Test - public void testAddNonBlobFileThrowsException() { + public void testAddNonVectorFileThrowsException() { DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100, 1, 0L); - assertThatThrownBy(() -> blobBunch.add(normalFile)) + assertThatThrownBy(() -> vectorBunch.add(normalFile)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Only blob/vector-store file can be added to this bunch."); + .hasMessage("Only vector-store file can be added to this bunch."); } @Test - public void testAddBlobFileWithSameFirstRowId() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2); + public void testAddVectorFileWithSameFirstRowId() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 2); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with same firstRowId but higher sequence number should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob/vector-store file with same first row id should have decreasing sequence number."); + "Vector file with same first row id should have decreasing sequence number."); } @Test - public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2); - DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1); + public void testAddVectorFileWithSameFirstRowIdAndLowerSequenceNumber() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 1); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with same firstRowId and lower sequence number should be ignored - blobBunch.add(blobEntry2); + vectorBunch.add(vectorEntry2); - assertThat(blobBunch.files).hasSize(1); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1); + assertThat(vectorBunch.files).hasSize(1); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1); } @Test - public void testAddBlobFileWithOverlappingRowId() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2); - DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1); + public void testAddVectorFileWithOverlappingRowId() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 1); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with overlapping row id and lower sequence number should be ignored - blobBunch.add(blobEntry2); + vectorBunch.add(vectorEntry2); - assertThat(blobBunch.files).hasSize(1); - assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1); + assertThat(vectorBunch.files).hasSize(1); + assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1); } @Test - public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2); + public void testAddVectorFileWithOverlappingRowIdAndHigherSequenceNumber() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 2); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with overlapping row id and higher sequence number should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob/vector-store file with overlapping row id should have decreasing sequence number."); + "Vector file with overlapping row id should have decreasing sequence number."); } @Test - public void testAddBlobFileWithNonContinuousRowId() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1); + public void testAddVectorFileWithNonContinuousRowId() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with non-continuous row id should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob/vector-store file first row id should be continuous, expect 100 but got 200"); + "Vector file first row id should be continuous, expect 100 but got 200"); } @Test - public void testAddBlobFileWithDifferentWriteCols() { - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = - createBlobFileWithCols("blob2", 100, 200, 1, Arrays.asList("different_col")); + public void testAddVectorFileWithDifferentWriteCols() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = + createVectorFileWithCols("vector2", 100, 200, 1, Arrays.asList("different_col")); - blobBunch.add(blobEntry1); + vectorBunch.add(vectorEntry1); // Adding file with different write columns should throw exception - assertThatThrownBy(() -> blobBunch.add(blobEntry2)) + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("All files in this bunch should have the same write columns."); } @Test - public void testComplexBlobBunchScenario() { - // Create a complex scenario with multiple blob entries and a tail - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1); - DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1); - DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1); - - blobBunch.add(blobEntry1); - blobBunch.add(blobEntry2); - blobBunch.add(blobEntry3); - blobBunch.add(blobTail); - - assertThat(blobBunch.files).hasSize(4); - assertThat(blobBunch.rowCount()).isEqualTo(1000); - assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0); - assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col")); + public void testComplexVectorBunchScenario() { + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 100, 200, 1); + DataFileMeta vectorEntry3 = createVectorFile("vector3", 300, 300, 1); + DataFileMeta vectorTail = createVectorFile("vector4", 600, 400, 1); + + vectorBunch.add(vectorEntry1); + vectorBunch.add(vectorEntry2); + vectorBunch.add(vectorEntry3); + vectorBunch.add(vectorTail); + + assertThat(vectorBunch.files).hasSize(4); + assertThat(vectorBunch.rowCount()).isEqualTo(1000); + assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0); + assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col")); } @Test @@ -209,26 +209,31 @@ public void testComplexBlobBunchScenario2() { List batch = batches.get(0); - assertThat(batch.get(1).fileName()).contains("blob5"); // pick - assertThat(batch.get(2).fileName()).contains("blob2"); // skip - assertThat(batch.get(3).fileName()).contains("blob1"); // skip - assertThat(batch.get(4).fileName()).contains("blob9"); // pick - assertThat(batch.get(5).fileName()).contains("blob6"); // skip - assertThat(batch.get(6).fileName()).contains("blob3"); // skip - assertThat(batch.get(7).fileName()).contains("blob7"); // pick - assertThat(batch.get(8).fileName()).contains("blob4"); // skip - assertThat(batch.get(9).fileName()).contains("blob8"); // pick + assertThat(batch.get(1).fileName()).contains("blob5"); + assertThat(batch.get(2).fileName()).contains("blob2"); + assertThat(batch.get(3).fileName()).contains("blob1"); + assertThat(batch.get(4).fileName()).contains("blob9"); + assertThat(batch.get(5).fileName()).contains("blob6"); + assertThat(batch.get(6).fileName()).contains("blob3"); + assertThat(batch.get(7).fileName()).contains("blob7"); + assertThat(batch.get(8).fileName()).contains("blob4"); + assertThat(batch.get(9).fileName()).contains("blob8"); List fieldBunches = splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); assertThat(fieldBunches.size()).isEqualTo(2); - SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1); - assertThat(blobBunch.files).hasSize(4); + BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1); + assertThat(blobBunch.files).hasSize(9); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); - assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); - assertThat(blobBunch.files.get(2).fileName()).contains("blob7"); - assertThat(blobBunch.files.get(3).fileName()).contains("blob8"); + assertThat(blobBunch.files.get(1).fileName()).contains("blob2"); + assertThat(blobBunch.files.get(2).fileName()).contains("blob1"); + assertThat(blobBunch.files.get(3).fileName()).contains("blob9"); + assertThat(blobBunch.files.get(4).fileName()).contains("blob6"); + assertThat(blobBunch.files.get(5).fileName()).contains("blob3"); + assertThat(blobBunch.files.get(6).fileName()).contains("blob7"); + assertThat(blobBunch.files.get(7).fileName()).contains("blob4"); + assertThat(blobBunch.files.get(8).fileName()).contains("blob8"); } @Test @@ -275,19 +280,29 @@ public void testComplexBlobBunchScenario3() { batch, file -> makeBlobRowType(file.writeCols(), String::hashCode)); assertThat(fieldBunches.size()).isEqualTo(3); - SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1); - assertThat(blobBunch.files).hasSize(4); + BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1); + assertThat(blobBunch.files).hasSize(9); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); - assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); - assertThat(blobBunch.files.get(2).fileName()).contains("blob7"); - assertThat(blobBunch.files.get(3).fileName()).contains("blob8"); - - blobBunch = (SpecialFieldBunch) fieldBunches.get(2); - assertThat(blobBunch.files).hasSize(4); + assertThat(blobBunch.files.get(1).fileName()).contains("blob2"); + assertThat(blobBunch.files.get(2).fileName()).contains("blob1"); + assertThat(blobBunch.files.get(3).fileName()).contains("blob9"); + assertThat(blobBunch.files.get(4).fileName()).contains("blob6"); + assertThat(blobBunch.files.get(5).fileName()).contains("blob3"); + assertThat(blobBunch.files.get(6).fileName()).contains("blob7"); + assertThat(blobBunch.files.get(7).fileName()).contains("blob4"); + assertThat(blobBunch.files.get(8).fileName()).contains("blob8"); + + blobBunch = (BlobFileBunch) fieldBunches.get(2); + assertThat(blobBunch.files).hasSize(9); assertThat(blobBunch.files.get(0).fileName()).contains("blob15"); - assertThat(blobBunch.files.get(1).fileName()).contains("blob19"); - assertThat(blobBunch.files.get(2).fileName()).contains("blob17"); - assertThat(blobBunch.files.get(3).fileName()).contains("blob18"); + assertThat(blobBunch.files.get(1).fileName()).contains("blob12"); + assertThat(blobBunch.files.get(2).fileName()).contains("blob11"); + assertThat(blobBunch.files.get(3).fileName()).contains("blob19"); + assertThat(blobBunch.files.get(4).fileName()).contains("blob16"); + assertThat(blobBunch.files.get(5).fileName()).contains("blob13"); + assertThat(blobBunch.files.get(6).fileName()).contains("blob17"); + assertThat(blobBunch.files.get(7).fileName()).contains("blob14"); + assertThat(blobBunch.files.get(8).fileName()).contains("blob18"); } /** Creates a blob file with the specified parameters. */ @@ -357,8 +372,81 @@ private DataFileMeta createBlobFileWithCols( writeCols); } + private DataFileMeta createVectorFile( + String fileName, long firstRowId, long rowCount, long maxSequenceNumber) { + return createVectorFileWithCols( + fileName, firstRowId, rowCount, maxSequenceNumber, Arrays.asList("vector_col")); + } + + private DataFileMeta createVectorFileWithSchema( + String fileName, + long firstRowId, + long rowCount, + long maxSequenceNumber, + long schemaId) { + return createFile( + fileName + ".vector.avro", + firstRowId, + rowCount, + maxSequenceNumber, + schemaId, + Arrays.asList("vector_col")); + } + + private DataFileMeta createVectorFileWithCols( + String fileName, + long firstRowId, + long rowCount, + long maxSequenceNumber, + List writeCols) { + return createFile( + fileName + ".vector.avro", firstRowId, rowCount, maxSequenceNumber, 0L, writeCols); + } + + private DataFileMeta createFile( + String fileName, + long firstRowId, + long rowCount, + long maxSequenceNumber, + long schemaId, + List writeCols) { + return DataFileMeta.create( + fileName, + rowCount, + rowCount, + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + SimpleStats.EMPTY_STATS, + SimpleStats.EMPTY_STATS, + 0, + maxSequenceNumber, + schemaId, + DataFileMeta.DUMMY_LEVEL, + Collections.emptyList(), + Timestamp.fromEpochMillis(System.currentTimeMillis()), + rowCount, + null, + FileSource.APPEND, + null, + null, + firstRowId, + writeCols); + } + + @Test + void testAddVectorFilesWithDifferentSchemaId() { + DataFileMeta vectorEntry1 = createVectorFileWithSchema("vector1", 0, 100, 1, 0L); + DataFileMeta vectorEntry2 = createVectorFileWithSchema("vector2", 100, 200, 1, 1L); + + vectorBunch.add(vectorEntry1); + assertThatThrownBy(() -> vectorBunch.add(vectorEntry2)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All files in this bunch should have the same schema id."); + } + @Test void testAddBlobFilesWithDifferentSchemaId() { + BlobFileBunch blobBunch = new BlobFileBunch(300, false); DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L); DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L); @@ -373,24 +461,24 @@ void testAddBlobFilesWithDifferentSchemaId() { @Test public void testRowIdPushDown() { - SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true); - DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); - DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1); - blobBunch.add(blobEntry1); - SpecialFieldBunch finalBlobBunch = blobBunch; - DataFileMeta finalBlobEntry = blobEntry2; - assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException(); - - blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true); - blobEntry1 = createBlobFile("blob1", 0, 100, 1); - blobEntry2 = createBlobFile("blob2", 50, 200, 2); - blobBunch.add(blobEntry1); - blobBunch.add(blobEntry2); - assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2); - - SpecialFieldBunch finalBlobBunch2 = blobBunch; - DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2); - assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException(); + VectorFileBunch vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true); + DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1); + vectorBunch.add(vectorEntry1); + VectorFileBunch finalVectorBunch = vectorBunch; + DataFileMeta finalVectorEntry = vectorEntry2; + assertThatCode(() -> finalVectorBunch.add(finalVectorEntry)).doesNotThrowAnyException(); + + vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true); + vectorEntry1 = createVectorFile("vector1", 0, 100, 1); + vectorEntry2 = createVectorFile("vector2", 50, 200, 2); + vectorBunch.add(vectorEntry1); + vectorBunch.add(vectorEntry2); + assertThat(vectorBunch.files).containsExactlyInAnyOrder(vectorEntry2); + + VectorFileBunch finalVectorBunch2 = vectorBunch; + DataFileMeta vectorEntry3 = createVectorFile("vector2", 250, 100, 2); + assertThatCode(() -> finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException(); } /** Creates a normal (non-blob) file for testing. */ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java index 27a1ed56170a..d6d87542315f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java @@ -42,7 +42,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm byte[] header = new byte[5]; IOUtils.readFully(in, header); byte version = header[4]; - if (version != 1) { + if (version != 1 && version != 2) { throw new IOException("Unsupported version: " + version); } int indexLength = BytesUtils.getInt(header, 0); @@ -55,7 +55,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm long[] blobOffsets = new long[blobLengths.length]; long offset = 0; for (int i = 0; i < blobLengths.length; i++) { - if (blobLengths[i] == -1) { + if (blobLengths[i] < 0) { blobOffsets[i] = -1; } else { blobOffsets[i] = offset; @@ -86,7 +86,11 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm } public boolean isNull(int i) { - return blobLengths[i] == -1; + return blobLengths[i] == BlobFormatWriter.NULL_LENGTH; + } + + public boolean isPlaceHolder(int i) { + return blobLengths[i] == BlobFormatWriter.PLACE_HOLDER_LENGTH; } public long blobLength(int i) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java index 801964b86239..489e1d7c619b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java @@ -94,6 +94,8 @@ public InternalRow next() { Blob blob; if (fileMeta.isNull(currentPosition)) { blob = null; + } else if (fileMeta.isPlaceHolder(currentPosition)) { + blob = Blob.PLACE_HOLDER; } else { long offset = fileMeta.blobOffset(currentPosition) + 4; long length = fileMeta.blobLength(currentPosition) - 16; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java index 83d6c0eb915d..d032e1b043ea 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java @@ -43,9 +43,11 @@ /** {@link FormatWriter} for blob file. */ public class BlobFormatWriter implements FileAwareFormatWriter { - public static final byte VERSION = 1; + public static final byte VERSION = 2; public static final int MAGIC_NUMBER = 1481511375; public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER); + public static final long NULL_LENGTH = -1L; + public static final long PLACE_HOLDER_LENGTH = -2L; private final PositionOutputStream out; @Nullable private final BlobConsumer writeConsumer; @@ -81,13 +83,17 @@ public boolean deleteFileUponAbort() { public void addElement(InternalRow element) throws IOException { checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only support one field."); if (element.isNullAt(0)) { - lengths.add(-1L); + lengths.add(NULL_LENGTH); if (writeConsumer != null) { writeConsumer.accept(blobFieldName, null); } return; } Blob blob = element.getBlob(0); + if (blob == Blob.PLACE_HOLDER) { + lengths.add(PLACE_HOLDER_LENGTH); + return; + } long previousPos = out.getPos(); crc32.reset(); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java index 0b66e15c2b88..028cc023d1dd 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DeltaVarintCompressor; import org.apache.paimon.utils.RoaringBitmap32; import org.junit.jupiter.api.BeforeEach; @@ -44,7 +45,10 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.zip.CRC32; +import static org.apache.paimon.utils.StreamUtils.intToLittleEndian; +import static org.apache.paimon.utils.StreamUtils.longToLittleEndian; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link BlobFileFormat}. */ @@ -73,22 +77,53 @@ public void testReadBlobInlineBytes() throws IOException { innerTest(false); } + @Test + public void testReadLegacyVersionOneBlobFile() throws IOException { + BlobFileFormat format = new BlobFileFormat(false); + RowType rowType = RowType.of(DataTypes.BLOB()); + List blobs = Arrays.asList("hello".getBytes(), null, "world".getBytes()); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { + writeLegacyVersionOneBlobFile(out, blobs); + } + + FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null); + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + List result = new ArrayList<>(); + readerFactory + .createReader(context) + .forEachRemaining( + row -> result.add(row.isNullAt(0) ? null : row.getBlob(0).toData())); + + assertThat(result).hasSize(blobs.size()); + assertThat((byte[]) result.get(0)).isEqualTo(blobs.get(0)); + assertThat(result.get(1)).isNull(); + assertThat((byte[]) result.get(2)).isEqualTo(blobs.get(2)); + } + private void innerTest(boolean blobAsDescriptor) throws IOException { BlobFileFormat format = new BlobFileFormat(blobAsDescriptor); RowType rowType = RowType.of(DataTypes.BLOB()); // write FormatWriterFactory writerFactory = format.createWriterFactory(rowType); - List blobs = - Arrays.asList("hello".getBytes(), null, "world".getBytes(), new byte[0]); + List blobs = + Arrays.asList( + "hello".getBytes(), + null, + Blob.PLACE_HOLDER, + "world".getBytes(), + new byte[0]); try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { FormatWriter formatWriter = writerFactory.create(out, null); - for (byte[] bytes : blobs) { - if (bytes == null) { + for (Object blob : blobs) { + if (blob == null) { formatWriter.addElement(GenericRow.of((Object) null)); - continue; + } else if (blob == Blob.PLACE_HOLDER) { + formatWriter.addElement(GenericRow.of(Blob.PLACE_HOLDER)); } else { - formatWriter.addElement(GenericRow.of(new BlobData(bytes))); + formatWriter.addElement(GenericRow.of(new BlobData((byte[]) blob))); } } formatWriter.close(); @@ -98,7 +133,7 @@ private void innerTest(boolean blobAsDescriptor) throws IOException { FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null); FormatReaderContext context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); - List result = new ArrayList<>(); + List result = new ArrayList<>(); readerFactory .createReader(context) .forEachRemaining( @@ -107,7 +142,10 @@ private void innerTest(boolean blobAsDescriptor) throws IOException { result.add(null); } else { Blob blob = row.getBlob(0); - if (blobAsDescriptor) { + if (blob == Blob.PLACE_HOLDER) { + result.add(Blob.PLACE_HOLDER); + return; + } else if (blobAsDescriptor) { assertThat(blob).isInstanceOf(BlobRef.class); } else { assertThat(blob).isInstanceOf(BlobData.class); @@ -117,19 +155,59 @@ private void innerTest(boolean blobAsDescriptor) throws IOException { }); // assert - assertThat(result).containsExactlyElementsOf(blobs); + assertThat(result).hasSize(blobs.size()); + assertThat((byte[]) result.get(0)).isEqualTo((byte[]) blobs.get(0)); + assertThat(result.get(1)).isNull(); + assertThat(result.get(2)).isSameAs(Blob.PLACE_HOLDER); + assertThat((byte[]) result.get(3)).isEqualTo((byte[]) blobs.get(3)); + assertThat((byte[]) result.get(4)).isEqualTo((byte[]) blobs.get(4)); // read with selection RoaringBitmap32 selection = new RoaringBitmap32(); selection.add(2); context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file), selection); result.clear(); - readerFactory - .createReader(context) - .forEachRemaining(row -> result.add(row.getBlob(0).toData())); + readerFactory.createReader(context).forEachRemaining(row -> result.add(row.getBlob(0))); // assert - assertThat(result).containsOnly(blobs.get(2)); + assertThat(result).hasSize(1); + assertThat(result.get(0)).isSameAs(Blob.PLACE_HOLDER); + } + + private void writeLegacyVersionOneBlobFile(PositionOutputStream out, List blobs) + throws IOException { + CRC32 crc32 = new CRC32(); + long[] lengths = new long[blobs.size()]; + for (int i = 0; i < blobs.size(); i++) { + byte[] blob = blobs.get(i); + if (blob == null) { + lengths[i] = BlobFormatWriter.NULL_LENGTH; + continue; + } + + long previousPos = out.getPos(); + crc32.reset(); + + crc32.update( + BlobFormatWriter.MAGIC_NUMBER_BYTES, + 0, + BlobFormatWriter.MAGIC_NUMBER_BYTES.length); + out.write(BlobFormatWriter.MAGIC_NUMBER_BYTES); + crc32.update(blob, 0, blob.length); + out.write(blob); + + long binLength = out.getPos() - previousPos + 12; + lengths[i] = binLength; + byte[] lengthBytes = longToLittleEndian(binLength); + crc32.update(lengthBytes, 0, lengthBytes.length); + out.write(lengthBytes); + out.write(intToLittleEndian((int) crc32.getValue())); + } + + byte[] indexBytes = DeltaVarintCompressor.compress(lengths); + out.write(indexBytes); + out.write(intToLittleEndian(indexBytes.length)); + out.write(1); } @Test diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py index e7e65394aa6d..8dd4e2fbd6bf 100644 --- a/paimon-python/pypaimon/read/reader/format_blob_reader.py +++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py @@ -32,6 +32,8 @@ class FormatBlobReader(RecordBatchReader): + NULL_LENGTH = -1 + PLACE_HOLDER_LENGTH = -2 def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool, @@ -97,6 +99,10 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch for field_name in self._fields: if blob is None: pydict_data[field_name].append(None) + elif blob is Blob.PLACE_HOLDER: + raise RuntimeError( + "Blob placeholder is not supported by FormatBlobReader yet." + ) elif self._blob_as_descriptor: pydict_data[field_name].append(blob.to_descriptor().serialize()) else: @@ -148,7 +154,7 @@ def _read_index(self) -> None: index_length = struct.unpack(' None: blob_offsets = [] offset = 0 for length in blob_lengths: - if length == -1: + if length < 0: blob_offsets.append(-1) else: blob_offsets.append(offset) @@ -175,6 +181,8 @@ def _read_index(self) -> None: class BlobRecordIterator: MAGIC_NUMBER_SIZE = 4 METADATA_OVERHEAD = 16 + NULL_LENGTH = -1 + PLACE_HOLDER_LENGTH = -2 def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str): @@ -192,13 +200,17 @@ def __next__(self) -> GenericRow: if self.current_position >= len(self.blob_lengths): raise StopIteration fields = [DataField(0, self.field_name, AtomicType("BLOB"))] - if self.blob_lengths[self.current_position] == -1: + length = self.blob_lengths[self.current_position] + if length == self.NULL_LENGTH: self.current_position += 1 return GenericRow([None], fields, RowKind.INSERT) + if length == self.PLACE_HOLDER_LENGTH: + self.current_position += 1 + return GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT) # Create blob reference for the current blob # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number - blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD + blob_length = length - self.METADATA_OVERHEAD blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length) self.current_position += 1 return GenericRow([blob], fields, RowKind.INSERT) diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index b619b6a76aec..73974d73e8e4 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -277,6 +277,21 @@ def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob' return BlobRef(uri_reader, descriptor) +class _PlaceholderBlob(Blob): + + def to_data(self) -> bytes: + raise RuntimeError("Should never call this method for placeholder blob.") + + def to_descriptor(self) -> BlobDescriptor: + raise RuntimeError("Should never call this method for placeholder blob.") + + def new_input_stream(self) -> BinaryIO: + raise RuntimeError("Should never call this method for placeholder blob.") + + +Blob.PLACE_HOLDER = _PlaceholderBlob() + + class BlobData(Blob): def __init__(self, data: Optional[Union[bytes, bytearray]] = None): diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index d9cf64210d61..1883e9dbbb3c 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -29,7 +29,7 @@ from pypaimon.common.file_io import FileIO from pypaimon.filesystem.local_file_io import LocalFileIO from pypaimon.common.options import Options -from pypaimon.read.reader.format_blob_reader import FormatBlobReader +from pypaimon.read.reader.format_blob_reader import BlobRecordIterator, FormatBlobReader from pypaimon.schema.data_types import AtomicType, DataField from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow @@ -1264,6 +1264,79 @@ def test_null_blob_read_as_descriptor(self): self.assertEqual(desc2.uri, blob_file_path) reader.close() + def test_placeholder_blob_write_read(self): + from pypaimon.write.blob_format_writer import BlobFormatWriter + + file_io = LocalFileIO(self.temp_dir, Options({})) + blob_file_path = os.path.join(self.temp_dir, "placeholder_blob.blob") + + output = open(blob_file_path, 'wb') + writer = BlobFormatWriter(output) + fields = [DataField(0, "blob_field", AtomicType("BLOB"))] + writer.add_element(GenericRow([BlobData(b"hello")], fields, RowKind.INSERT)) + writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT)) + writer.add_element(GenericRow([None], fields, RowKind.INSERT)) + writer.add_element(GenericRow([BlobData(b"world")], fields, RowKind.INSERT)) + self.assertEqual( + writer.lengths[1:3], + [BlobFormatWriter.PLACE_HOLDER_LENGTH, BlobFormatWriter.NULL_LENGTH]) + writer.close() + + with open(blob_file_path, 'rb') as blob_file: + blob_file.seek(-1, os.SEEK_END) + self.assertEqual(blob_file.read(1), struct.pack(' None: blob_value = row.values[0] if blob_value is None: - self.lengths.append(-1) + self.lengths.append(self.NULL_LENGTH) return if not isinstance(blob_value, Blob): raise ValueError("Field must be Blob/BlobData instance") + if blob_value is Blob.PLACE_HOLDER: + self.lengths.append(self.PLACE_HOLDER_LENGTH) + return + previous_pos = self.position crc32 = 0 # Initialize CRC32 @@ -97,7 +103,7 @@ def write_value(self, col_data, fields, uri_reader_factory=None) -> None: if col_data is None: if not is_blob: raise RuntimeError("Null values are only supported for BLOB type fields") - self.lengths.append(-1) + self.lengths.append(self.NULL_LENGTH) return if is_blob: