diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
index f11a056efbcf..c3ba17513bf0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
@@ -38,6 +38,31 @@
@Public
public interface Blob {
+ /**
+ * The placeholder blob, mainly for blob update in data-evolution. It should never be exposed to
+ * users.
+ */
+ Blob PLACE_HOLDER =
+ new Blob() {
+ @Override
+ public byte[] toData() {
+ throw new UnsupportedOperationException(
+ "Should never call this method for placeholder blob.");
+ }
+
+ @Override
+ public BlobDescriptor toDescriptor() {
+ throw new UnsupportedOperationException(
+ "Should never call this method for placeholder blob.");
+ }
+
+ @Override
+ public SeekableInputStream newInputStream() throws IOException {
+ throw new UnsupportedOperationException(
+ "Should never call this method for placeholder blob.");
+ }
+ };
+
byte[] toData();
BlobDescriptor toDescriptor();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
new file mode 100644
index 000000000000..d8ff74d013aa
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.append.ForceSingleBatchReader;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Collections.reverseOrder;
+import static java.util.Comparator.comparingLong;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Resolves blob placeholder rows by falling back through older sequence groups. The read logic is
+ * as below:
+ *
+ *
+ *
Group files by max-seq, higher-seq files will have newer records.
+ *
Sort files by range within each group, and create sequential readers for them. Note that
+ * absent ranges will be read as all-placeholder rows.
+ *
Sort by max-seq and merge read all readers, records at each index will be the first
+ * non-placeholder blob.
+ *
+ */
+public class BlobFallbackRecordReader implements RecordReader {
+
+ private final List> groupReaders = new ArrayList<>();
+ private final int blobIndex;
+ private boolean returned;
+
+ BlobFallbackRecordReader(
+ List files,
+ long rowCount,
+ BlobFileReaderFactory readerFactory,
+ List rowRanges,
+ RowType readRowType,
+ int blobIndex) {
+ this.blobIndex = blobIndex;
+
+ checkArgument(!files.isEmpty(), "Blob bunch should not be empty.");
+ long firstRowId =
+ files.stream().mapToLong(DataFileMeta::nonNullFirstRowId).min().getAsLong();
+ long lastRowId = firstRowId + rowCount - 1;
+
+ // sort group readers in descending order
+ Map> sequenceGroups = new TreeMap<>(reverseOrder());
+ for (DataFileMeta file : files) {
+ sequenceGroups
+ .computeIfAbsent(file.maxSequenceNumber(), ignored -> new ArrayList<>())
+ .add(file);
+ }
+
+ for (Map.Entry> entry : sequenceGroups.entrySet()) {
+ // within each group, sort by first row id
+ List groupFiles = entry.getValue();
+ groupFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+
+ DataFileMeta current, next;
+ for (int i = 0; i < groupFiles.size() - 1; i++) {
+ current = groupFiles.get(i);
+ next = groupFiles.get(i + 1);
+
+ Preconditions.checkState(
+ !current.nonNullRowIdRange().hasIntersection(next.nonNullRowIdRange()),
+ "Blob files within a same max_seq_num should not overlap. Find: %s, %s",
+ current,
+ next);
+ }
+
+ groupReaders.add(
+ new ForceSingleBatchReader(
+ new BlobSequenceGroupRecordReader(
+ groupFiles,
+ readerFactory,
+ rowRanges,
+ readRowType,
+ blobIndex,
+ firstRowId,
+ lastRowId)));
+ }
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator readBatch() throws IOException {
+ if (returned) {
+ return null;
+ }
+ returned = true;
+
+ // all readers are forced returning single batch
+ RecordIterator[] iterators = new RecordIterator[groupReaders.size()];
+ for (int i = 0; i < groupReaders.size(); i++) {
+ RecordIterator iterator = groupReaders.get(i).readBatch();
+ if (iterator == null) {
+ return null;
+ }
+ iterators[i] = iterator;
+ }
+
+ return new RecordIterator() {
+ @Nullable
+ @Override
+ public InternalRow next() throws IOException {
+ InternalRow result = null;
+ // we should always move each iterator forward
+ for (RecordIterator iterator : iterators) {
+ InternalRow row = iterator.next();
+ if (row == null) {
+ return null;
+ }
+ // result is the first non-placeholder record
+ if (result == null && !isPlaceHolder(row)) {
+ result = row;
+ }
+ }
+ if (result == null) {
+ throw new IllegalStateException(
+ "Invalid state: all blob files at the same row id store a placeholder, it's a bug.");
+ }
+ return result;
+ }
+
+ @Override
+ public void releaseBatch() {
+ for (RecordIterator iterator : iterators) {
+ iterator.releaseBatch();
+ }
+ }
+ };
+ }
+
+ private boolean isPlaceHolder(InternalRow row) {
+ return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == Blob.PLACE_HOLDER;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException exception = null;
+ for (RecordReader reader : groupReaders) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ /**
+ * Reads one blob sequence group (all blob files with the same max_seq_num) and emits
+ * placeholder rows for row id gaps. For example, if the full row range is [0, 100], but there's
+ * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100]
+ * will be emitted as placeholder rows.
+ *
+ *
This reader should always be fully consumed, or the internal states may be broken.
+ */
+ public static class BlobSequenceGroupRecordReader implements RecordReader {
+
+ private final List files;
+ private final BlobFileReaderFactory readerFactory;
+ // pushed row ranges
+ private final List rowRanges;
+ private final RowType readRowType;
+ private final int blobIndex;
+ private final long lastRowId;
+
+ private RecordReader currentReader;
+ private DataFileMeta currentFile;
+ private int nextFileIndex;
+ private int nextRowRangeIndex;
+ // expected next row id
+ private long nextRowId;
+
+ private InternalRow placeholderRow;
+
+ BlobSequenceGroupRecordReader(
+ List files,
+ BlobFileReaderFactory readerFactory,
+ List rowRanges,
+ RowType readRowType,
+ int blobIndex,
+ long firstRowId,
+ long lastRowId) {
+ this.files = files;
+ this.readerFactory = readerFactory;
+ this.rowRanges = rowRanges == null ? null : Range.sortAndMergeOverlap(rowRanges);
+ this.readRowType = readRowType;
+ this.blobIndex = blobIndex;
+ this.lastRowId = lastRowId;
+
+ this.nextFileIndex = 0;
+ this.nextRowRangeIndex = 0;
+ setNextRowId(firstRowId);
+
+ this.placeholderRow = null;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator readBatch() throws IOException {
+ while (true) {
+ if (currentReader != null) {
+ RecordIterator batch = currentReader.readBatch();
+ if (batch != null) {
+ return batch;
+ }
+ // row ranges have been pushed to readers
+ // directly set nextRowId as the lastRowId + 1
+ setNextRowId(lastRowId(currentFile) + 1);
+ closeCurrentFileReader();
+ continue;
+ }
+
+ if (nextRowId > lastRowId) {
+ return null;
+ }
+
+ // skip files whose ranges are before nextRowId
+ while (nextFileIndex < files.size()
+ && lastRowId(files.get(nextFileIndex)) < nextRowId) {
+ nextFileIndex++;
+ }
+ if (nextFileIndex >= files.size()) {
+ return placeHolderBatch(lastRowId);
+ }
+
+ DataFileMeta nextFile = files.get(nextFileIndex);
+ if (nextFile.nonNullFirstRowId() > nextRowId) {
+ return placeHolderBatch(nextFile.nonNullFirstRowId() - 1);
+ }
+
+ createReader(nextFile);
+ }
+ }
+
+ /**
+ * Set nextRowId and try to move to the next selected row id. So the final nextRowId may be
+ * greater than the input value.
+ */
+ private void setNextRowId(long nextRowId) {
+ this.nextRowId = nextRowId;
+ tryMoveToSelectedRow();
+ }
+
+ private void tryMoveToSelectedRow() {
+ if (nextRowId > lastRowId || rowRanges == null) {
+ return;
+ }
+
+ while (nextRowRangeIndex < rowRanges.size()) {
+ Range range = rowRanges.get(nextRowRangeIndex);
+ if (nextRowId >= range.from && nextRowId <= range.to) {
+ // if nextRowId is within the range, do not need to move
+ return;
+ } else if (nextRowId < range.from) {
+ // else if nextRowId < next range, move to next range's `from`
+ nextRowId = range.from;
+ return;
+ }
+ // else nextRowId > range.to, try next range
+ nextRowRangeIndex++;
+ }
+
+ // all ranges consumed, no need to read
+ nextRowId = lastRowId + 1;
+ }
+
+ private RecordIterator placeHolderBatch(long endRowId) {
+ return new RecordIterator() {
+ long rowId;
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ rowId = nextRowId;
+ if (rowId > endRowId) {
+ return null;
+ }
+ setNextRowId(rowId + 1);
+ return placeHolderRow();
+ }
+
+ @Override
+ public void releaseBatch() {
+ // nothing to release
+ }
+ };
+ }
+
+ private InternalRow placeHolderRow() {
+ if (placeholderRow == null) {
+ GenericRow row = new GenericRow(readRowType.getFieldCount());
+ row.setField(blobIndex, Blob.PLACE_HOLDER);
+ placeholderRow = row;
+ }
+ return placeholderRow;
+ }
+
+ private long lastRowId(DataFileMeta file) {
+ return file.nonNullFirstRowId() + file.rowCount() - 1;
+ }
+
+ private void closeCurrentFileReader() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ currentFile = null;
+ }
+
+ private void createReader(DataFileMeta nextFile) throws IOException {
+ currentFile = nextFile;
+ currentReader = readerFactory.create(nextFile);
+ nextFileIndex++;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeCurrentFileReader();
+ }
+ }
+
+ /** Factory to create readers. */
+ interface BlobFileReaderFactory {
+ RecordReader create(DataFileMeta file) throws IOException;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index c2f10b9a2b0c..0d2cd1015662 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -46,10 +46,12 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.RoaringBitmap32;
@@ -228,7 +230,8 @@ private DataEvolutionFileReader createUnionReader(
|| isVectorStoreFile(file.fileName()),
"Only blob/vector-store files need to call this method.");
return schemaFetcher.apply(file.schemaId()).logicalRowType();
- });
+ },
+ rowRanges != null);
long rowCount = fieldsFiles.get(0).rowCount();
long firstRowId = fieldsFiles.get(0).files().get(0).nonNullFirstRowId();
@@ -302,15 +305,16 @@ private DataEvolutionFileReader createUnionReader(
dataSchema,
readFields,
false));
+ RowType partialReadRowType = new RowType(readFields);
fileRecordReaders[i] =
new ForceSingleBatchReader(
- createFileReader(
+ createFieldBunchReader(
partition,
bunch,
dataFilePathFactory,
formatReaderMapping,
rowRanges,
- readRowType));
+ partialReadRowType));
}
}
@@ -327,6 +331,84 @@ private DataEvolutionFileReader createUnionReader(
return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders);
}
+ private RecordReader createFieldBunchReader(
+ BinaryRow partition,
+ FieldBunch bunch,
+ DataFilePathFactory dataFilePathFactory,
+ FormatReaderMapping formatReaderMapping,
+ List rowRanges,
+ RowType readRowType)
+ throws IOException {
+ if (bunch instanceof DataBunch) {
+ // for data bunch, directly read the single file
+ return createFileReader(
+ partition,
+ bunch.files().get(0),
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges,
+ readRowType);
+ } else if (bunch instanceof VectorFileBunch) {
+ // for vector bunch, sequential read all data files and concat them
+ List> readerSuppliers = new ArrayList<>();
+ for (DataFileMeta file : bunch.files()) {
+ RoaringBitmap32 selection = file.toFileSelection(rowRanges);
+ FormatReaderContext formatReaderContext =
+ new FormatReaderContext(
+ fileIO,
+ dataFilePathFactory.toPath(file),
+ file.fileSize(),
+ selection);
+ readerSuppliers.add(
+ () ->
+ new DataFileRecordReader(
+ readRowType,
+ formatReaderMapping.getReaderFactory(),
+ formatReaderContext,
+ coreOptions.scanIgnoreCorruptFile(),
+ coreOptions.scanIgnoreLostFile(),
+ formatReaderMapping.getIndexMapping(),
+ formatReaderMapping.getCastMapping(),
+ PartitionUtils.create(
+ formatReaderMapping.getPartitionPair(), partition),
+ true,
+ file.firstRowId(),
+ file.maxSequenceNumber(),
+ formatReaderMapping.getSystemFields()));
+ }
+ return ConcatRecordReader.create(readerSuppliers);
+ } else if (bunch instanceof BlobFileBunch) {
+ // for blob funch, fallback on placeholders
+ int blobIndex = findBlobFieldIndex(readRowType);
+ checkArgument(blobIndex >= 0, "Blob bunch read type should contain a blob field.");
+ return new BlobFallbackRecordReader(
+ bunch.files(),
+ bunch.rowCount(),
+ file ->
+ createFileReader(
+ partition,
+ file,
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges,
+ readRowType),
+ rowRanges,
+ readRowType,
+ blobIndex);
+ } else {
+ throw new UnsupportedOperationException("Unsupported bunch type: " + bunch);
+ }
+ }
+
+ private static int findBlobFieldIndex(RowType rowType) {
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
private FileRecordReader createFileReader(
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
@@ -351,49 +433,6 @@ private FileRecordReader createFileReader(
partition, file, dataFilePathFactory, formatReaderMapping, rowRanges, readRowType);
}
- private RecordReader createFileReader(
- BinaryRow partition,
- FieldBunch bunch,
- DataFilePathFactory dataFilePathFactory,
- FormatReaderMapping formatReaderMapping,
- List rowRanges,
- RowType readRowType)
- throws IOException {
- if (bunch.files().size() == 1) {
- return createFileReader(
- partition,
- bunch.files().get(0),
- dataFilePathFactory,
- formatReaderMapping,
- rowRanges,
- readRowType);
- }
- List> readerSuppliers = new ArrayList<>();
- for (DataFileMeta file : bunch.files()) {
- RoaringBitmap32 selection = file.toFileSelection(rowRanges);
- FormatReaderContext formatReaderContext =
- new FormatReaderContext(
- fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
- readerSuppliers.add(
- () ->
- new DataFileRecordReader(
- readRowType,
- formatReaderMapping.getReaderFactory(),
- formatReaderContext,
- coreOptions.scanIgnoreCorruptFile(),
- coreOptions.scanIgnoreLostFile(),
- formatReaderMapping.getIndexMapping(),
- formatReaderMapping.getCastMapping(),
- PartitionUtils.create(
- formatReaderMapping.getPartitionPair(), partition),
- true,
- file.firstRowId(),
- file.maxSequenceNumber(),
- formatReaderMapping.getSystemFields()));
- }
- return ConcatRecordReader.create(readerSuppliers);
- }
-
private FileRecordReader createFileReader(
BinaryRow partition,
DataFileMeta file,
@@ -433,8 +472,8 @@ public static List splitFieldBunches(
Function fileToRowType,
boolean rowIdPushDown) {
List fieldsFiles = new ArrayList<>();
- Map blobBunchMap = new HashMap<>();
- Map vectorStoreBunchMap = new TreeMap<>();
+ Map blobBunchMap = new HashMap<>();
+ Map vectorStoreBunchMap = new TreeMap<>();
long rowCount = -1;
for (DataFileMeta file : needMergeFiles) {
if (isBlobFile(file.fileName())) {
@@ -443,8 +482,7 @@ public static List splitFieldBunches(
final long expectedRowCount = rowCount;
blobBunchMap
.computeIfAbsent(
- fieldId,
- key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown))
+ fieldId, key -> new BlobFileBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else if (isVectorStoreFile(file.fileName())) {
RowType rowType = fileToRowType.apply(file);
@@ -456,7 +494,7 @@ public static List splitFieldBunches(
vectorStoreBunchMap
.computeIfAbsent(
vectorStoreKey,
- key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown))
+ key -> new VectorFileBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else {
// Normal file, just add it to the current merge split
@@ -496,8 +534,67 @@ public List files() {
}
}
+ /**
+ * The {@link FieldBunch} for blobs. Compared to {@link VectorFileBunch} which only contains
+ * data files of the max max_seq number, {@link BlobFileBunch} contains all blob files.
+ */
@VisibleForTesting
- static class SpecialFieldBunch implements FieldBunch {
+ static class BlobFileBunch implements FieldBunch {
+
+ final List files;
+ final List ranges;
+ final long expectedRowCount;
+ final boolean rowIdPushdown;
+
+ BlobFileBunch(long expectedRowCount, boolean rowIdPushdown) {
+ this.files = new ArrayList<>();
+ this.expectedRowCount = expectedRowCount;
+ this.ranges = new ArrayList<>();
+ this.rowIdPushdown = rowIdPushdown;
+ }
+
+ void add(DataFileMeta file) {
+ if (!isBlobFile(file.fileName())) {
+ throw new IllegalArgumentException("Only blob file can be added to this bunch.");
+ }
+ if (!files.isEmpty()) {
+ checkArgument(
+ file.writeCols().equals(files.get(0).writeCols()),
+ "All files in this bunch should have the same write columns.");
+ }
+
+ files.add(file);
+ ranges.add(file.nonNullRowIdRange());
+ }
+
+ @Override
+ public long rowCount() {
+ List merged = Range.sortAndMergeOverlap(ranges, true);
+ if (!rowIdPushdown) {
+ Preconditions.checkState(
+ merged.size() == 1,
+ "Blob file bunch should always contain a contiguous row range.");
+
+ long rowCount = merged.get(0).count();
+ Preconditions.checkState(
+ rowCount == expectedRowCount,
+ "The merged rowCount %s of blob file bunch should be aligned with normal files %s.",
+ rowCount,
+ expectedRowCount);
+ }
+
+ return expectedRowCount;
+ }
+
+ @Override
+ public List files() {
+ return files;
+ }
+ }
+
+ /** {@link FieldBunch} for vector-store files. */
+ @VisibleForTesting
+ static class VectorFileBunch implements FieldBunch {
final List files;
final long expectedRowCount;
@@ -508,70 +605,65 @@ static class SpecialFieldBunch implements FieldBunch {
long latestMaxSequenceNumber = -1;
long rowCount;
- SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) {
+ VectorFileBunch(long expectedRowCount, boolean rowIdPushDown) {
this.files = new ArrayList<>();
- this.rowCount = 0;
this.expectedRowCount = expectedRowCount;
this.rowIdPushDown = rowIdPushDown;
}
void add(DataFileMeta file) {
- if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) {
+ if (!isVectorStoreFile(file.fileName())) {
throw new IllegalArgumentException(
- "Only blob/vector-store file can be added to this bunch.");
+ "Only vector-store file can be added to this bunch.");
}
-
if (file.nonNullFirstRowId() == latestFistRowId) {
if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
throw new IllegalArgumentException(
- "Blob/vector-store file with same first row id should have decreasing sequence number.");
+ "Vector file with same first row id should have decreasing sequence number.");
}
return;
}
+
if (!files.isEmpty()) {
long firstRowId = file.nonNullFirstRowId();
- if (rowIdPushDown) {
- if (firstRowId < expectedNextFirstRowId) {
- if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
- DataFileMeta lastFile = files.remove(files.size() - 1);
- rowCount -= lastFile.rowCount();
- } else {
- return;
- }
- }
- } else {
- if (firstRowId < expectedNextFirstRowId) {
- checkArgument(
- file.maxSequenceNumber() < latestMaxSequenceNumber,
- "Blob/vector-store file with overlapping row id should have decreasing sequence number.");
+ if (rowIdPushDown && firstRowId < expectedNextFirstRowId) {
+ if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
+ DataFileMeta lastFile = files.remove(files.size() - 1);
+ rowCount -= lastFile.rowCount();
+ } else {
return;
- } else if (firstRowId > expectedNextFirstRowId) {
- throw new IllegalArgumentException(
- "Blob/vector-store file first row id should be continuous, expect "
- + expectedNextFirstRowId
- + " but got "
- + firstRowId);
}
+ } else if (firstRowId < expectedNextFirstRowId) {
+ checkArgument(
+ file.maxSequenceNumber() < latestMaxSequenceNumber,
+ "Vector file with overlapping row id should have decreasing sequence number.");
+ return;
+ } else if (!rowIdPushDown && firstRowId > expectedNextFirstRowId) {
+ throw new IllegalArgumentException(
+ "Vector file first row id should be continuous, expect "
+ + expectedNextFirstRowId
+ + " but got "
+ + firstRowId);
}
+
if (!files.isEmpty()) {
- if (!isBlobFile(file.fileName())) {
- checkArgument(
- file.schemaId() == files.get(0).schemaId(),
- "All files in this bunch should have the same schema id.");
- }
+ checkArgument(
+ file.schemaId() == files.get(0).schemaId(),
+ "All files in this bunch should have the same schema id.");
checkArgument(
file.writeCols().equals(files.get(0).writeCols()),
"All files in this bunch should have the same write columns.");
}
}
+
files.add(file);
rowCount += file.rowCount();
checkArgument(
rowCount <= expectedRowCount,
- "Blob/vector-store files row count exceed the expect " + expectedRowCount);
- this.latestMaxSequenceNumber = file.maxSequenceNumber();
- this.latestFistRowId = file.nonNullFirstRowId();
- this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
+ "Vector files row count exceed the expect " + expectedRowCount);
+ latestMaxSequenceNumber = file.maxSequenceNumber();
+ latestFistRowId = file.nonNullFirstRowId();
+ expectedNextFirstRowId = latestFistRowId + file.rowCount();
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 38aabeda6a7b..8a3e0e5ff98b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -22,6 +22,7 @@
import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
@@ -30,15 +31,22 @@
import org.apache.paimon.data.BlobViewStruct;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.blob.BlobFileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
@@ -48,6 +56,7 @@
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.RowTrackingTable;
import org.apache.paimon.types.DataField;
@@ -150,6 +159,73 @@ public void testBasic() throws Exception {
assertThat(integer.get()).isEqualTo(1000);
}
+ @Test
+ public void testReadBlobPlaceHolderFallback() throws Exception {
+ createTableDefault();
+ writeDataDefault(
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("first"), new BlobData(blobBytes)),
+ GenericRow.of(
+ 2, BinaryString.fromString("second"), new BlobData(blobBytes)),
+ GenericRow.of(
+ 3, BinaryString.fromString("third"), new BlobData(blobBytes))));
+
+ FileStoreTable table = getTableDefault();
+ List files =
+ table.store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+ DataFileMeta dataFile =
+ files.stream()
+ .filter(file -> !BlobFileFormat.isBlobFile(file.fileName()))
+ .findFirst()
+ .get();
+ DataFileMeta oldBlobFile =
+ files.stream()
+ .filter(file -> BlobFileFormat.isBlobFile(file.fileName()))
+ .findFirst()
+ .get();
+
+ byte[] updatedBytes = "updated-blob".getBytes();
+ DataFilePathFactory pathFactory =
+ table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+ DataFileMeta newBlobFile =
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(Blob.PLACE_HOLDER, new BlobData(updatedBytes)),
+ dataFile.nonNullFirstRowId(),
+ oldBlobFile.maxSequenceNumber() + 1,
+ oldBlobFile.schemaId(),
+ oldBlobFile.writeCols());
+
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(1L)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withBucket(0)
+ .withBucketPath(pathFactory.parent().toString())
+ .withDataFiles(Arrays.asList(dataFile, newBlobFile, oldBlobFile))
+ .build();
+
+ DataEvolutionSplitRead read =
+ new DataEvolutionSplitRead(
+ table.fileIO(),
+ table.schemaManager(),
+ table.schema(),
+ table.rowType(),
+ table.coreOptions(),
+ table.store().pathFactory());
+
+ List actual = new ArrayList<>();
+ read.createReader(split).forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+ assertThat(actual.size()).isEqualTo(3);
+ assertThat(actual.get(0)).isEqualTo(blobBytes);
+ assertThat(actual.get(1)).isEqualTo(updatedBytes);
+ assertThat(actual.get(2)).isEqualTo(blobBytes);
+ }
+
@Test
public void testWriteByInputStream() throws Exception {
createTableDefault();
@@ -971,6 +1047,48 @@ private static void writeFile(FileIO fileIO, Path path, byte[] bytes) throws IOE
}
}
+ private static DataFileMeta writeBlobFile(
+ FileIO fileIO,
+ Path path,
+ List blobs,
+ long firstRowId,
+ long maxSequenceNumber,
+ long schemaId,
+ List writeCols)
+ throws IOException {
+ try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
+ FormatWriter writer =
+ new BlobFileFormat()
+ .createWriterFactory(RowType.of(DataTypes.BLOB()))
+ .create(out, "none");
+ for (Blob blob : blobs) {
+ writer.addElement(GenericRow.of(blob));
+ }
+ writer.close();
+ }
+ return DataFileMeta.create(
+ path.getName(),
+ fileIO.getFileSize(path),
+ blobs.size(),
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ maxSequenceNumber,
+ schemaId,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ 0L,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ writeCols);
+ }
+
private static long countFilesWithSuffix(FileIO fileIO, Path root, String suffix)
throws IOException {
long count = 0;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
new file mode 100644
index 000000000000..47e2ac80415b
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BlobFallbackRecordReader}. */
+public class BlobFallbackRecordReaderTest {
+
+ private static final int BLOB_INDEX = 0;
+ private static final String BLOB_FIELD = "blob_col";
+ private static final RowType READ_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(BLOB_INDEX, BLOB_FIELD, DataTypes.BLOB()),
+ new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()),
+ new DataField(
+ 2, SpecialFields.SEQUENCE_NUMBER.name(), DataTypes.BIGINT())));
+
+ @Test
+ public void testBlobSequenceGroupReaderWithRowRanges() throws Exception {
+ List files =
+ Arrays.asList(blobFile("blob1", 0, 3, 10), blobFile("blob2", 5, 2, 10));
+ List rowRanges = ranges(1, 1, 3, 5);
+
+ ReadResult rows = readSequenceGroup(files, rowRanges, 0, 6, 10);
+
+ assertThat(rows.rowIds).containsExactly(1L, 5L);
+ assertThat(rows.placeholderRowCount).isEqualTo(2);
+ assertThat(rows.batchSizes).containsExactly(1, 2, 1);
+ }
+
+ @Test
+ public void testBlobSequenceGroupReaderWithMultipleRangesInFileAndGap() throws Exception {
+ DataFileMeta wideFile = blobFile("wide-file", 20, 31, 10);
+ List wideFileRanges = ranges(9, 10, 19, 20, 25, 30, 35, 40, 43, 45, 48, 52, 55, 56);
+
+ ReadResult wideFileRows =
+ readSequenceGroup(Collections.singletonList(wideFile), wideFileRanges, 10, 60, 10);
+
+ List wideFileActualRowIds = rowIdsInRanges(20, 50, wideFileRanges);
+ assertThat(wideFileRows.rowIds).containsExactlyElementsOf(wideFileActualRowIds);
+ assertThat(wideFileRows.placeholderRowCount)
+ .isEqualTo(
+ rowIdsInRanges(10, 19, wideFileRanges).size()
+ + rowIdsInRanges(51, 60, wideFileRanges).size());
+ assertThat(wideFileRows.batchSizes)
+ .containsExactlyElementsOf(batchSizes(2, wideFileActualRowIds.size(), 1, 4));
+
+ DataFileMeta firstFile = blobFile("first-file", 0, 11, 10);
+ DataFileMeta secondFile = blobFile("second-file", 50, 11, 10);
+ List gapRanges = ranges(0, 0, 9, 12, 25, 30, 35, 40, 43, 45, 48, 52, 58, 62);
+
+ ReadResult gapRows =
+ readSequenceGroup(Arrays.asList(firstFile, secondFile), gapRanges, 0, 62, 10);
+
+ assertThat(gapRows.rowIds)
+ .containsExactlyElementsOf(
+ concat(
+ rowIdsInRanges(0, 10, gapRanges),
+ rowIdsInRanges(50, 60, gapRanges)));
+ assertThat(gapRows.placeholderRowCount)
+ .isEqualTo(
+ rowIdsInRanges(11, 49, gapRanges).size()
+ + rowIdsInRanges(61, 62, gapRanges).size());
+ assertThat(gapRows.batchSizes).containsExactly(1, 1, 1, 19, 1, 1, 1, 1, 1, 1, 2);
+ }
+
+ @Test
+ public void testBlobFallbackRecordReader() throws Exception {
+ DataFileMeta newFile = blobFile("new-file", 0, 3, 2);
+ DataFileMeta oldFile = blobFile("old-file", 0, 5, 1);
+
+ ReadResult rows =
+ readFallback(Arrays.asList(newFile, oldFile), 5, null, placeholderRows(newFile, 1));
+
+ assertThat(rows.rowIds).containsExactly(0L, 1L, 2L, 3L, 4L);
+ assertThat(rows.sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L);
+ }
+
+ @Test
+ public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() {
+ DataFileMeta newFile = blobFile("new-placeholder-file", 0, 1, 2);
+ DataFileMeta oldFile = blobFile("old-placeholder-file", 0, 1, 1);
+
+ assertThatThrownBy(
+ () ->
+ readFallback(
+ Arrays.asList(newFile, oldFile),
+ 1,
+ null,
+ placeholderRows(newFile, 0, oldFile, 0)))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("all blob files at the same row id store a placeholder");
+ }
+
+ @Test
+ public void testBlobFallbackRecordReaderWithRowRanges() throws Exception {
+ DataFileMeta oldFile = blobFile("old-file", 20, 26, 1);
+ DataFileMeta newFile1 = blobFile("new-file-1", 20, 11, 2);
+ DataFileMeta newFile2 = blobFile("new-file-2", 40, 6, 2);
+ List rowRanges = ranges(15, 20, 25, 26, 29, 33, 35, 41);
+
+ ReadResult rows =
+ readFallback(
+ Arrays.asList(newFile2, oldFile, newFile1),
+ 26,
+ rowRanges,
+ Collections.emptySet());
+
+ assertThat(rows.rowIds)
+ .containsExactly(
+ 20L, 25L, 26L, 29L, 30L, 31L, 32L, 33L, 35L, 36L, 37L, 38L, 39L, 40L, 41L);
+ assertThat(rows.sequenceNumbers)
+ .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 2L);
+ }
+
+ private static ReadResult readFallback(
+ List files,
+ long rowCount,
+ List rowRanges,
+ Set placeholderRows)
+ throws Exception {
+ return ReadResult.read(
+ new BlobFallbackRecordReader(
+ files,
+ rowCount,
+ file -> oneRowPerBatchReader(fileRows(file, rowRanges, placeholderRows)),
+ rowRanges,
+ READ_ROW_TYPE,
+ BLOB_INDEX));
+ }
+
+ private static ReadResult readSequenceGroup(
+ List files,
+ List rowRanges,
+ long firstRowId,
+ long lastRowId,
+ long sequenceNumber)
+ throws Exception {
+ return ReadResult.read(
+ new BlobSequenceGroupRecordReader(
+ files,
+ file -> oneRowPerBatchReader(fileRows(file, rowRanges)),
+ rowRanges,
+ READ_ROW_TYPE,
+ BLOB_INDEX,
+ firstRowId,
+ lastRowId));
+ }
+
+ private static DataFileMeta blobFile(
+ String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+ return DataFileMeta.create(
+ fileName + ".blob",
+ rowCount,
+ rowCount,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ maxSequenceNumber,
+ 0L,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ rowCount,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ Arrays.asList(BLOB_FIELD));
+ }
+
+ private static List ranges(long... bounds) {
+ if (bounds.length % 2 != 0) {
+ throw new IllegalArgumentException("Range bounds should be paired.");
+ }
+
+ List ranges = new ArrayList<>();
+ for (int i = 0; i < bounds.length; i += 2) {
+ ranges.add(new Range(bounds[i], bounds[i + 1]));
+ }
+ return ranges;
+ }
+
+ private static List fileRows(DataFileMeta file, List rowRanges) {
+ return fileRows(file, rowRanges, Collections.emptySet());
+ }
+
+ private static List fileRows(
+ DataFileMeta file, List rowRanges, Set placeholderRows) {
+ List rows = new ArrayList<>();
+ long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1;
+ for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) {
+ if (selected(rowId, rowRanges)) {
+ rows.add(
+ blobRow(
+ rowId,
+ file.maxSequenceNumber(),
+ placeholderRows.contains(rowKey(file, rowId))));
+ }
+ }
+ return rows;
+ }
+
+ private static boolean selected(long rowId, List rowRanges) {
+ if (rowRanges == null) {
+ return true;
+ }
+ for (Range range : rowRanges) {
+ if (rowId < range.from) {
+ return false;
+ }
+ if (rowId <= range.to) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static List rowIdsInRanges(long firstRowId, long lastRowId, List ranges) {
+ List rowIds = new ArrayList<>();
+ for (long rowId = firstRowId; rowId <= lastRowId; rowId++) {
+ if (selected(rowId, ranges)) {
+ rowIds.add(rowId);
+ }
+ }
+ return rowIds;
+ }
+
+ private static List concat(List first, List second) {
+ List all = new ArrayList<>(first);
+ all.addAll(second);
+ return all;
+ }
+
+ private static List batchSizes(
+ int firstBatchSize, int repeatedBatchCount, int repeatedBatchSize, int lastBatchSize) {
+ List batchSizes = new ArrayList<>();
+ batchSizes.add(firstBatchSize);
+ for (int i = 0; i < repeatedBatchCount; i++) {
+ batchSizes.add(repeatedBatchSize);
+ }
+ batchSizes.add(lastBatchSize);
+ return batchSizes;
+ }
+
+ private static Set placeholderRows(DataFileMeta file, long rowId) {
+ Set keys = new HashSet<>();
+ keys.add(rowKey(file, rowId));
+ return keys;
+ }
+
+ private static Set placeholderRows(
+ DataFileMeta firstFile, long firstRowId, DataFileMeta secondFile, long secondRowId) {
+ Set keys = placeholderRows(firstFile, firstRowId);
+ keys.add(rowKey(secondFile, secondRowId));
+ return keys;
+ }
+
+ private static String rowKey(DataFileMeta file, long rowId) {
+ return file.fileName() + "#" + rowId;
+ }
+
+ private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) {
+ GenericRow row = new GenericRow(3);
+ row.setField(
+ BLOB_INDEX,
+ placeholder ? Blob.PLACE_HOLDER : new BlobData(new byte[] {(byte) rowId}));
+ row.setField(1, rowId);
+ row.setField(2, sequenceNumber);
+ return row;
+ }
+
+ private static RecordReader oneRowPerBatchReader(List rows) {
+ return new RecordReader() {
+
+ int index;
+
+ @Override
+ public RecordIterator readBatch() {
+ if (index >= rows.size()) {
+ return null;
+ }
+ InternalRow row = rows.get(index++);
+ return new RecordIterator() {
+
+ boolean returned;
+
+ @Override
+ public InternalRow next() {
+ if (returned) {
+ return null;
+ }
+ returned = true;
+ return row;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
+ private static class ReadResult {
+ final List rowIds = new ArrayList<>();
+ final List sequenceNumbers = new ArrayList<>();
+ final List batchSizes = new ArrayList<>();
+ int placeholderRowCount;
+
+ static ReadResult read(RecordReader reader) throws Exception {
+ try {
+ ReadResult result = new ReadResult();
+ RecordIterator batch;
+ while ((batch = reader.readBatch()) != null) {
+ int batchSize = 0;
+ InternalRow row;
+ while ((row = batch.next()) != null) {
+ result.add(row);
+ batchSize++;
+ }
+ batch.releaseBatch();
+ result.batchSizes.add(batchSize);
+ }
+ return result;
+ } finally {
+ reader.close();
+ }
+ }
+
+ private void add(InternalRow row) {
+ if (row.getBlob(BLOB_INDEX) == Blob.PLACE_HOLDER) {
+ placeholderRowCount++;
+ } else {
+ rowIds.add(row.getLong(1));
+ sequenceNumbers.add(row.getLong(2));
+ }
+ }
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 9bb9a7274ed1..e240b7fb422c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,8 +21,9 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch;
import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
-import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -42,150 +43,149 @@
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link SpecialFieldBunch}. */
+/** Tests for blob and vector field bunches. */
public class DataEvolutionReadTest {
- private SpecialFieldBunch blobBunch;
+ private VectorFileBunch vectorBunch;
@BeforeEach
public void setUp() {
- blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false);
+ vectorBunch = new VectorFileBunch(Long.MAX_VALUE, false);
}
@Test
- public void testAddSingleBlobEntry() {
- DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L);
+ public void testAddSingleVectorEntry() {
+ DataFileMeta vectorEntry = createVectorFile("vector1", 0L, 100L, 1L);
- blobBunch.add(blobEntry);
+ vectorBunch.add(vectorEntry);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
- assertThat(blobBunch.rowCount()).isEqualTo(100);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
- assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+ assertThat(vectorBunch.rowCount()).isEqualTo(100);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+ assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
}
@Test
- public void testAddBlobEntryAndTail() {
- DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1);
-
- blobBunch.add(blobEntry);
- blobBunch.add(blobTail);
-
- assertThat(blobBunch.files).hasSize(2);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
- assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
- assertThat(blobBunch.rowCount()).isEqualTo(300);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
- assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
- assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+ public void testAddVectorEntryAndTail() {
+ DataFileMeta vectorEntry = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorTail = createVectorFile("vector2", 100, 200, 1);
+
+ vectorBunch.add(vectorEntry);
+ vectorBunch.add(vectorTail);
+
+ assertThat(vectorBunch.files).hasSize(2);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+ assertThat(vectorBunch.files.get(1)).isEqualTo(vectorTail);
+ assertThat(vectorBunch.rowCount()).isEqualTo(300);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+ assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
+ assertThat(vectorBunch.files.get(0).schemaId()).isEqualTo(0L);
}
@Test
- public void testAddNonBlobFileThrowsException() {
+ public void testAddNonVectorFileThrowsException() {
DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100, 1, 0L);
- assertThatThrownBy(() -> blobBunch.add(normalFile))
+ assertThatThrownBy(() -> vectorBunch.add(normalFile))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Only blob/vector-store file can be added to this bunch.");
+ .hasMessage("Only vector-store file can be added to this bunch.");
}
@Test
- public void testAddBlobFileWithSameFirstRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2);
+ public void testAddVectorFileWithSameFirstRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 2);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with same firstRowId but higher sequence number should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file with same first row id should have decreasing sequence number.");
+ "Vector file with same first row id should have decreasing sequence number.");
}
@Test
- public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1);
+ public void testAddVectorFileWithSameFirstRowIdAndLowerSequenceNumber() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with same firstRowId and lower sequence number should be ignored
- blobBunch.add(blobEntry2);
+ vectorBunch.add(vectorEntry2);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
}
@Test
- public void testAddBlobFileWithOverlappingRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1);
+ public void testAddVectorFileWithOverlappingRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with overlapping row id and lower sequence number should be ignored
- blobBunch.add(blobEntry2);
+ vectorBunch.add(vectorEntry2);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
}
@Test
- public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2);
+ public void testAddVectorFileWithOverlappingRowIdAndHigherSequenceNumber() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 2);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with overlapping row id and higher sequence number should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file with overlapping row id should have decreasing sequence number.");
+ "Vector file with overlapping row id should have decreasing sequence number.");
}
@Test
- public void testAddBlobFileWithNonContinuousRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+ public void testAddVectorFileWithNonContinuousRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with non-continuous row id should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file first row id should be continuous, expect 100 but got 200");
+ "Vector file first row id should be continuous, expect 100 but got 200");
}
@Test
- public void testAddBlobFileWithDifferentWriteCols() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 =
- createBlobFileWithCols("blob2", 100, 200, 1, Arrays.asList("different_col"));
+ public void testAddVectorFileWithDifferentWriteCols() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 =
+ createVectorFileWithCols("vector2", 100, 200, 1, Arrays.asList("different_col"));
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with different write columns should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("All files in this bunch should have the same write columns.");
}
@Test
- public void testComplexBlobBunchScenario() {
- // Create a complex scenario with multiple blob entries and a tail
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1);
- DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1);
- DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1);
-
- blobBunch.add(blobEntry1);
- blobBunch.add(blobEntry2);
- blobBunch.add(blobEntry3);
- blobBunch.add(blobTail);
-
- assertThat(blobBunch.files).hasSize(4);
- assertThat(blobBunch.rowCount()).isEqualTo(1000);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
- assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ public void testComplexVectorBunchScenario() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 100, 200, 1);
+ DataFileMeta vectorEntry3 = createVectorFile("vector3", 300, 300, 1);
+ DataFileMeta vectorTail = createVectorFile("vector4", 600, 400, 1);
+
+ vectorBunch.add(vectorEntry1);
+ vectorBunch.add(vectorEntry2);
+ vectorBunch.add(vectorEntry3);
+ vectorBunch.add(vectorTail);
+
+ assertThat(vectorBunch.files).hasSize(4);
+ assertThat(vectorBunch.rowCount()).isEqualTo(1000);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+ assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
}
@Test
@@ -209,26 +209,31 @@ public void testComplexBlobBunchScenario2() {
List batch = batches.get(0);
- assertThat(batch.get(1).fileName()).contains("blob5"); // pick
- assertThat(batch.get(2).fileName()).contains("blob2"); // skip
- assertThat(batch.get(3).fileName()).contains("blob1"); // skip
- assertThat(batch.get(4).fileName()).contains("blob9"); // pick
- assertThat(batch.get(5).fileName()).contains("blob6"); // skip
- assertThat(batch.get(6).fileName()).contains("blob3"); // skip
- assertThat(batch.get(7).fileName()).contains("blob7"); // pick
- assertThat(batch.get(8).fileName()).contains("blob4"); // skip
- assertThat(batch.get(9).fileName()).contains("blob8"); // pick
+ assertThat(batch.get(1).fileName()).contains("blob5");
+ assertThat(batch.get(2).fileName()).contains("blob2");
+ assertThat(batch.get(3).fileName()).contains("blob1");
+ assertThat(batch.get(4).fileName()).contains("blob9");
+ assertThat(batch.get(5).fileName()).contains("blob6");
+ assertThat(batch.get(6).fileName()).contains("blob3");
+ assertThat(batch.get(7).fileName()).contains("blob7");
+ assertThat(batch.get(8).fileName()).contains("blob4");
+ assertThat(batch.get(9).fileName()).contains("blob8");
List fieldBunches =
splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
assertThat(fieldBunches.size()).isEqualTo(2);
- SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
- assertThat(blobBunch.files).hasSize(4);
+ BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
}
@Test
@@ -275,19 +280,29 @@ public void testComplexBlobBunchScenario3() {
batch, file -> makeBlobRowType(file.writeCols(), String::hashCode));
assertThat(fieldBunches.size()).isEqualTo(3);
- SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
- assertThat(blobBunch.files).hasSize(4);
+ BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
-
- blobBunch = (SpecialFieldBunch) fieldBunches.get(2);
- assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
+
+ blobBunch = (BlobFileBunch) fieldBunches.get(2);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob17");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob18");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob12");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob11");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob19");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob16");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob13");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob17");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob14");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob18");
}
/** Creates a blob file with the specified parameters. */
@@ -357,8 +372,81 @@ private DataFileMeta createBlobFileWithCols(
writeCols);
}
+ private DataFileMeta createVectorFile(
+ String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+ return createVectorFileWithCols(
+ fileName, firstRowId, rowCount, maxSequenceNumber, Arrays.asList("vector_col"));
+ }
+
+ private DataFileMeta createVectorFileWithSchema(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ long schemaId) {
+ return createFile(
+ fileName + ".vector.avro",
+ firstRowId,
+ rowCount,
+ maxSequenceNumber,
+ schemaId,
+ Arrays.asList("vector_col"));
+ }
+
+ private DataFileMeta createVectorFileWithCols(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ List writeCols) {
+ return createFile(
+ fileName + ".vector.avro", firstRowId, rowCount, maxSequenceNumber, 0L, writeCols);
+ }
+
+ private DataFileMeta createFile(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ long schemaId,
+ List writeCols) {
+ return DataFileMeta.create(
+ fileName,
+ rowCount,
+ rowCount,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ maxSequenceNumber,
+ schemaId,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ rowCount,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ writeCols);
+ }
+
+ @Test
+ void testAddVectorFilesWithDifferentSchemaId() {
+ DataFileMeta vectorEntry1 = createVectorFileWithSchema("vector1", 0, 100, 1, 0L);
+ DataFileMeta vectorEntry2 = createVectorFileWithSchema("vector2", 100, 200, 1, 1L);
+
+ vectorBunch.add(vectorEntry1);
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("All files in this bunch should have the same schema id.");
+ }
+
@Test
void testAddBlobFilesWithDifferentSchemaId() {
+ BlobFileBunch blobBunch = new BlobFileBunch(300, false);
DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L);
DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L);
@@ -373,24 +461,24 @@ void testAddBlobFilesWithDifferentSchemaId() {
@Test
public void testRowIdPushDown() {
- SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
- blobBunch.add(blobEntry1);
- SpecialFieldBunch finalBlobBunch = blobBunch;
- DataFileMeta finalBlobEntry = blobEntry2;
- assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
-
- blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
- blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- blobEntry2 = createBlobFile("blob2", 50, 200, 2);
- blobBunch.add(blobEntry1);
- blobBunch.add(blobEntry2);
- assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
-
- SpecialFieldBunch finalBlobBunch2 = blobBunch;
- DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2);
- assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException();
+ VectorFileBunch vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true);
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
+ vectorBunch.add(vectorEntry1);
+ VectorFileBunch finalVectorBunch = vectorBunch;
+ DataFileMeta finalVectorEntry = vectorEntry2;
+ assertThatCode(() -> finalVectorBunch.add(finalVectorEntry)).doesNotThrowAnyException();
+
+ vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true);
+ vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ vectorEntry2 = createVectorFile("vector2", 50, 200, 2);
+ vectorBunch.add(vectorEntry1);
+ vectorBunch.add(vectorEntry2);
+ assertThat(vectorBunch.files).containsExactlyInAnyOrder(vectorEntry2);
+
+ VectorFileBunch finalVectorBunch2 = vectorBunch;
+ DataFileMeta vectorEntry3 = createVectorFile("vector2", 250, 100, 2);
+ assertThatCode(() -> finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException();
}
/** Creates a normal (non-blob) file for testing. */
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index 27a1ed56170a..d6d87542315f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -42,7 +42,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm
byte[] header = new byte[5];
IOUtils.readFully(in, header);
byte version = header[4];
- if (version != 1) {
+ if (version != 1 && version != 2) {
throw new IOException("Unsupported version: " + version);
}
int indexLength = BytesUtils.getInt(header, 0);
@@ -55,7 +55,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm
long[] blobOffsets = new long[blobLengths.length];
long offset = 0;
for (int i = 0; i < blobLengths.length; i++) {
- if (blobLengths[i] == -1) {
+ if (blobLengths[i] < 0) {
blobOffsets[i] = -1;
} else {
blobOffsets[i] = offset;
@@ -86,7 +86,11 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm
}
public boolean isNull(int i) {
- return blobLengths[i] == -1;
+ return blobLengths[i] == BlobFormatWriter.NULL_LENGTH;
+ }
+
+ public boolean isPlaceHolder(int i) {
+ return blobLengths[i] == BlobFormatWriter.PLACE_HOLDER_LENGTH;
}
public long blobLength(int i) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 801964b86239..489e1d7c619b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -94,6 +94,8 @@ public InternalRow next() {
Blob blob;
if (fileMeta.isNull(currentPosition)) {
blob = null;
+ } else if (fileMeta.isPlaceHolder(currentPosition)) {
+ blob = Blob.PLACE_HOLDER;
} else {
long offset = fileMeta.blobOffset(currentPosition) + 4;
long length = fileMeta.blobLength(currentPosition) - 16;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
index 83d6c0eb915d..d032e1b043ea 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -43,9 +43,11 @@
/** {@link FormatWriter} for blob file. */
public class BlobFormatWriter implements FileAwareFormatWriter {
- public static final byte VERSION = 1;
+ public static final byte VERSION = 2;
public static final int MAGIC_NUMBER = 1481511375;
public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER);
+ public static final long NULL_LENGTH = -1L;
+ public static final long PLACE_HOLDER_LENGTH = -2L;
private final PositionOutputStream out;
@Nullable private final BlobConsumer writeConsumer;
@@ -81,13 +83,17 @@ public boolean deleteFileUponAbort() {
public void addElement(InternalRow element) throws IOException {
checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only support one field.");
if (element.isNullAt(0)) {
- lengths.add(-1L);
+ lengths.add(NULL_LENGTH);
if (writeConsumer != null) {
writeConsumer.accept(blobFieldName, null);
}
return;
}
Blob blob = element.getBlob(0);
+ if (blob == Blob.PLACE_HOLDER) {
+ lengths.add(PLACE_HOLDER_LENGTH);
+ return;
+ }
long previousPos = out.getPos();
crc32.reset();
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 0b66e15c2b88..028cc023d1dd 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -33,6 +33,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DeltaVarintCompressor;
import org.apache.paimon.utils.RoaringBitmap32;
import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +45,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.zip.CRC32;
+import static org.apache.paimon.utils.StreamUtils.intToLittleEndian;
+import static org.apache.paimon.utils.StreamUtils.longToLittleEndian;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link BlobFileFormat}. */
@@ -73,22 +77,53 @@ public void testReadBlobInlineBytes() throws IOException {
innerTest(false);
}
+ @Test
+ public void testReadLegacyVersionOneBlobFile() throws IOException {
+ BlobFileFormat format = new BlobFileFormat(false);
+ RowType rowType = RowType.of(DataTypes.BLOB());
+ List blobs = Arrays.asList("hello".getBytes(), null, "world".getBytes());
+
+ try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+ writeLegacyVersionOneBlobFile(out, blobs);
+ }
+
+ FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null);
+ FormatReaderContext context =
+ new FormatReaderContext(fileIO, file, fileIO.getFileSize(file));
+ List