Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/docs/primary-key-table/sequence-rowkind.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,50 @@ By default, the primary key table determines the row kind according to the input
`'rowkind.field'` to use a field to extract row kind.

The valid row kind string should be `'+I'`, `'-U'`, `'+U'` or `'-D'`.

## Snapshot Ordering

In multi-writer scenarios where wall-clock sequence numbers cannot be globally ordered across writers,
you can enable `'sequence.snapshot-ordering'` to use the commit snapshot id as the ordering key when
merging records with the same primary key. Records from later snapshots are considered newer,
regardless of their per-record sequence number.

<Tabs>
<TabItem value="flink" label="Flink">

```sql
CREATE TABLE my_table (
pk BIGINT PRIMARY KEY NOT ENFORCED,
v1 DOUBLE,
v2 BIGINT
) WITH (
'sequence.snapshot-ordering' = 'true',
'write-only' = 'true'
);
```

</TabItem>
</Tabs>

:::warning
This option requires `'write-only' = 'true'`. Compaction must be performed by a separate dedicated
compact job. This ensures that compaction correctly preserves the snapshot id of each record.
:::

:::info
`'sequence.snapshot-ordering'` is mutually exclusive with `'sequence.field'`. You cannot enable both
on the same table.
:::

:::info
The ordering key is the commit snapshot id only; the order of records **within the same snapshot** is
not guaranteed, and this is by design. Under the default configuration it is harmless: a writer
buffers a commit's writes (`'write-buffer-spillable' = 'true'`) and runs them through the merge
function before flushing, so at most one record per primary key is written per snapshot — the common
case is fully covered. We therefore deliberately do not handle the case where the same key is spread
across multiple files of one snapshot. That case only arises with `'write-buffer-spillable' = 'false'`,
or when the spilled data exceeds `'write-buffer-spill-disk-size'`, where the buffer may be flushed
mid-commit; the same key can then land in multiple files of the same snapshot with equal sequence
numbers and their relative order becomes undefined. This affects only intra-snapshot order, never the
cross-snapshot ordering this feature provides.
:::
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,12 @@
<td><p>Enum</p></td>
<td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td>
</tr>
<tr>
<td><h5>sequence.snapshot-ordering</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When enabled, merge uses the commit snapshot id as the ordering key for primary-key conflicts: records from later snapshots always win. Designed for multi-writer scenarios on the same primary-key table where wall-clock sequence numbers cannot be globally ordered. The order of records within the same snapshot is not guaranteed. Mutually exclusive with sequence.field. Requires a primary-key table with write-only=true. Inline compaction is not allowed because snapshot ids are assigned only after commit. To compact such tables, run a dedicated compaction job/action with write-only=false.</td>
</tr>
<tr>
<td><h5>sink.process-time-zone</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
22 changes: 22 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,24 @@ public InlineElement getDescription() {
.defaultValue(SortOrder.ASCENDING)
.withDescription("Specify the order of sequence.field.");

@Immutable
public static final ConfigOption<Boolean> SEQUENCE_SNAPSHOT_ORDERING =
key("sequence.snapshot-ordering")
.booleanType()
.defaultValue(false)
.withDescription(
"When enabled, merge uses the commit snapshot id as the ordering key "
+ "for primary-key conflicts: records from later snapshots "
+ "always win. Designed for multi-writer scenarios on the same "
+ "primary-key table where wall-clock sequence numbers cannot "
+ "be globally ordered. The order of records within the same "
+ "snapshot is not guaranteed. Mutually exclusive with "
+ "sequence.field. Requires a primary-key table with "
+ "write-only=true. Inline compaction is not allowed because "
+ "snapshot ids are assigned only after commit. To compact such "
+ "tables, run a dedicated compaction job/action with "
+ "write-only=false.");

@Immutable
public static final ConfigOption<Boolean> AGGREGATION_REMOVE_RECORD_ON_DELETE =
key("aggregation.remove-record-on-delete")
Expand Down Expand Up @@ -3415,6 +3433,10 @@ public boolean sequenceFieldSortOrderIsAscending() {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}

public boolean snapshotSequenceOrdering() {
return options.get(SEQUENCE_SNAPSHOT_ORDERING);
}

public Optional<String> rowkindField() {
return options.getOptional(ROWKIND_FIELD);
}
Expand Down
5 changes: 5 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public long sequenceNumber() {
return sequenceNumber;
}

public KeyValue setSequenceNumber(long sequenceNumber) {
this.sequenceNumber = sequenceNumber;
return this;
}

public RowKind valueKind() {
return valueKind;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,15 @@ static DataFileMeta create(

SimpleStats valueStats();

/**
* Minimum sequence number of records in this file. When {@code sequence.snapshot-ordering} is
* enabled for a primary-key table, this field is repurposed to carry the commit snapshot id
* instead of the per-record sequence number range (the snapshot id is stamped into it at commit
* time by {@code FileStoreCommitImpl}).
*/
long minSequenceNumber();

/** @see #minSequenceNumber() */
long maxSequenceNumber();

long schemaId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,26 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
private final FileRecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
private final boolean overrideSequenceWithSnapshotId;
private final long snapshotId;

public KeyValueDataFileRecordReader(
FileRecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
this(reader, keyType, valueType, level, false, KeyValue.UNKNOWN_SEQUENCE);
}

public KeyValueDataFileRecordReader(
FileRecordReader<InternalRow> reader,
RowType keyType,
RowType valueType,
int level,
boolean overrideSequenceWithSnapshotId,
long snapshotId) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.overrideSequenceWithSnapshotId = overrideSequenceWithSnapshotId;
this.snapshotId = snapshotId;
}

@Nullable
Expand All @@ -53,10 +67,29 @@ public FileRecordIterator<KeyValue> readBatch() throws IOException {
}

return iterator.transform(
internalRow ->
internalRow == null
? null
: serializer.fromRow(internalRow).setLevel(level));
internalRow -> {
if (internalRow == null) {
return null;
}
KeyValue kv = serializer.fromRow(internalRow).setLevel(level);
// When snapshot-ordering is enabled, an APPEND file's per-record sequence
// numbers are replaced with the commit snapshot id so that later snapshots
// always win during merge. COMPACT files are left untouched: their records
// already carry the snapshot id in the per-record _SEQUENCE_NUMBER column.
//
// CAUTION: in this mode the per-record sequence number physically stored in
// an APPEND file is NOT trustworthy and must not be relied upon. The writer
// seeds its sequence counter from the file-level maxSequenceNumber (which has
// been stamped to the snapshot id), so the on-disk per-record values can be
// small and non-monotonic across snapshots. Correct ordering is established
// here, by this override; any future read path that bypasses this override
// would order APPEND records by the stale on-disk values and break snapshot
// ordering.
if (overrideSequenceWithSnapshotId) {
kv.setSequenceNumber(snapshotId);
}
return kv;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
Expand All @@ -42,6 +43,7 @@
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

Expand All @@ -68,6 +70,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
private final long asyncThreshold;
private final boolean ignoreCorruptFiles;
private final boolean ignoreLostFiles;
private final boolean snapshotSequenceOrdering;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
Expand All @@ -93,6 +96,7 @@ protected KeyValueFileReaderFactory(
this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering();
this.partition = partition;
this.formatReaderMappings = new HashMap<>();
this.dvFactory = dvFactory;
Expand Down Expand Up @@ -168,7 +172,28 @@ private FileRecordReader<KeyValue> createRecordReader(
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}

return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
// When snapshot-ordering is enabled, FileStoreCommitImpl stamps the commit snapshot id
// into minSequenceNumber for APPEND files. We override each record's sequence number with
// that snapshot id so that records from later snapshots always win during merge. COMPACT
// files are left untouched: their per-record _SEQUENCE_NUMBER already carries the snapshot
// id (transitively, the input records were read through this same override path), so the
// file-level minSequenceNumber already reflects the correct snapshot id range.
boolean overrideSequenceWithSnapshotId = false;
if (snapshotSequenceOrdering) {
Preconditions.checkState(
file.fileSource().isPresent(),
"sequence.snapshot-ordering requires data files with fileSource metadata. "
+ "This option is only safe for newly-created tables or empty tables. "
+ "Legacy files without fileSource cannot be ordered by commit snapshot id.");
overrideSequenceWithSnapshotId = file.fileSource().get() == FileSource.APPEND;
}
return new KeyValueDataFileRecordReader(
fileRecordReader,
keyType,
valueType,
file.level(),
overrideSequenceWithSnapshotId,
file.minSequenceNumber());
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
private boolean ignoreNumBucketCheck = false;
private final boolean compactOnly;

protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
Expand Down Expand Up @@ -123,6 +124,7 @@ protected AbstractFileStoreWrite(
this.tableName = tableName;
this.writerNumberMax = options.writeMaxWritersToSpill();
this.legacyPartitionName = options.legacyPartitionName();
this.compactOnly = options.snapshotSequenceOrdering() && !options.writeOnly();
this.partitionTimestampValidator =
PartitionTimestampValidator.create(options, partitionType);
}
Expand Down Expand Up @@ -167,6 +169,15 @@ public void withCompactExecutor(ExecutorService compactExecutor) {

@Override
public void write(BinaryRow partition, int bucket, T data) throws Exception {
if (compactOnly) {
throw new IllegalStateException(
CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key()
+ " is enabled with "
+ CoreOptions.WRITE_ONLY.key()
+ " = false. This writer only allows compaction, not data writes. "
+ "Inline compaction is not supported because snapshot ids are "
+ "assigned only after commit.");
}
WriterContainer<T> container = getWriterWrapper(partition, bucket);
container.writer.write(data);
if (container.dynamicBucketMaintainer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ CommitResult tryCommitOnce(
deltaFiles = assigned.assignedEntries;
}

if (options.snapshotSequenceOrdering()) {
deltaFiles = stampSequenceWithSnapshotId(newSnapshotId, commitKind, deltaFiles);
Comment thread
JunRuiLee marked this conversation as resolved.
}

// the added records subtract the deleted records from
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
Expand Down Expand Up @@ -1265,4 +1269,46 @@ public void close() {
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}

/**
* When {@code sequence.snapshot-ordering} is enabled, we stamp the commit snapshot id into
* {@link DataFileMeta#minSequenceNumber()} and {@link DataFileMeta#maxSequenceNumber()} at file
* level. This avoids adding a new field to DataFileMeta and follows the same pattern used by
* row-tracking tables (see {@link RowTrackingCommitUtils#assignRowTracking}). At read time,
* {@code KeyValueFileReaderFactory} reads the snapshot id from {@code minSequenceNumber} and
* overrides each record's {@code _SEQUENCE_NUMBER} with it, so the sort-merge readers compare
* by snapshot id directly and records from later snapshots always win.
*
* <p>For {@link CommitKind#COMPACT} commits, the compaction input records were read through the
* same override path, so their per-record {@code _SEQUENCE_NUMBER} already carries the snapshot
* id; the merged output keeps it, and the file-level min/maxSequenceNumber (tracked by the
* writer from per-record values) already reflects the correct snapshot id range. We return the
* files unchanged.
*
* <p>Because all records of one snapshot share a single snapshot id, the relative order of
* records <i>within</i> the same snapshot is intentionally not preserved. We deliberately do
* not handle the case where the same key lands in multiple files of one snapshot: the default
* writer is spillable ({@code write-buffer-spillable=true}), so a commit's writes pass through
* the merge function and collapse to at most one record per key before being flushed. That
* covers the common case. The degenerate case (same key in several files of one snapshot with
* equal sequence numbers) only arises with {@code write-buffer-spillable=false} or when spilled
* data exceeds {@code write-buffer-spill-disk-size}; it affects only intra-snapshot order and
* never the cross-snapshot ordering this feature targets, so it is accepted as a documented
* limitation.
*/
private static List<ManifestEntry> stampSequenceWithSnapshotId(
long snapshotId, CommitKind commitKind, List<ManifestEntry> files) {
if (commitKind == CommitKind.COMPACT) {
return files;
}
List<ManifestEntry> result = new ArrayList<>(files.size());
for (ManifestEntry entry : files) {
if (entry.kind() == FileKind.ADD) {
result.add(entry.assignSequenceNumber(snapshotId, snapshotId));
} else {
result.add(entry);
}
}
return result;
}
}
Loading
Loading