createRecordReader(
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}
- return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
+ // In snapshot-ordering mode, APPEND files carry the commit snapshot id in
+ // minSequenceNumber (stamped at commit time); override per-record sequence with it so
+ // later snapshots win during merge. COMPACT files already carry the snapshot id in their
+ // per-record _SEQUENCE_NUMBER and are left untouched.
+ boolean overrideSequenceWithSnapshotId = false;
+ if (snapshotSequenceOrdering) {
+ Preconditions.checkState(
+ file.fileSource().isPresent(),
+ "sequence.snapshot-ordering requires data files with fileSource metadata. "
+ + "This option is only safe for newly-created tables or empty tables. "
+ + "Legacy files without fileSource cannot be ordered by commit snapshot id.");
+ overrideSequenceWithSnapshotId = file.fileSource().get() == FileSource.APPEND;
+ }
+ return new KeyValueDataFileRecordReader(
+ fileRecordReader,
+ keyType,
+ valueType,
+ file.level(),
+ overrideSequenceWithSnapshotId,
+ file.minSequenceNumber());
}
public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3f9fdb9f1c0c..e15698f36179 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -989,6 +989,10 @@ CommitResult tryCommitOnce(
deltaFiles = assigned.assignedEntries;
}
+ if (options.snapshotSequenceOrdering()) {
+ deltaFiles = stampSequenceWithSnapshotId(newSnapshotId, commitKind, deltaFiles);
+ }
+
// the added records subtract the deleted records from
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
@@ -1265,4 +1269,31 @@ public void close() {
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
+
+    /**
+     * Stamps the commit snapshot id into {@link DataFileMeta#minSequenceNumber()} / {@link
+     * DataFileMeta#maxSequenceNumber()} of APPEND files, reusing these fields instead of adding a
+     * new one (same pattern as {@link RowTrackingCommitUtils#assignRowTracking}). COMPACT files are
+     * returned unchanged: their input was read through the override path, so their per-record
+     * {@code _SEQUENCE_NUMBER} already carries the snapshot id.
+     *
+     * <p>All records of a snapshot share one id, so intra-snapshot order is not preserved. This is
+     * accepted: the default spillable writer collapses a commit's writes through the merge function
+     * to one record per key before flush, and the feature targets cross-snapshot ordering only.
+     *
+     * <p>Concretely: for a {@code COMPACT} commit the input list is returned as is; for any other
+     * commit kind a new list is built in the same order, in which each {@code ADD} entry is
+     * replaced by {@code entry.assignSequenceNumber(snapshotId, snapshotId)} and every other entry
+     * is kept.
+     *
+     * @param snapshotId id of the snapshot being committed, passed as both sequence-number
+     *     arguments
+     * @param commitKind kind of the commit the entries belong to
+     * @param files manifest entries of the commit; the list itself is not modified
+     * @return the stamped entries, or {@code files} itself for a {@code COMPACT} commit
+     */
+    private static List<ManifestEntry> stampSequenceWithSnapshotId(
+            long snapshotId, CommitKind commitKind, List<ManifestEntry> files) {
+        if (commitKind == CommitKind.COMPACT) {
+            return files;
+        }
+        // Typed list: a raw List cannot be iterated with a ManifestEntry loop variable.
+        List<ManifestEntry> result = new ArrayList<>(files.size());
+        for (ManifestEntry entry : files) {
+            if (entry.kind() == FileKind.ADD) {
+                result.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+            } else {
+                result.add(entry);
+            }
+        }
+        return result;
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index d07ad2581944..424b9cec7dfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -105,6 +105,18 @@ public class SchemaValidation {
* @param schema the schema to be validated
*/
public static void validateTableSchema(TableSchema schema) {
+ validateTableSchema(schema, Collections.emptySet());
+ }
+
+ /**
+ * Validate the {@link TableSchema} and {@link CoreOptions}.
+ *
+ * @param schema the schema to be validated
+ * @param dynamicOptionKeys option keys that are overridden dynamically at runtime (e.g. by
+ * dedicated compaction jobs) and should therefore be excluded from certain static
+ * validations such as the {@code write-only} requirement for snapshot ordering
+ */
+ public static void validateTableSchema(TableSchema schema, Set dynamicOptionKeys) {
CoreOptions options = new CoreOptions(schema.options());
validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
@@ -288,6 +300,10 @@ public static void validateTableSchema(TableSchema schema) {
"deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true.");
}
+ if (options.snapshotSequenceOrdering()) {
+ validateSnapshotSequenceOrdering(schema, options, dynamicOptionKeys);
+ }
+
// vector field names must point to vector type
Set fieldNamesSpecifiedAsVector = options.vectorField();
schema.fields()
@@ -612,6 +628,32 @@ private static void validateFileIndex(TableSchema schema) {
}
}
+    /**
+     * Enforces the option combinations under which snapshot-based sequence ordering may be
+     * enabled: the table must declare primary keys, must not configure a sequence field, and must
+     * be write-only unless the write-only option is among the dynamically overridden keys.
+     *
+     * @param schema schema whose primary keys are inspected
+     * @param options core options to check
+     * @param dynamicOptionKeys keys of options overridden dynamically; when it contains the
+     *     write-only key, the write-only requirement is not enforced
+     */
+    private static void validateSnapshotSequenceOrdering(
+            TableSchema schema, CoreOptions options, Set dynamicOptionKeys) {
+        String orderingKey = CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key();
+
+        boolean isPrimaryKeyTable = !schema.primaryKeys().isEmpty();
+        checkArgument(
+                isPrimaryKeyTable,
+                "%s = true requires a primary-key table; append-only tables cannot use "
+                        + "snapshot-based sequence ordering.",
+                orderingKey);
+
+        boolean noSequenceField = options.sequenceField().isEmpty();
+        checkArgument(
+                noSequenceField,
+                "%s = true is mutually exclusive with %s; the snapshot id is the sole tiebreaker.",
+                orderingKey,
+                CoreOptions.SEQUENCE_FIELD.key());
+
+        // A dynamically overridden write-only (e.g. a dedicated compact job that sets
+        // write-only=false at runtime) is exempt from the write-only requirement.
+        String writeOnlyKey = CoreOptions.WRITE_ONLY.key();
+        if (dynamicOptionKeys.contains(writeOnlyKey)) {
+            return;
+        }
+        checkArgument(
+                options.writeOnly(),
+                "%s = true requires %s = true. Snapshot ordering relies on snapshot id to "
+                        + "determine record order, but inline compaction happens before "
+                        + "snapshot creation — files have not been stamped with the correct "
+                        + "snapshot id yet. Use dedicated compaction job instead.",
+                orderingKey,
+                writeOnlyKey);
+    }
+
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 23214ff05fea..0249a8e6b397 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -374,7 +374,7 @@ protected FileStoreTable copyInternal(
}
// validate schema with new options
- SchemaValidation.validateTableSchema(newTableSchema);
+ SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions.keySet());
return copy(newTableSchema);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
new file mode 100644
index 000000000000..0e29efd17102
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for snapshot-ordering in sort-merge readers. With {@code sequence.snapshot-ordering}, the
+ * commit snapshot id is carried in each record's {@code sequenceNumber} (stamped at read time for
+ * APPEND files), so the sort-merge readers need no snapshot-specific branch: comparing by {@code
+ * sequenceNumber} already makes records from later snapshots win.
+ *
+ * The tests in this class do not read files or create commits: every input record is built by
+ * {@code kv} with an explicit sequence number and merged through a deduplicating merge function,
+ * once per {@code SortEngine} value.
+ *
+ * NOTE(review): several declarations below read as raw types or are syntactically incomplete (for
+ * example the declaration of {@code readers} in {@code merge}); generic type arguments appear to
+ * have been lost in this copy of the patch. Confirm against the original change.
+ */
+public class SortMergeSnapshotOrderingTest {
+
+ // Orders keys by their first int field.
+ private static final Comparator KEY_COMPARATOR =
+ (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
+
+ // Not referenced by any code in this class.
+ private static final RowType VALUE_TYPE = RowType.of(DataTypes.INT());
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testLaterSnapshotWins(SortEngine sortEngine) throws IOException {
+ // seq carries the snapshot id: snapshot 6 wins over snapshot 5.
+ KeyValue winner = merge(sortEngine, kv(1, 5, 999), kv(1, 6, 1));
+ assertThat(winner.value().getInt(0)).isEqualTo(1);
+ assertThat(winner.sequenceNumber()).isEqualTo(6);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testHigherSequenceWins(SortEngine sortEngine) throws IOException {
+ // Here the record with the higher sequence number (100) is supplied first and must still
+ // be the one returned.
+ KeyValue winner = merge(sortEngine, kv(1, 100, 100), kv(1, 50, 50));
+ assertThat(winner.value().getInt(0)).isEqualTo(100);
+ }
+
+ /** Builds an INSERT record with a one-int key, the given sequence number and a one-int value. */
+ private static KeyValue kv(int key, long seq, int value) {
+ return new KeyValue()
+ .replace(GenericRow.of(key), seq, RowKind.INSERT, GenericRow.of(value));
+ }
+
+ /**
+ * Wraps each record in its own single-record reader, sort-merges the readers with a
+ * deduplicating merge function using the given sort engine, and returns the merged record. The
+ * first batch is asserted to contain exactly one record.
+ */
+ private static KeyValue merge(SortEngine sortEngine, KeyValue... kvs) throws IOException {
+ List> readers = new ArrayList<>();
+ for (KeyValue kv : kvs) {
+ readers.add(new SingleKvReader(kv));
+ }
+
+ MergeFunctionWrapper wrapper =
+ new ReducerMergeFunctionWrapper(DeduplicateMergeFunction.factory().create());
+
+ // The third argument is passed as null here.
+ RecordReader reader =
+ SortMergeReader.createSortMergeReader(
+ readers, KEY_COMPARATOR, null, wrapper, sortEngine);
+
+ RecordReader.RecordIterator batch = reader.readBatch();
+ assertThat(batch).isNotNull();
+ KeyValue result = batch.next();
+ assertThat(result).isNotNull();
+ assertThat(batch.next()).isNull();
+ batch.releaseBatch();
+ reader.close();
+ return result;
+ }
+
+ /**
+ * Reader that hands out one batch holding exactly one record; every later {@code readBatch}
+ * call returns null.
+ */
+ private static class SingleKvReader implements RecordReader {
+ // Set to null once the single batch has been handed out.
+ private KeyValue kv;
+
+ SingleKvReader(KeyValue kv) {
+ this.kv = kv;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator readBatch() {
+ if (kv == null) {
+ return null;
+ }
+ KeyValue toReturn = kv;
+ kv = null;
+ return new RecordIterator() {
+ private boolean returned = false;
+
+ @Nullable
+ @Override
+ public KeyValue next() {
+ if (returned) {
+ return null;
+ }
+ returned = true;
+ return toReturn;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 14652ec883c2..1cf0c9369ff9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -470,6 +470,55 @@ private void validateTableSchemaWithMapField(Map options) {
new TableSchema(1, fields, 10, emptyList(), singletonList("f1"), options, ""));
}
+    /** Snapshot ordering combined with write-only must pass validation without any exception. */
+    @Test
+    public void testSnapshotSequenceOrderingHappyPath() {
+        Map tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        tableOptions.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+
+        assertThatNoException().isThrownBy(() -> validateTableSchemaExec(tableOptions));
+    }
+
+    /** Without write-only, validation must fail with a message naming the write-only option. */
+    @Test
+    public void testSnapshotSequenceOrderingRejectsNonWriteOnly() {
+        Map tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+
+        String expectedFragment = CoreOptions.WRITE_ONLY.key();
+        assertThatThrownBy(() -> validateTableSchemaExec(tableOptions))
+                .hasMessageContaining(expectedFragment);
+    }
+
+    /** A configured sequence field must be rejected when snapshot ordering is enabled. */
+    @Test
+    public void testSnapshotSequenceOrderingRejectsSequenceField() {
+        Map tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.SEQUENCE_FIELD.key(), "f2");
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        tableOptions.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+
+        assertThatThrownBy(() -> validateTableSchemaExec(tableOptions))
+                .hasMessageContaining("sequence.field");
+    }
+
+    /** A schema with no primary keys must be rejected when snapshot ordering is enabled. */
+    @Test
+    public void testSnapshotSequenceOrderingRejectsNonPkTable() {
+        Map tableOptions = new HashMap<>();
+        tableOptions.put(BUCKET.key(), String.valueOf(-1));
+        tableOptions.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+
+        List columns =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.INT()),
+                        new DataField(1, "f1", DataTypes.INT()));
+
+        // The schema is built inside the lambda so that any failure is seen by the assertion.
+        assertThatThrownBy(
+                        () ->
+                                validateTableSchema(
+                                        new TableSchema(
+                                                1,
+                                                columns,
+                                                10,
+                                                emptyList(),
+                                                emptyList(),
+                                                tableOptions,
+                                                "")))
+                .hasMessageContaining("primary-key");
+    }
+
@Test
public void testFileFormatPerLevelRejectsIncompatibleSchema() {
List fields =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 82b097d8949a..ae8038fcf043 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -2688,4 +2688,573 @@ public void testMergeBranchPrimaryKeyTable() throws Exception {
assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
.satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables"));
}
+
+ /**
+ * Writes the same key in two consecutive commits and checks that a read returns only the value
+ * written by the second commit, although the first commit wrote the key 100 times.
+ */
+ @Test
+ public void testSnapshotSequenceOrdering() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) many times so that the per-record sequence number is high.
+ for (int i = 0; i < 100; i++) {
+ write.write(rowData(1, 10, 999L));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write pk=(1,10) once with a lower value. Because the snapshot id (2)
+ // is larger than snapshot 1, this record should win even though its per-record sequence
+ // number is much lower.
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Read the latest state and render each row as "col0|col1|col2".
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Same two-commit scenario as {@code testSnapshotSequenceOrdering}, with the sort engine option
+ * set to {@code MIN_HEAP}: the value from the second commit must be the only row returned.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingWithMinHeap() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CoreOptions.SORT_ENGINE, CoreOptions.SortEngine.MIN_HEAP);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // First commit: the key (1,10) is written 100 times with value 999.
+ for (int i = 0; i < 100; i++) {
+ write.write(rowData(1, 10, 999L));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Second commit: a single write of the same key with value 1, which is expected to win.
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Writes the same key twice inside a single commit and checks that the later write is the one
+ * returned by a read.
+ *
+ * NOTE(review): the test name describes a per-record sequence-number tiebreak within one
+ * snapshot. The Javadoc of {@code stampSequenceWithSnapshotId} in FileStoreCommitImpl (same
+ * change) states that all records of a snapshot share one id and that intra-snapshot order is
+ * not preserved, relying instead on the writer collapsing a commit's writes to one record per
+ * key before flush. The expected value 999 is therefore presumably produced by that in-writer
+ * merge rather than by a read-time tiebreak. Confirm, and consider renaming the test.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingFallsBackToSequenceWithinSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Both writes go into one commit; the later write (999) is expected to be the survivor.
+ // See the note on this method about which mechanism is assumed to produce that outcome.
+ write.write(rowData(1, 10, 1L));
+ write.write(rowData(1, 10, 999L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|999");
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Performs three commits, then compacts through a copy of the table whose write-only option is
+ * overridden to false. Afterwards every visible data file must report a minSequenceNumber of at
+ * most 3, and the read must return the value of the second commit for the key written twice.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingCompactionPreservesInputSnapshotId() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) with val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write pk=(1,10) with val=200 (this should win after compaction)
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: write a DIFFERENT key pk=(1,20)
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // Snapshot 4: compact using dedicated compact writer (simulates compact job)
+ // The regular writer and committer are closed first; they are not used again in this test.
+ write.close();
+ commit.close();
+ // The compacting table is a copy with write-only dynamically overridden to false.
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+ compactWrite.close();
+ compactCommit.close();
+
+ // Inspect the file metadata through the original (write-only) table handle.
+ List splits = table.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ for (DataFileMeta file : split.dataFiles()) {
+ // The compacted file's minSequenceNumber should reflect the min snapshot id
+ // of records inside (from per-record _SEQUENCE_NUMBER values written during
+ // compaction), NOT the compaction commit's snapshot id (4).
+ assertThat(file.minSequenceNumber())
+ .as(
+ "Compacted file %s should have minSequenceNumber from per-record "
+ + "snapshot ids, not the compaction commit's snapshot id",
+ file.fileName())
+ .isLessThanOrEqualTo(3);
+ }
+ }
+
+ // Also verify the read result is correct
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, toSplits(splits), toString);
+ assertThat(result).containsExactlyInAnyOrder("1|10|200", "1|20|300");
+ }
+
+ /**
+ * Performs three commits, a compaction through a non-write-only copy of the table, and one more
+ * commit to an already written key; the final read must return the newest value for that key
+ * and the single value of the other key.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingCompactionNoOrderingReversal() throws Exception {
+ // Reproduces the scenario from the PR review: compaction of files from
+ // snapshot 1 and 3 must NOT cause records from snapshot 1 to win over
+ // an uncompacted file from snapshot 2.
+ // NOTE(review): the single compact(...) call below is not visibly restricted to the files
+ // of snapshots 1 and 3, and the only assertion runs after a newer write (val=999) to the
+ // same key, which would be expected to win regardless of how 100 and 200 were ordered.
+ // Confirm that this test can actually fail on the reversal it describes; asserting the
+ // read result right after the compaction would pin it.
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CoreOptions.BUCKET, 1);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) with val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write SAME key pk=(1,10) with val=200 — this should win
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: write DIFFERENT key pk=(1,20) with val=300
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // Compact all files using dedicated compact writer
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+ compactWrite.close();
+ compactCommit.close();
+
+ // Write pk=(1,10) again with val=999 — snapshot 5 should definitely win
+ // A fresh writer and committer are needed because the previous pair was closed above.
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(1, 10, 999L));
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ write.close();
+ commit.close();
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ // pk=(1,10): snapshot 5 (val=999) wins over snapshot 2 (val=200) and snapshot 1 (val=100)
+ // pk=(1,20): snapshot 3 (val=300) is the only version
+ assertThat(result).containsExactlyInAnyOrder("1|10|999", "1|20|300");
+ }
+
+ /**
+ * Alternates regular commits with two compaction rounds (both run through the same
+ * non-write-only copy of the table) and checks the final read: the newest value for the key
+ * written three times, and the single values of the two other keys.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingMultiRoundCompaction() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CoreOptions.BUCKET, 1);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: pk=(1,10) val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: pk=(1,10) val=200 — should win over snapshot 1
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: pk=(1,20) val=300
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // First compaction (snapshot 4) using dedicated compact writer
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+ compactWrite.close();
+ compactCommit.close();
+
+ // Snapshot 5: pk=(1,10) val=500 — should win over everything
+ // A fresh writer and committer are created because the previous pair was closed above.
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(1, 10, 500L));
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ // Snapshot 6: pk=(1,30) val=600
+ write.write(rowData(1, 30, 600L));
+ commit.commit(5, write.prepareCommit(false, 5));
+
+ // Second compaction (snapshot 7) using dedicated compact writer
+ write.close();
+ commit.close();
+ // The same compactTable copy is reused; only the writer and committer are recreated.
+ compactWrite = compactTable.newWrite(commitUser);
+ compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(6, compactWrite.prepareCommit(true, 6));
+ compactWrite.close();
+ compactCommit.close();
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).containsExactlyInAnyOrder("1|10|500", "1|20|300", "1|30|600");
+ }
+
+ /**
+ * Two commits write the same key with the changelog producer set to {@code INPUT}; the read
+ * must return only the value from the second commit.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingWithChangelogInput() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CHANGELOG_PRODUCER, ChangelogProducer.INPUT);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // First commit: key (1,10) with value 100.
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Second commit: same key with value 1, which is expected to win.
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Two commits write the same key with the changelog producer set to {@code LOOKUP}; the read
+ * must return only the value from the second commit.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingWithChangelogLookup() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CHANGELOG_PRODUCER, LOOKUP);
+ });
+ // Unlike the sibling tests, this writer is given an IOManager rooted at tempDir.
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // First commit: key (1,10) with value 100.
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Second commit: same key with value 1, which is expected to win.
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Inserts a row in one commit and writes a DELETE for the same key in the next commit; the read
+ * must come back empty.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingDeleteFromLaterSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // First commit: insert key (1,10).
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Second commit: a DELETE row for the same key, which is expected to win over the insert.
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List result = getResult(read, splits, toString);
+ assertThat(result).isEmpty();
+
+ write.close();
+ commit.close();
+ }
+
+    /**
+     * Regression: with snapshot-ordering on, a partial-update merge function must keep its result's
+     * {@code sequenceNumber} equal to the snapshot id carried by its inputs. The compacted file's
+     * per-record {@code _SEQUENCE_NUMBER} (and therefore its file-level minSequenceNumber) must
+     * stay a real snapshot id (>= 0); a regression to -1 would break ordering against later
+     * snapshots.
+     *
+     * <p>Every writer/committer pair opened by this test is closed exactly once, right before the
+     * compaction that follows it.
+     */
+    @Test
+    public void testSnapshotSequenceOrderingPartialUpdateCompactionPreservesSnapshotId()
+            throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(MERGE_ENGINE, PARTIAL_UPDATE);
+                            conf.set(BUCKET, 1);
+                        },
+                        rowType);
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1: partial write of column b
+        write.write(GenericRow.of(1, 1, 100, null));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: partial write of column c — partial-update merges with snapshot 1's row
+        write.write(GenericRow.of(1, 1, null, 200));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: compact files from snapshots 1+2 using a dedicated compact writer. The
+        // compaction reader merges the two partial rows through PartialUpdateMergeFunction; the
+        // merged record's sequenceNumber must stay a real snapshot id.
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(2, compactWrite.prepareCommit(true, 2));
+        compactWrite.close();
+        compactCommit.close();
+
+        List splitsAfterCompact = table.newSnapshotReader().read().dataSplits();
+        for (DataSplit split : splitsAfterCompact) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as(
+                                "Compacted file %s must carry a real snapshot id in"
+                                        + " minSequenceNumber (>= 0). A value of -1 means the"
+                                        + " partial-update merge result lost its snapshot id"
+                                        + " during compaction.",
+                                file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        // Snapshot 4: write a fresh value of b — this snapshot must win.
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 1, 999, null));
+        commit.commit(3, write.prepareCommit(false, 3));
+
+        // Snapshot 5: another compaction using dedicated compact writer
+        write.close();
+        commit.close();
+        compactWrite = compactTable.newWrite(commitUser);
+        compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(4, compactWrite.prepareCommit(true, 4));
+        compactWrite.close();
+        compactCommit.close();
+        for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as("Final compacted file %s minSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+                assertThat(file.maxSequenceNumber())
+                        .as("Final compacted file %s maxSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function toString =
+                r ->
+                        r.getInt(0)
+                                + "|"
+                                + r.getInt(1)
+                                + "|"
+                                + (r.isNullAt(2) ? "null" : r.getInt(2))
+                                + "|"
+                                + (r.isNullAt(3) ? "null" : r.getInt(3));
+        List result = getResult(read, splits, toString);
+        // b=999 (snapshot 4 wins over snapshot 1's 100), c=200 (only snapshot 2 wrote it)
+        assertThat(result).containsExactly("1|1|999|200");
+
+        // No close here: the writer and committer opened for snapshot 4 were already closed
+        // before the second compaction, so closing them again would be a double close.
+    }
+
+    /**
+     * Regression: with snapshot-ordering on, an aggregate merge function must keep its result's
+     * {@code sequenceNumber} equal to the snapshot id carried by its inputs. Mirrors the
+     * partial-update regression — if the merged record loses the snapshot id, the compacted file's
+     * minSequenceNumber regresses to -1.
+     *
+     * <p>Every writer/committer pair opened by this test is closed exactly once, right before the
+     * compaction that follows it.
+     */
+    @Test
+    public void testSnapshotSequenceOrderingAggregateCompactionPreservesSnapshotId()
+            throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(MERGE_ENGINE, AGGREGATE);
+                            conf.set(BUCKET, 1);
+                            conf.set("fields.b.aggregate-function", "sum");
+                            conf.set("fields.c.aggregate-function", "max");
+                        },
+                        rowType);
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1
+        write.write(GenericRow.of(1, 1, 10, 100));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: aggregates with snapshot 1's row.
+        write.write(GenericRow.of(1, 1, 20, 50));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: compact using dedicated compact writer
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(2, compactWrite.prepareCommit(true, 2));
+        compactWrite.close();
+        compactCommit.close();
+
+        for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as(
+                                "Aggregate-compacted file %s must carry a real snapshot id in"
+                                        + " minSequenceNumber (>= 0). A value of -1 means the"
+                                        + " aggregate merge result lost its snapshot id during"
+                                        + " compaction.",
+                                file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        // Snapshot 4: another insert that must aggregate on top of the compacted result.
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 1, 5, 999));
+        commit.commit(3, write.prepareCommit(false, 3));
+
+        // Snapshot 5: final compaction using dedicated compact writer
+        write.close();
+        commit.close();
+        compactWrite = compactTable.newWrite(commitUser);
+        compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(4, compactWrite.prepareCommit(true, 4));
+        compactWrite.close();
+        compactCommit.close();
+        for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as("Final compacted file %s minSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+                assertThat(file.maxSequenceNumber())
+                        .as("Final compacted file %s maxSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        List splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getInt(2) + "|" + r.getInt(3);
+        List result = getResult(read, splits, toString);
+        // b = sum(10, 20, 5) = 35, c = max(100, 50, 999) = 999
+        assertThat(result).containsExactly("1|1|35|999");
+
+        // No close here: the writer and committer opened for snapshot 4 were already closed
+        // before the final compaction, so closing them again would be a double close.
+    }
}