diff --git a/docs/docs/primary-key-table/sequence-rowkind.mdx b/docs/docs/primary-key-table/sequence-rowkind.mdx index 96fcc19963cb..8a0f49fb7307 100644 --- a/docs/docs/primary-key-table/sequence-rowkind.mdx +++ b/docs/docs/primary-key-table/sequence-rowkind.mdx @@ -71,3 +71,50 @@ By default, the primary key table determines the row kind according to the input `'rowkind.field'` to use a field to extract row kind. The valid row kind string should be `'+I'`, `'-U'`, `'+U'` or `'-D'`. + +## Snapshot Ordering + +In multi-writer scenarios where wall-clock sequence numbers cannot be globally ordered across writers, +you can enable `'sequence.snapshot-ordering'` to use the commit snapshot id as the ordering key when +merging records with the same primary key. Records from later snapshots are considered newer, +regardless of their per-record sequence number. + + + + +```sql +CREATE TABLE my_table ( + pk BIGINT PRIMARY KEY NOT ENFORCED, + v1 DOUBLE, + v2 BIGINT +) WITH ( + 'sequence.snapshot-ordering' = 'true', + 'write-only' = 'true' +); +``` + + + + +:::warning +This option requires `'write-only' = 'true'`. Compaction must be performed by a separate dedicated +compact job. This ensures that compaction correctly preserves the snapshot id of each record. +::: + +:::info +`'sequence.snapshot-ordering'` is mutually exclusive with `'sequence.field'`. You cannot enable both +on the same table. +::: + +:::info +The ordering key is the commit snapshot id only; the order of records **within the same snapshot** is +not guaranteed, and this is by design. Under the default configuration it is harmless: a writer +buffers a commit's writes (`'write-buffer-spillable' = 'true'`) and runs them through the merge +function before flushing, so at most one record per primary key is written per snapshot — the common +case is fully covered. We therefore deliberately do not handle the case where the same key is spread +across multiple files of one snapshot. That case only arises with `'write-buffer-spillable' = 'false'`, +or when the spilled data exceeds `'write-buffer-spill-disk-size'`, where the buffer may be flushed +mid-commit; the same key can then land in multiple files of the same snapshot with equal sequence +numbers and their relative order becomes undefined. This affects only intra-snapshot order, never the +cross-snapshot ordering this feature provides. +::: diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index ebd8a5aca7d1..6c2cc37a0358 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -1337,6 +1337,12 @@

Enum

Specify the order of sequence.field.

Possible values: + +
sequence.snapshot-ordering
+ false + Boolean + When enabled, merge uses the commit snapshot id as the ordering key for primary-key conflicts: records from later snapshots always win. Designed for multi-writer scenarios on the same primary-key table where wall-clock sequence numbers cannot be globally ordered. The order of records within the same snapshot is not guaranteed. Mutually exclusive with sequence.field. Requires a primary-key table with write-only=true. Inline compaction is not allowed because snapshot ids are assigned only after commit. To compact such tables, run a dedicated compaction job/action with write-only=false. +
sink.process-time-zone
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index f09a6edb4a74..d0ce21e5f903 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -967,6 +967,24 @@ public InlineElement getDescription() { .defaultValue(SortOrder.ASCENDING) .withDescription("Specify the order of sequence.field."); + @Immutable + public static final ConfigOption SEQUENCE_SNAPSHOT_ORDERING = + key("sequence.snapshot-ordering") + .booleanType() + .defaultValue(false) + .withDescription( + "When enabled, merge uses the commit snapshot id as the ordering key " + + "for primary-key conflicts: records from later snapshots " + + "always win. Designed for multi-writer scenarios on the same " + + "primary-key table where wall-clock sequence numbers cannot " + + "be globally ordered. The order of records within the same " + + "snapshot is not guaranteed. Mutually exclusive with " + + "sequence.field. Requires a primary-key table with " + + "write-only=true. Inline compaction is not allowed because " + + "snapshot ids are assigned only after commit. To compact such " + + "tables, run a dedicated compaction job/action with " + + "write-only=false."); + @Immutable public static final ConfigOption AGGREGATION_REMOVE_RECORD_ON_DELETE = key("aggregation.remove-record-on-delete") @@ -3415,6 +3433,10 @@ public boolean sequenceFieldSortOrderIsAscending() { return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING; } + public boolean snapshotSequenceOrdering() { + return options.get(SEQUENCE_SNAPSHOT_ORDERING); + } + public Optional rowkindField() { return options.getOptional(ROWKIND_FIELD); } diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java index c314f21107a7..4c4ff61c24b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java @@ -89,6 +89,11 @@ public long sequenceNumber() { return sequenceNumber; } + public KeyValue setSequenceNumber(long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + return this; + } + public RowKind valueKind() { return valueKind; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 23b34947bcc9..a8cdc031e134 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -277,8 +277,15 @@ static DataFileMeta create( SimpleStats valueStats(); + /** + * Minimum sequence number of records in this file. When {@code sequence.snapshot-ordering} is + * enabled for a primary-key table, this field is repurposed to carry the commit snapshot id + * instead of the per-record sequence number range (the snapshot id is stamped into it at commit + * time by {@code FileStoreCommitImpl}). + */ long minSequenceNumber(); + /** @see #minSequenceNumber() */ long maxSequenceNumber(); long schemaId(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java index 6cf08769703f..2538c08a2127 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java @@ -36,12 +36,26 @@ public class KeyValueDataFileRecordReader implements FileRecordReader private final FileRecordReader reader; private final KeyValueSerializer serializer; private final int level; + private final boolean overrideSequenceWithSnapshotId; + private final long snapshotId; public KeyValueDataFileRecordReader( FileRecordReader reader, RowType keyType, RowType valueType, int level) { + this(reader, keyType, valueType, level, false, KeyValue.UNKNOWN_SEQUENCE); + } + + public KeyValueDataFileRecordReader( + FileRecordReader reader, + RowType keyType, + RowType valueType, + int level, + boolean overrideSequenceWithSnapshotId, + long snapshotId) { this.reader = reader; this.serializer = new KeyValueSerializer(keyType, valueType); this.level = level; + this.overrideSequenceWithSnapshotId = overrideSequenceWithSnapshotId; + this.snapshotId = snapshotId; } @Nullable @@ -53,10 +67,20 @@ public FileRecordIterator readBatch() throws IOException { } return iterator.transform( - internalRow -> - internalRow == null - ? null - : serializer.fromRow(internalRow).setLevel(level)); + internalRow -> { + if (internalRow == null) { + return null; + } + KeyValue kv = serializer.fromRow(internalRow).setLevel(level); + // In snapshot-ordering mode, an APPEND file's on-disk per-record sequence + // numbers are stale; we override them with the commit snapshot id so later + // snapshots win during merge. Any read path bypassing this override would + // order APPEND records incorrectly. + if (overrideSequenceWithSnapshotId) { + kv.setSequenceNumber(snapshotId); + } + return kv; + }); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index fc505e19c270..5f7e3741927e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -30,6 +30,7 @@ import org.apache.paimon.format.OrcFormatReaderContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.FileRecordReader; @@ -42,6 +43,7 @@ import org.apache.paimon.utils.AsyncRecordReader; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FormatReaderMapping; +import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; @@ -68,6 +70,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory { private final long asyncThreshold; private final boolean ignoreCorruptFiles; private final boolean ignoreLostFiles; + private final boolean snapshotSequenceOrdering; private final Map formatReaderMappings; private final BinaryRow partition; private final DeletionVector.Factory dvFactory; @@ -93,6 +96,7 @@ protected KeyValueFileReaderFactory( this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes(); this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile(); this.ignoreLostFiles = coreOptions.scanIgnoreLostFile(); + this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering(); this.partition = partition; this.formatReaderMappings = new HashMap<>(); this.dvFactory = dvFactory; @@ -168,7 +172,26 @@ private FileRecordReader createRecordReader( new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get()); } - return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level()); + // In snapshot-ordering mode, APPEND files carry the commit snapshot id in + // minSequenceNumber (stamped at commit time); override per-record sequence with it so + // later snapshots win during merge. COMPACT files already carry the snapshot id in their + // per-record _SEQUENCE_NUMBER and are left untouched. + boolean overrideSequenceWithSnapshotId = false; + if (snapshotSequenceOrdering) { + Preconditions.checkState( + file.fileSource().isPresent(), + "sequence.snapshot-ordering requires data files with fileSource metadata. " + + "This option is only safe for newly-created tables or empty tables. " + + "Legacy files without fileSource cannot be ordered by commit snapshot id."); + overrideSequenceWithSnapshotId = file.fileSource().get() == FileSource.APPEND; + } + return new KeyValueDataFileRecordReader( + fileRecordReader, + keyType, + valueType, + file.level(), + overrideSequenceWithSnapshotId, + file.minSequenceNumber()); } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3f9fdb9f1c0c..e15698f36179 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -989,6 +989,10 @@ CommitResult tryCommitOnce( deltaFiles = assigned.assignedEntries; } + if (options.snapshotSequenceOrdering()) { + deltaFiles = stampSequenceWithSnapshotId(newSnapshotId, commitKind, deltaFiles); + } + // the added records subtract the deleted records from long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles); long totalRecordCount = previousTotalRecordCount + deltaRecordCount; @@ -1265,4 +1269,31 @@ public void close() { IOUtils.closeAllQuietly(commitCallbacks); IOUtils.closeQuietly(snapshotCommit); } + + /** + * Stamps the commit snapshot id into {@link DataFileMeta#minSequenceNumber()} / {@link + * DataFileMeta#maxSequenceNumber()} of APPEND files, reusing these fields instead of adding a + * new one (same pattern as {@link RowTrackingCommitUtils#assignRowTracking}). COMPACT files are + * returned unchanged: their input was read through the override path, so their per-record + * {@code _SEQUENCE_NUMBER} already carries the snapshot id. + * + *

All records of a snapshot share one id, so intra-snapshot order is not preserved. This is + * accepted: the default spillable writer collapses a commit's writes through the merge function + * to one record per key before flush, and the feature targets cross-snapshot ordering only. + */ + private static List stampSequenceWithSnapshotId( + long snapshotId, CommitKind commitKind, List files) { + if (commitKind == CommitKind.COMPACT) { + return files; + } + List result = new ArrayList<>(files.size()); + for (ManifestEntry entry : files) { + if (entry.kind() == FileKind.ADD) { + result.add(entry.assignSequenceNumber(snapshotId, snapshotId)); + } else { + result.add(entry); + } + } + return result; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index d07ad2581944..424b9cec7dfd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -105,6 +105,18 @@ public class SchemaValidation { * @param schema the schema to be validated */ public static void validateTableSchema(TableSchema schema) { + validateTableSchema(schema, Collections.emptySet()); + } + + /** + * Validate the {@link TableSchema} and {@link CoreOptions}. + * + * @param schema the schema to be validated + * @param dynamicOptionKeys option keys that are overridden dynamically at runtime (e.g. by + * dedicated compaction jobs) and should therefore be excluded from certain static + * validations such as the {@code write-only} requirement for snapshot ordering + */ + public static void validateTableSchema(TableSchema schema, Set dynamicOptionKeys) { CoreOptions options = new CoreOptions(schema.options()); validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key"); @@ -288,6 +300,10 @@ public static void validateTableSchema(TableSchema schema) { "deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true."); } + if (options.snapshotSequenceOrdering()) { + validateSnapshotSequenceOrdering(schema, options, dynamicOptionKeys); + } + // vector field names must point to vector type Set fieldNamesSpecifiedAsVector = options.vectorField(); schema.fields() @@ -612,6 +628,32 @@ private static void validateFileIndex(TableSchema schema) { } } + private static void validateSnapshotSequenceOrdering( + TableSchema schema, CoreOptions options, Set dynamicOptionKeys) { + checkArgument( + !schema.primaryKeys().isEmpty(), + "%s = true requires a primary-key table; append-only tables cannot use " + + "snapshot-based sequence ordering.", + CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key()); + checkArgument( + options.sequenceField().isEmpty(), + "%s = true is mutually exclusive with %s; the snapshot id is the sole tiebreaker.", + CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), + CoreOptions.SEQUENCE_FIELD.key()); + // Skip writeOnly check when write-only is dynamically overridden (e.g. by dedicated + // compact jobs that override write-only=false at runtime). + if (!dynamicOptionKeys.contains(CoreOptions.WRITE_ONLY.key())) { + checkArgument( + options.writeOnly(), + "%s = true requires %s = true. Snapshot ordering relies on snapshot id to " + + "determine record order, but inline compaction happens before " + + "snapshot creation — files have not been stamped with the correct " + + "snapshot id yet. Use dedicated compaction job instead.", + CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), + CoreOptions.WRITE_ONLY.key()); + } + } + private static void validateForDeletionVectors(CoreOptions options) { checkArgument( options.changelogProducer() == ChangelogProducer.NONE diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 23214ff05fea..0249a8e6b397 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -374,7 +374,7 @@ protected FileStoreTable copyInternal( } // validate schema with new options - SchemaValidation.validateTableSchema(newTableSchema); + SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions.keySet()); return copy(newTableSchema); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java new file mode 100644 index 000000000000..0e29efd17102 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.CoreOptions.SortEngine; +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for snapshot-ordering in sort-merge readers. With {@code sequence.snapshot-ordering}, the + * commit snapshot id is carried in each record's {@code sequenceNumber} (stamped at read time for + * APPEND files), so the sort-merge readers need no snapshot-specific branch: comparing by {@code + * sequenceNumber} already makes records from later snapshots win. + */ +public class SortMergeSnapshotOrderingTest { + + private static final Comparator KEY_COMPARATOR = + (a, b) -> Integer.compare(a.getInt(0), b.getInt(0)); + + private static final RowType VALUE_TYPE = RowType.of(DataTypes.INT()); + + @ParameterizedTest + @EnumSource(SortEngine.class) + public void testLaterSnapshotWins(SortEngine sortEngine) throws IOException { + // seq carries the snapshot id: snapshot 6 wins over snapshot 5. + KeyValue winner = merge(sortEngine, kv(1, 5, 999), kv(1, 6, 1)); + assertThat(winner.value().getInt(0)).isEqualTo(1); + assertThat(winner.sequenceNumber()).isEqualTo(6); + } + + @ParameterizedTest + @EnumSource(SortEngine.class) + public void testHigherSequenceWins(SortEngine sortEngine) throws IOException { + KeyValue winner = merge(sortEngine, kv(1, 100, 100), kv(1, 50, 50)); + assertThat(winner.value().getInt(0)).isEqualTo(100); + } + + private static KeyValue kv(int key, long seq, int value) { + return new KeyValue() + .replace(GenericRow.of(key), seq, RowKind.INSERT, GenericRow.of(value)); + } + + private static KeyValue merge(SortEngine sortEngine, KeyValue... kvs) throws IOException { + List> readers = new ArrayList<>(); + for (KeyValue kv : kvs) { + readers.add(new SingleKvReader(kv)); + } + + MergeFunctionWrapper wrapper = + new ReducerMergeFunctionWrapper(DeduplicateMergeFunction.factory().create()); + + RecordReader reader = + SortMergeReader.createSortMergeReader( + readers, KEY_COMPARATOR, null, wrapper, sortEngine); + + RecordReader.RecordIterator batch = reader.readBatch(); + assertThat(batch).isNotNull(); + KeyValue result = batch.next(); + assertThat(result).isNotNull(); + assertThat(batch.next()).isNull(); + batch.releaseBatch(); + reader.close(); + return result; + } + + private static class SingleKvReader implements RecordReader { + private KeyValue kv; + + SingleKvReader(KeyValue kv) { + this.kv = kv; + } + + @Nullable + @Override + public RecordIterator readBatch() { + if (kv == null) { + return null; + } + KeyValue toReturn = kv; + kv = null; + return new RecordIterator() { + private boolean returned = false; + + @Nullable + @Override + public KeyValue next() { + if (returned) { + return null; + } + returned = true; + return toReturn; + } + + @Override + public void releaseBatch() {} + }; + } + + @Override + public void close() {} + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 14652ec883c2..1cf0c9369ff9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -470,6 +470,55 @@ private void validateTableSchemaWithMapField(Map options) { new TableSchema(1, fields, 10, emptyList(), singletonList("f1"), options, "")); } + @Test + public void testSnapshotSequenceOrderingHappyPath() { + Map options = new HashMap<>(); + options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + options.put(CoreOptions.WRITE_ONLY.key(), "true"); + assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options)); + } + + @Test + public void testSnapshotSequenceOrderingRejectsNonWriteOnly() { + Map options = new HashMap<>(); + options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining(CoreOptions.WRITE_ONLY.key()); + } + + @Test + public void testSnapshotSequenceOrderingRejectsSequenceField() { + Map options = new HashMap<>(); + options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + options.put(CoreOptions.WRITE_ONLY.key(), "true"); + options.put(CoreOptions.SEQUENCE_FIELD.key(), "f2"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining("sequence.field"); + } + + @Test + public void testSnapshotSequenceOrderingRejectsNonPkTable() { + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.INT())); + Map options = new HashMap<>(); + options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + options.put(BUCKET.key(), String.valueOf(-1)); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessageContaining("primary-key"); + } + @Test public void testFileFormatPerLevelRejectsIncompatibleSchema() { List fields = diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 82b097d8949a..ae8038fcf043 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -2688,4 +2688,573 @@ public void testMergeBranchPrimaryKeyTable() throws Exception { assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) .satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables")); } + + @Test + public void testSnapshotSequenceOrdering() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Snapshot 1: write pk=(1,10) many times so that the per-record sequence number is high. + for (int i = 0; i < 100; i++) { + write.write(rowData(1, 10, 999L)); + } + commit.commit(0, write.prepareCommit(false, 0)); + + // Snapshot 2: write pk=(1,10) once with a lower value. Because the snapshot id (2) + // is larger than snapshot 1, this record should win even though its per-record sequence + // number is much lower. + write.write(rowData(1, 10, 1L)); + commit.commit(1, write.prepareCommit(false, 1)); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).containsExactly("1|10|1"); + + write.close(); + commit.close(); + } + + @Test + public void testSnapshotSequenceOrderingWithMinHeap() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(CoreOptions.SORT_ENGINE, CoreOptions.SortEngine.MIN_HEAP); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + for (int i = 0; i < 100; i++) { + write.write(rowData(1, 10, 999L)); + } + commit.commit(0, write.prepareCommit(false, 0)); + + write.write(rowData(1, 10, 1L)); + commit.commit(1, write.prepareCommit(false, 1)); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).containsExactly("1|10|1"); + + write.close(); + commit.close(); + } + + @Test + public void testSnapshotSequenceOrderingFallsBackToSequenceWithinSnapshot() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Within a single snapshot, sequence number is the tiebreaker. The later write (999) + // gets a higher sequence number and should win. + write.write(rowData(1, 10, 1L)); + write.write(rowData(1, 10, 999L)); + commit.commit(0, write.prepareCommit(false, 0)); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).containsExactly("1|10|999"); + + write.close(); + commit.close(); + } + + @Test + public void testSnapshotSequenceOrderingCompactionPreservesInputSnapshotId() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Snapshot 1: write pk=(1,10) with val=100 + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 0)); + + // Snapshot 2: write pk=(1,10) with val=200 (this should win after compaction) + write.write(rowData(1, 10, 200L)); + commit.commit(1, write.prepareCommit(false, 1)); + + // Snapshot 3: write a DIFFERENT key pk=(1,20) + write.write(rowData(1, 20, 300L)); + commit.commit(2, write.prepareCommit(false, 2)); + + // Snapshot 4: compact using dedicated compact writer (simulates compact job) + write.close(); + commit.close(); + FileStoreTable compactTable = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + StreamTableWrite compactWrite = compactTable.newWrite(commitUser); + StreamTableCommit compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(3, compactWrite.prepareCommit(true, 3)); + compactWrite.close(); + compactCommit.close(); + + List splits = table.newSnapshotReader().read().dataSplits(); + for (DataSplit split : splits) { + for (DataFileMeta file : split.dataFiles()) { + // The compacted file's minSequenceNumber should reflect the min snapshot id + // of records inside (from per-record _SEQUENCE_NUMBER values written during + // compaction), NOT the compaction commit's snapshot id (4). + assertThat(file.minSequenceNumber()) + .as( + "Compacted file %s should have minSequenceNumber from per-record " + + "snapshot ids, not the compaction commit's snapshot id", + file.fileName()) + .isLessThanOrEqualTo(3); + } + } + + // Also verify the read result is correct + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, toSplits(splits), toString); + assertThat(result).containsExactlyInAnyOrder("1|10|200", "1|20|300"); + } + + @Test + public void testSnapshotSequenceOrderingCompactionNoOrderingReversal() throws Exception { + // Reproduces the scenario from the PR review: compaction of files from + // snapshot 1 and 3 must NOT cause records from snapshot 1 to win over + // an uncompacted file from snapshot 2. + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(CoreOptions.BUCKET, 1); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Snapshot 1: write pk=(1,10) with val=100 + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 0)); + + // Snapshot 2: write SAME key pk=(1,10) with val=200 — this should win + write.write(rowData(1, 10, 200L)); + commit.commit(1, write.prepareCommit(false, 1)); + + // Snapshot 3: write DIFFERENT key pk=(1,20) with val=300 + write.write(rowData(1, 20, 300L)); + commit.commit(2, write.prepareCommit(false, 2)); + + // Compact all files using dedicated compact writer + write.close(); + commit.close(); + FileStoreTable compactTable = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + StreamTableWrite compactWrite = compactTable.newWrite(commitUser); + StreamTableCommit compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(3, compactWrite.prepareCommit(true, 3)); + compactWrite.close(); + compactCommit.close(); + + // Write pk=(1,10) again with val=999 — snapshot 5 should definitely win + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + write.write(rowData(1, 10, 999L)); + commit.commit(4, write.prepareCommit(false, 4)); + + write.close(); + commit.close(); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + // pk=(1,10): snapshot 5 (val=999) wins over snapshot 2 (val=200) and snapshot 1 (val=100) + // pk=(1,20): snapshot 3 (val=300) is the only version + assertThat(result).containsExactlyInAnyOrder("1|10|999", "1|20|300"); + } + + @Test + public void testSnapshotSequenceOrderingMultiRoundCompaction() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(CoreOptions.BUCKET, 1); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Snapshot 1: pk=(1,10) val=100 + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 0)); + + // Snapshot 2: pk=(1,10) val=200 — should win over snapshot 1 + write.write(rowData(1, 10, 200L)); + commit.commit(1, write.prepareCommit(false, 1)); + + // Snapshot 3: pk=(1,20) val=300 + write.write(rowData(1, 20, 300L)); + commit.commit(2, write.prepareCommit(false, 2)); + + // First compaction (snapshot 4) using dedicated compact writer + write.close(); + commit.close(); + FileStoreTable compactTable = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + StreamTableWrite compactWrite = compactTable.newWrite(commitUser); + StreamTableCommit compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(3, compactWrite.prepareCommit(true, 3)); + compactWrite.close(); + compactCommit.close(); + + // Snapshot 5: pk=(1,10) val=500 — should win over everything + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + write.write(rowData(1, 10, 500L)); + commit.commit(4, write.prepareCommit(false, 4)); + + // Snapshot 6: pk=(1,30) val=600 + write.write(rowData(1, 30, 600L)); + commit.commit(5, write.prepareCommit(false, 5)); + + // Second compaction (snapshot 7) using dedicated compact writer + write.close(); + commit.close(); + compactWrite = compactTable.newWrite(commitUser); + compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(6, compactWrite.prepareCommit(true, 6)); + compactWrite.close(); + compactCommit.close(); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).containsExactlyInAnyOrder("1|10|500", "1|20|300", "1|30|600"); + } + + @Test + public void testSnapshotSequenceOrderingWithChangelogInput() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(CHANGELOG_PRODUCER, ChangelogProducer.INPUT); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 0)); + + write.write(rowData(1, 10, 1L)); + commit.commit(1, write.prepareCommit(false, 1)); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).containsExactly("1|10|1"); + + write.close(); + commit.close(); + } + + @Test + public void testSnapshotSequenceOrderingWithChangelogLookup() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(CHANGELOG_PRODUCER, LOOKUP); + }); + StreamTableWrite write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 0)); + + write.write(rowData(1, 10, 1L)); + commit.commit(1, write.prepareCommit(false, 1)); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).containsExactly("1|10|1"); + + write.close(); + commit.close(); + } + + @Test + public void testSnapshotSequenceOrderingDeleteFromLaterSnapshot() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + }); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 0)); + + write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 1)); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2); + List result = getResult(read, splits, toString); + assertThat(result).isEmpty(); + + write.close(); + commit.close(); + } + + /** + * Regression: with snapshot-ordering on, a partial-update merge function must keep its result's + * {@code sequenceNumber} equal to the snapshot id carried by its inputs. The compacted file's + * per-record {@code _SEQUENCE_NUMBER} (and therefore its file-level minSequenceNumber) must + * stay a real snapshot id (>= 0); a regression to -1 would break ordering against later + * snapshots. + */ + @Test + public void testSnapshotSequenceOrderingPartialUpdateCompactionPreservesSnapshotId() + throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(MERGE_ENGINE, PARTIAL_UPDATE); + conf.set(BUCKET, 1); + }, + rowType); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Snapshot 1: partial write of column b + write.write(GenericRow.of(1, 1, 100, null)); + commit.commit(0, write.prepareCommit(false, 0)); + + // Snapshot 2: partial write of column c — partial-update merges with snapshot 1's row + write.write(GenericRow.of(1, 1, null, 200)); + commit.commit(1, write.prepareCommit(false, 1)); + + // Snapshot 3: compact files from snapshots 1+2 using a dedicated compact writer. The + // compaction reader merges the two partial rows through PartialUpdateMergeFunction; the + // merged record's sequenceNumber must stay a real snapshot id. + write.close(); + commit.close(); + FileStoreTable compactTable = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + StreamTableWrite compactWrite = compactTable.newWrite(commitUser); + StreamTableCommit compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(2, compactWrite.prepareCommit(true, 2)); + compactWrite.close(); + compactCommit.close(); + + List splitsAfterCompact = table.newSnapshotReader().read().dataSplits(); + for (DataSplit split : splitsAfterCompact) { + for (DataFileMeta file : split.dataFiles()) { + assertThat(file.minSequenceNumber()) + .as( + "Compacted file %s must carry a real snapshot id in" + + " minSequenceNumber (>= 0). A value of -1 means the" + + " partial-update merge result lost its snapshot id" + + " during compaction.", + file.fileName()) + .isGreaterThanOrEqualTo(0L); + } + } + + // Snapshot 4: write a fresh value of b — this snapshot must win. + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, 1, 999, null)); + commit.commit(3, write.prepareCommit(false, 3)); + + // Snapshot 5: another compaction using dedicated compact writer + write.close(); + commit.close(); + compactWrite = compactTable.newWrite(commitUser); + compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(4, compactWrite.prepareCommit(true, 4)); + compactWrite.close(); + compactCommit.close(); + for (DataSplit split : table.newSnapshotReader().read().dataSplits()) { + for (DataFileMeta file : split.dataFiles()) { + assertThat(file.minSequenceNumber()) + .as("Final compacted file %s minSequenceNumber", file.fileName()) + .isGreaterThanOrEqualTo(0L); + assertThat(file.maxSequenceNumber()) + .as("Final compacted file %s maxSequenceNumber", file.fileName()) + .isGreaterThanOrEqualTo(0L); + } + } + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> + r.getInt(0) + + "|" + + r.getInt(1) + + "|" + + (r.isNullAt(2) ? "null" : r.getInt(2)) + + "|" + + (r.isNullAt(3) ? "null" : r.getInt(3)); + List result = getResult(read, splits, toString); + // b=999 (snapshot 4 wins over snapshot 1's 100), c=200 (only snapshot 2 wrote it) + assertThat(result).containsExactly("1|1|999|200"); + + write.close(); + commit.close(); + } + + /** + * Regression: with snapshot-ordering on, an aggregate merge function must keep its result's + * {@code sequenceNumber} equal to the snapshot id carried by its inputs. Mirrors the + * partial-update regression — if the merged record loses the snapshot id, the compacted file's + * minSequenceNumber regresses to -1. + */ + @Test + public void testSnapshotSequenceOrderingAggregateCompactionPreservesSnapshotId() + throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true); + conf.set(CoreOptions.WRITE_ONLY, true); + conf.set(MERGE_ENGINE, AGGREGATE); + conf.set(BUCKET, 1); + conf.set("fields.b.aggregate-function", "sum"); + conf.set("fields.c.aggregate-function", "max"); + }, + rowType); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Snapshot 1 + write.write(GenericRow.of(1, 1, 10, 100)); + commit.commit(0, write.prepareCommit(false, 0)); + + // Snapshot 2: aggregates with snapshot 1's row. + write.write(GenericRow.of(1, 1, 20, 50)); + commit.commit(1, write.prepareCommit(false, 1)); + + // Snapshot 3: compact using dedicated compact writer + write.close(); + commit.close(); + FileStoreTable compactTable = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + StreamTableWrite compactWrite = compactTable.newWrite(commitUser); + StreamTableCommit compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(2, compactWrite.prepareCommit(true, 2)); + compactWrite.close(); + compactCommit.close(); + + for (DataSplit split : table.newSnapshotReader().read().dataSplits()) { + for (DataFileMeta file : split.dataFiles()) { + assertThat(file.minSequenceNumber()) + .as( + "Aggregate-compacted file %s must carry a real snapshot id in" + + " minSequenceNumber (>= 0). A value of -1 means the" + + " aggregate merge result lost its snapshot id during" + + " compaction.", + file.fileName()) + .isGreaterThanOrEqualTo(0L); + } + } + + // Snapshot 4: another insert that must aggregate on top of the compacted result. + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, 1, 5, 999)); + commit.commit(3, write.prepareCommit(false, 3)); + + // Snapshot 5: final compaction using dedicated compact writer + write.close(); + commit.close(); + compactWrite = compactTable.newWrite(commitUser); + compactCommit = compactTable.newCommit(commitUser); + compactWrite.compact(binaryRow(1), 0, true); + compactCommit.commit(4, compactWrite.prepareCommit(true, 4)); + compactWrite.close(); + compactCommit.close(); + for (DataSplit split : table.newSnapshotReader().read().dataSplits()) { + for (DataFileMeta file : split.dataFiles()) { + assertThat(file.minSequenceNumber()) + .as("Final compacted file %s minSequenceNumber", file.fileName()) + .isGreaterThanOrEqualTo(0L); + assertThat(file.maxSequenceNumber()) + .as("Final compacted file %s maxSequenceNumber", file.fileName()) + .isGreaterThanOrEqualTo(0L); + } + } + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newReadBuilder().newRead(); + Function toString = + r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getInt(2) + "|" + r.getInt(3); + List result = getResult(read, splits, toString); + // b = sum(10, 20, 5) = 35, c = max(100, 50, 999) = 999 + assertThat(result).containsExactly("1|1|35|999"); + + write.close(); + commit.close(); + } }