diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index dc4f1fedaf46..cc462d4bda74 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -867,7 +867,7 @@ All available procedures are listed below.
CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `scan_parallelism` => scan_parallelism, `sink_parallelism` => sink_parallelism)
- Rescale one partition of a table. Arguments:
+ Rescale one partition of a table. For partitioned tables, different partitions can have different bucket counts after rescaling. Arguments:
table: The target table identifier. Cannot be empty.
bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.
partition: What partition to rescale. For partitioned table this argument cannot be empty.
diff --git a/docs/docs/maintenance/rescale-bucket.md b/docs/docs/maintenance/rescale-bucket.md
index 1a304525345e..a814f28a60a2 100644
--- a/docs/docs/maintenance/rescale-bucket.md
+++ b/docs/docs/maintenance/rescale-bucket.md
@@ -45,14 +45,17 @@ Please note that
- `ALTER TABLE` only modifies the table's metadata and will **NOT** reorganize or reformat existing data.
Reorganize existing data must be achieved by `INSERT OVERWRITE`.
- Rescale bucket number does not influence the read and running write jobs.
-- Once the bucket number is changed, any newly scheduled `INSERT INTO` jobs which write to without-reorganized
- existing table/partition will throw a `TableException` with message like
+- **Partitioned tables** support per-partition bucket counts. Each partition retains its own bucket
+ count from its data files, and the new bucket count only applies to newly created partitions or partitions that
+ have been reorganized with `INSERT OVERWRITE`.
+- **Unpartitioned tables** require a full rescale before writing. If you change the bucket number and attempt
+ to write without reorganizing the data first, a `RuntimeException` will be thrown:
```text
- Try to write table/partition ... with a new bucket num ...,
+ Try to write table with a new bucket num ...,
but the previous bucket num is ... Please switch to batch mode,
and perform INSERT OVERWRITE to rescale current data layout first.
```
-- For partitioned table, it is possible to have different bucket number for different partitions. *E.g.*
+- For partitioned tables, it is possible to have different bucket numbers for different partitions. *E.g.*
```sql
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
@@ -62,7 +65,15 @@ Please note that
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;
```
+ After these operations, partition `dt=2022-01-01` uses 4 buckets, `dt=2022-01-02` uses 8 buckets, and any
+ new partitions will use the latest table-level default (8 buckets in this case).
- During overwrite period, make sure there are no other jobs writing the same table/partition.
+- **Streaming jobs must be restarted after rescaling a partition.** The per-partition bucket mapping
+ is loaded once when the streaming job starts (from the manifest files at that point in time). If a
+ partition is rescaled while the streaming job is running, the job will continue routing rows using
+ the old bucket count for that partition, which can cause rows to land in wrong buckets and lead to
+ data correctness issues. The recommended workflow is: suspend the streaming job with a savepoint →
+ perform the rescale overwrite → restart from the savepoint.
## Use Case
@@ -121,8 +132,12 @@ and the job's latency keeps increasing. To improve the data freshness, users can
-- scaling out
ALTER TABLE verified_orders SET ('bucket' = '32');
```
-- Switch to the batch mode and overwrite the current partition(s) to which the streaming job is writing
+- Use the `rescale` procedure or switch to batch mode and overwrite the partition(s) that need rescaling
```sql
+ -- Option 1: Use the rescale procedure (recommended)
+ CALL sys.rescale(`table` => 'default.verified_orders', `bucket_num` => 32, `partition` => 'dt=2022-06-22');
+
+ -- Option 2: Manual batch overwrite
SET 'execution.runtime-mode' = 'batch';
-- suppose today is 2022-06-22
-- case 1: there is no late event which updates the historical partitions, thus overwrite today's partition is enough
@@ -142,8 +157,11 @@ and the job's latency keeps increasing. To improve the data freshness, users can
FROM verified_orders
WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');
```
-- After overwrite job has finished, switch back to streaming mode. And now, the parallelism can be increased alongside with bucket number to restore the streaming job from the savepoint
-( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) )
+- After the overwrite job has finished, switch back to streaming mode. The parallelism can be increased alongside
+ the bucket number to restore the streaming job from the savepoint
+ ( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) ).
+ Note that for partitioned tables, each partition retains its own bucket count, so only the rescaled partitions
+ are affected.
```sql
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.savepoint.path' = ;
diff --git a/docs/docs/primary-key-table/data-distribution.md b/docs/docs/primary-key-table/data-distribution.md
index 95e8c01aa6c9..643a287c0c27 100644
--- a/docs/docs/primary-key-table/data-distribution.md
+++ b/docs/docs/primary-key-table/data-distribution.md
@@ -24,7 +24,7 @@ under the License.
# Data Distribution
-A bucket is the smallest storage unit for reads and writes, each bucket directory contains an [LSM tree](./#lsm-trees).
+A bucket is the smallest storage unit for reads and writes, each bucket directory contains an [LSM tree](./overview#lsm-trees).
## Fixed Bucket
@@ -34,6 +34,9 @@ the bucket of record.
Rescaling buckets can only be done through offline processes, see [Rescale Bucket](../maintenance/rescale-bucket).
A too large number of buckets leads to too many small files, and a too small number of buckets leads to poor write performance.
+For partitioned tables, each partition can have its own bucket count. After the table-level bucket number changes, existing
+partitions keep their bucket count until they are rescaled, while newly created partitions use the updated table-level default.
+
## Dynamic Bucket
Default mode for primary key table, or configure `'bucket' = '-1'`.
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index d162234cd58a..b34fa528f05b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -83,13 +83,27 @@ public int totalBuckets() {
}
public PartitionEntry merge(PartitionEntry entry) {
+ PartitionEntry newer = entry.lastFileCreationTime >= lastFileCreationTime ? entry : this;
+ PartitionEntry older = newer == entry ? this : entry;
+
+ // Use the totalBuckets from the most recently created file. This correctly handles
+ // the case where a partition has been overwritten with a different bucket count: the
+ // newer files carry the new totalBuckets, and their creation time is always later.
+ // When timestamps are equal (e.g., two files written in the same millisecond with
+ // different bucket counts), we take the larger totalBuckets value. This makes merge
+ // commutative and associative: a.merge(b) and b.merge(a) resolve the same totalBuckets.
+ int newTotalBuckets =
+ newer.lastFileCreationTime == older.lastFileCreationTime
+ ? Math.max(newer.totalBuckets, older.totalBuckets)
+ : newer.totalBuckets;
+
return new PartitionEntry(
partition,
recordCount + entry.recordCount,
fileSizeInBytes + entry.fileSizeInBytes,
fileCount + entry.fileCount,
- Math.max(lastFileCreationTime, entry.lastFileCreationTime),
- entry.totalBuckets);
+ newer.lastFileCreationTime,
+ newTotalBuckets);
}
public Partition toPartition(InternalRowPartitionComputer computer) {
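The tie-breaking rule in `merge` can be checked in isolation. The following is a minimal sketch, assuming a simplified stand-in class (`PartitionStats` is hypothetical and keeps only the fields that matter here); it is not the actual `PartitionEntry`, but it applies the same "newest file wins, larger count on a tie" rule.

```java
/** Hypothetical, simplified stand-in for PartitionEntry; only merge-relevant fields are kept. */
final class PartitionStats {
    final long recordCount;
    final long lastFileCreationTime;
    final int totalBuckets;

    PartitionStats(long recordCount, long lastFileCreationTime, int totalBuckets) {
        this.recordCount = recordCount;
        this.lastFileCreationTime = lastFileCreationTime;
        this.totalBuckets = totalBuckets;
    }

    /** Newest file decides totalBuckets; on equal timestamps the larger count is taken. */
    PartitionStats merge(PartitionStats other) {
        PartitionStats newer = other.lastFileCreationTime >= lastFileCreationTime ? other : this;
        PartitionStats older = newer == other ? this : other;
        int buckets =
                newer.lastFileCreationTime == older.lastFileCreationTime
                        ? Math.max(newer.totalBuckets, older.totalBuckets)
                        : newer.totalBuckets;
        return new PartitionStats(
                recordCount + other.recordCount, newer.lastFileCreationTime, buckets);
    }

    public static void main(String[] args) {
        PartitionStats before = new PartitionStats(10, 1000L, 4); // files written with 4 buckets
        PartitionStats after = new PartitionStats(5, 2000L, 8); // overwritten later with 8 buckets
        // Merge order does not matter: both directions report the newer bucket count.
        System.out.println(before.merge(after).totalBuckets);
        System.out.println(after.merge(before).totalBuckets);
    }
}
```

Seen this way, the rule is a lexicographic maximum over (creation time, bucket count), which is why the merge order does not affect the result.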
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index a49fff06d0ce..66a6ed76ce72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -508,20 +508,28 @@ private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) {
totalBuckets = restoredTotalBuckets;
}
if (!ignoreNumBucketCheck && totalBuckets != numBuckets) {
- String partInfo =
- partitionType.getFieldCount() > 0
- ? "partition "
- + getPartitionComputer(
- partitionType,
- PARTITION_DEFAULT_NAME.defaultValue(),
- legacyPartitionName)
- .generatePartValues(partition)
- : "table";
- throw new RuntimeException(
- String.format(
- "Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
- + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
- partInfo, numBuckets, totalBuckets));
+ if (partitionType.getFieldCount() > 0) {
+ // For partitioned tables, allow per-partition bucket counts.
+ // The partition's existing bucket count takes precedence over the
+ // table-level default. This supports rescale operations where different
+ // partitions may have different bucket counts.
+ LOG.info(
+ "Partition {} uses {} buckets (table default: {}). "
+ + "Accepting per-partition bucket count.",
+ getPartitionComputer(
+ partitionType,
+ PARTITION_DEFAULT_NAME.defaultValue(),
+ legacyPartitionName)
+ .generatePartValues(partition),
+ totalBuckets,
+ numBuckets);
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "Try to write table with a new bucket num %d, but the previous bucket num is %d. "
+ + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
+ numBuckets, totalBuckets));
+ }
}
return restored;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
index e7faf2a24569..3548123bcd68 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
@@ -25,9 +25,9 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.utils.SnapshotManager;
-import java.util.ArrayList;
import java.util.List;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -38,6 +38,7 @@ public class FileSystemWriteRestore implements WriteRestore {
private final SnapshotManager snapshotManager;
private final FileStoreScan scan;
private final IndexFileHandler indexFileHandler;
+ private final PartitionBucketMapping partitionBucketMapping;
public FileSystemWriteRestore(
CoreOptions options,
@@ -52,6 +53,7 @@ public FileSystemWriteRestore(
this.scan.dropStats();
}
}
+ this.partitionBucketMapping = PartitionBucketMapping.loadFromScan(scan, options.bucket());
}
@Override
@@ -75,10 +77,12 @@ public RestoreFiles restoreFiles(
return RestoreFiles.empty();
}
- List<DataFileMeta> restoreFiles = new ArrayList<>();
List<ManifestEntry> entries =
scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files();
- Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
+ List<DataFileMeta> restoreFiles = WriteRestore.extractDataFiles(entries);
+
+ Integer totalBuckets =
+ WriteRestore.extractTotalBuckets(entries, partition, partitionBucketMapping);
IndexFileMeta dynamicBucketIndex = null;
if (scanDynamicBucketIndex) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java
index 5d4e335571d1..b5b963f62e7d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java
@@ -21,9 +21,11 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
/** Restore for write to restore data files by partition and bucket from file system. */
@@ -37,9 +39,44 @@ RestoreFiles restoreFiles(
boolean scanDynamicBucketIndex,
boolean scanDeleteVectorsIndex);
+ /**
+ * Resolves the {@code totalBuckets} for a (partition, bucket) pair given the manifest entries
+ * for that bucket and the table's partition-bucket mapping.
+ *
+ * <ul>
+ * <li>Non-empty bucket: use the value stamped on the existing data files so that
+ * committer-side bucket-count mismatch detection (e.g. rescale-without-overwrite) still
+ * fires.
+ * <li>Empty bucket on a partitioned table: look up the per-partition override in {@code
+ * mapping}; returns {@code null} if the partition uses the table default.
+ * <li>Empty bucket on an unpartitioned table: returns {@code null} so the write path falls
+ * back to {@code numBuckets} and the committer-side check still fires.
+ * </ul>
+ */
@Nullable
- static Integer extractDataFiles(List<ManifestEntry> entries, List<DataFileMeta> dataFiles) {
+ static Integer extractTotalBuckets(
+ List<ManifestEntry> entries, BinaryRow partition, PartitionBucketMapping mapping) {
+ if (!entries.isEmpty()) {
+ return entries.get(0).totalBuckets();
+ }
+ if (partition.getFieldCount() > 0) {
+ return mapping.resolveNumBuckets(partition);
+ }
+ return null;
+ }
+
+ /**
+ * Extracts the {@link DataFileMeta} list from the given manifest entries, validating that all
+ * entries agree on {@code totalBuckets}.
+ *
+ * @param entries manifest entries for a single (partition, bucket) pair
+ * @return the list of data files; empty if {@code entries} is empty
+ * @throws RuntimeException if entries carry inconsistent {@code totalBuckets} values, which
+ * indicates a corrupted manifest
+ */
+ static List<DataFileMeta> extractDataFiles(List<ManifestEntry> entries) {
Integer totalBuckets = null;
+ List<DataFileMeta> dataFiles = new ArrayList<>();
for (ManifestEntry entry : entries) {
if (totalBuckets != null && totalBuckets != entry.totalBuckets()) {
throw new RuntimeException(
@@ -50,6 +87,6 @@ static Integer extractDataFiles(List entries, List
totalBuckets = entry.totalBuckets();
dataFiles.add(entry.file());
}
- return totalBuckets;
+ return dataFiles;
}
}
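The three cases listed in the `extractTotalBuckets` Javadoc can be modelled with plain collections. This is only a sketch under stated assumptions: `TotalBucketsLookup`, the `String` partition key, and the `Map` of overrides are hypothetical stand-ins for `ManifestEntry`, `BinaryRow`, and `PartitionBucketMapping`, and a `null` result follows the Javadoc's meaning of "fall back to the table-level bucket number".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the lookup order; names and types are illustrative, not Paimon's. */
final class TotalBucketsLookup {

    /**
     * @param bucketCountsOnFiles totalBuckets stamped on each existing file of the bucket
     * @param partition partition name, or null for an unpartitioned table
     * @param overrides per-partition bucket counts (assumed to be loaded once at job start)
     * @return the resolved count, or null meaning "use the table-level bucket number"
     */
    static Integer resolve(
            List<Integer> bucketCountsOnFiles, String partition, Map<String, Integer> overrides) {
        if (!bucketCountsOnFiles.isEmpty()) {
            // Case 1: existing files win, so a mismatch with the table option is still detectable.
            return bucketCountsOnFiles.get(0);
        }
        if (partition != null) {
            // Case 2: empty bucket in a partitioned table, consult the per-partition override.
            return overrides.get(partition);
        }
        // Case 3: empty bucket in an unpartitioned table.
        return null;
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("dt=2022-01-01", 4);
        System.out.println(resolve(Arrays.asList(8, 8), "dt=2022-01-01", overrides));
        System.out.println(resolve(Collections.<Integer>emptyList(), "dt=2022-01-01", overrides));
        System.out.println(resolve(Collections.<Integer>emptyList(), null, overrides));
    }
}
```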
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 35dfd308df0a..831bfb44eedf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -27,6 +27,7 @@
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
@@ -258,6 +259,13 @@ public TableWriteImpl> newWrite(String commitUser, @Nullable Integer writeId)
return wrapped.newWrite(commitUser, writeId);
}
+ @Override
+ public TableWriteImpl<?> newWrite(
+ String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newWrite(commitUser, writeId, rowKeyExtractor);
+ }
+
@Override
public TableCommitImpl newCommit(String commitUser) {
privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index df4224915927..03f73e8a5a34 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -40,6 +40,7 @@
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketWriteSelector;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowKindGenerator;
@@ -230,7 +231,9 @@ public Optional statistics() {
public Optional newWriteSelector() {
switch (bucketMode()) {
case HASH_FIXED:
- return Optional.of(new FixedBucketWriteSelector(schema()));
+ return Optional.of(
+ new FixedBucketWriteSelector(
+ schema(), PartitionBucketMapping.loadFromTable(this)));
case BUCKET_UNAWARE:
case POSTPONE_MODE:
return Optional.empty();
@@ -258,7 +261,8 @@ protected CatalogEnvironment newCatalogEnvironment(String branch) {
public RowKeyExtractor createRowKeyExtractor() {
switch (bucketMode()) {
case HASH_FIXED:
- return new FixedBucketRowKeyExtractor(schema());
+ return new FixedBucketRowKeyExtractor(
+ schema(), PartitionBucketMapping.loadFromTable(this));
case HASH_DYNAMIC:
case KEY_DYNAMIC:
return new DynamicBucketRowKeyExtractor(schema());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index d65c84fd5e65..f2f5280c3211 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -29,6 +29,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
import org.apache.paimon.table.source.AppendTableRead;
@@ -140,11 +141,17 @@ public TableWriteImpl newWrite(String commitUser) {
@Override
public TableWriteImpl newWrite(String commitUser, @Nullable Integer writeId) {
+ return newWrite(commitUser, writeId, createRowKeyExtractor());
+ }
+
+ @Override
+ public TableWriteImpl newWrite(
+ String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
BaseAppendFileStoreWrite writer = store().newWrite(commitUser, writeId);
return new TableWriteImpl<>(
rowType(),
writer,
- createRowKeyExtractor(),
+ rowKeyExtractor,
(record, rowKind) -> {
Preconditions.checkState(
rowKind.isAdd(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 7ba4bc20a9d7..76f222719db3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -340,6 +340,12 @@ public TableWriteImpl> newWrite(String commitUser, @Nullable Integer writeId)
return wrapped.newWrite(commitUser, writeId);
}
+ @Override
+ public TableWriteImpl<?> newWrite(
+ String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
+ return wrapped.newWrite(commitUser, writeId, rowKeyExtractor);
+ }
+
@Override
public TableCommitImpl newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index b07465a25828..0b284bf5dd4d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -121,6 +121,13 @@ default Optional comment() {
TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId);
+ /**
+ * Create a new write with a custom {@link RowKeyExtractor}. This is useful for scenarios like
+ * rescaling where the bucket assignment logic needs to be overridden.
+ */
+ TableWriteImpl<?> newWrite(
+ String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor);
+
@Override
TableCommitImpl newCommit(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index a2fee49bfb88..b68e5e48c8f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -32,6 +32,7 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
@@ -157,11 +158,18 @@ public TableWriteImpl newWrite(String commitUser) {
@Override
public TableWriteImpl newWrite(String commitUser, @Nullable Integer writeId) {
+ return newWrite(commitUser, writeId, createRowKeyExtractor());
+ }
+
+ @Override
+ public TableWriteImpl newWrite(
+ String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
+
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
rowType(),
store().newWrite(commitUser, writeId),
- createRowKeyExtractor(),
+ rowKeyExtractor,
(record, rowKind) ->
kv.replace(
record.primaryKey(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/SchemaBucketFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/SchemaBucketFileStoreTable.java
new file mode 100644
index 000000000000..e730c8ba505e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/SchemaBucketFileStoreTable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
+import org.apache.paimon.table.sink.FixedBucketWriteSelector;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.sink.WriteSelector;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A {@link FileStoreTable} wrapper that uses the schema's number of buckets to assign writes
+ * instead of the number of buckets recorded for each partition. Useful for postpone buckets,
+ * overrides and rescales.
+ */
+public class SchemaBucketFileStoreTable extends DelegatedFileStoreTable {
+
+ public SchemaBucketFileStoreTable(FileStoreTable wrapped) {
+ super(wrapped);
+ }
+
+ @Override
+ public Optional<WriteSelector> newWriteSelector() {
+ return Optional.of(
+ new FixedBucketWriteSelector(
+ schema(), new PartitionBucketMapping(schema().numBuckets())));
+ }
+
+ @Override
+ public RowKeyExtractor createRowKeyExtractor() {
+ return new FixedBucketRowKeyExtractor(
+ schema(), new PartitionBucketMapping(schema().numBuckets()));
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(String commitUser) {
+ return newWrite(commitUser, null);
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId) {
+ return wrapped().newWrite(commitUser, writeId, createRowKeyExtractor());
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(
+ String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
+ // Always use the schema-bucket-based extractor; ignore the caller-supplied extractor
+ // to ensure consistent schema-level bucket routing even when called via the 3-arg form.
+ return wrapped().newWrite(commitUser, writeId, createRowKeyExtractor());
+ }
+
+ @Override
+ public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ return new SchemaBucketFileStoreTable(wrapped().copy(dynamicOptions));
+ }
+
+ @Override
+ public FileStoreTable copy(TableSchema newTableSchema) {
+ return new SchemaBucketFileStoreTable(wrapped().copy(newTableSchema));
+ }
+
+ @Override
+ public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
+ return new SchemaBucketFileStoreTable(wrapped().copyWithoutTimeTravel(dynamicOptions));
+ }
+
+ @Override
+ public FileStoreTable copyWithLatestSchema() {
+ return new SchemaBucketFileStoreTable(wrapped().copyWithLatestSchema());
+ }
+
+ @Override
+ public FileStoreTable switchToBranch(String branchName) {
+ return new SchemaBucketFileStoreTable(wrapped().switchToBranch(branchName));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
index 146a45b43713..aaf677eb48b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
@@ -29,24 +29,27 @@
/** {@link KeyAndBucketExtractor} for {@link InternalRow}. */
public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
- private final int numBuckets;
+ private transient Projection bucketKeyProjection;
+
private final boolean sameBucketKeyAndTrimmedPrimaryKey;
- private final Projection bucketKeyProjection;
+ private final PartitionBucketMapping partitionBucketMapping;
private BinaryRow reuseBucketKey;
private Integer reuseBucket;
private final BucketFunction bucketFunction;
- public FixedBucketRowKeyExtractor(TableSchema schema) {
+ public FixedBucketRowKeyExtractor(
+ TableSchema schema, PartitionBucketMapping partitionBucketMapping) {
super(schema);
- numBuckets = new CoreOptions(schema.options()).bucket();
- bucketFunction =
- BucketFunction.create(
- new CoreOptions(schema.options()), schema.logicalBucketKeyType());
- sameBucketKeyAndTrimmedPrimaryKey = schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
- bucketKeyProjection =
- CodeGenUtils.newProjection(
- schema.logicalRowType(), schema.projection(schema.bucketKeys()));
+ this.bucketFunction = createBucketFunction(schema);
+ this.sameBucketKeyAndTrimmedPrimaryKey =
+ schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
+ this.partitionBucketMapping = partitionBucketMapping;
+ }
+
+ private static BucketFunction createBucketFunction(TableSchema schema) {
+ return BucketFunction.create(
+ new CoreOptions(schema.options()), schema.logicalBucketKeyType());
}
@Override
@@ -62,7 +65,7 @@ private BinaryRow bucketKey() {
}
if (reuseBucketKey == null) {
- reuseBucketKey = bucketKeyProjection.apply(record);
+ reuseBucketKey = bucketKeyProjection().apply(record);
}
return reuseBucketKey;
}
@@ -70,6 +73,7 @@ private BinaryRow bucketKey() {
@Override
public int bucket() {
if (reuseBucket == null) {
+ int numBuckets = partitionBucketMapping.resolveNumBuckets(partition());
reuseBucket = bucket(numBuckets);
}
return reuseBucket;
@@ -78,4 +82,13 @@ public int bucket() {
public int bucket(int numBuckets) {
return bucketFunction.bucket(bucketKey(), numBuckets);
}
+
+ private Projection bucketKeyProjection() {
+ if (bucketKeyProjection == null) {
+ bucketKeyProjection =
+ CodeGenUtils.newProjection(
+ schema.logicalRowType(), schema.projection(schema.bucketKeys()));
+ }
+ return bucketKeyProjection;
+ }
}
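The effect of resolving `numBuckets` per partition inside `bucket()` can be illustrated with a small model. This sketch makes two assumptions that are not taken from the patch: the partition is identified by a plain `String`, and the bucket is computed as a non-negative modulo of a key hash (the real `BucketFunction` may differ). `PerPartitionRouting` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-partition bucket routing with a table-level fallback. */
final class PerPartitionRouting {
    private final int defaultBucketCount;
    private final Map<String, Integer> partitionBucketMap;

    PerPartitionRouting(int defaultBucketCount, Map<String, Integer> partitionBucketMap) {
        this.defaultBucketCount = defaultBucketCount;
        this.partitionBucketMap = partitionBucketMap;
    }

    /** Partitions without an explicit entry fall back to the default bucket count. */
    int resolveNumBuckets(String partition) {
        return partitionBucketMap.getOrDefault(partition, defaultBucketCount);
    }

    /** Assumed bucket function: non-negative modulo of the key hash. */
    int bucket(String partition, int keyHash) {
        return Math.floorMod(keyHash, resolveNumBuckets(partition));
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("dt=2022-01-01", 4); // partition still laid out with 4 buckets
        PerPartitionRouting routing = new PerPartitionRouting(8, overrides);

        // The same key hash lands in different buckets depending on the partition's layout.
        System.out.println(routing.bucket("dt=2022-01-01", 6)); // 2
        System.out.println(routing.bucket("dt=2022-01-02", 6)); // 6
    }
}
```

Under this model, a mapping that is stale for a partition routes rows with the old modulus, which is the scenario the documentation warns about for streaming jobs that keep running across a rescale.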
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketWriteSelector.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketWriteSelector.java
index e08841dd8cd3..a53bd7efb40c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketWriteSelector.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketWriteSelector.java
@@ -28,17 +28,20 @@ public class FixedBucketWriteSelector implements WriteSelector {
private static final long serialVersionUID = 1L;
private final TableSchema schema;
+ private final PartitionBucketMapping partitionBucketMapping;
private transient KeyAndBucketExtractor extractor;
- public FixedBucketWriteSelector(TableSchema schema) {
+ public FixedBucketWriteSelector(
+ TableSchema schema, PartitionBucketMapping partitionBucketMapping) {
this.schema = schema;
+ this.partitionBucketMapping = partitionBucketMapping;
}
@Override
public int select(InternalRow row, int numWriters) {
if (extractor == null) {
- extractor = new FixedBucketRowKeyExtractor(schema);
+ extractor = new FixedBucketRowKeyExtractor(schema, partitionBucketMapping);
}
extractor.setRecord(row);
return ChannelComputer.select(extractor.partition(), extractor.bucket(), numWriters);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionBucketMapping.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionBucketMapping.java
new file mode 100644
index 000000000000..5e39c29a998c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionBucketMapping.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A mapping that resolves the number of buckets for each partition in a table.
+ *
+ * Different partitions may have different bucket counts (e.g., after a rescale operation). This
+ * class maintains a per-partition bucket count mapping and falls back to a default bucket count for
+ * partitions that are not explicitly mapped.
+ *
+ * This is used by components such as {@link FixedBucketRowKeyExtractor} and {@link
+ * FixedBucketWriteSelector} to correctly determine the bucket assignment for rows in tables where
+ * partitions may have been rescaled independently.
+ *
+ * @see #loadFromTable(FileStoreTable)
+ * @see #resolveNumBuckets(BinaryRow)
+ */
+public class PartitionBucketMapping implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The default number of buckets, used when a partition has no explicit mapping. */
+ private final int defaultBucketCount;
+
+ /** A map from partition to its specific bucket count. May be empty but never {@code null}. */
+ private final Map<BinaryRow, Integer> partitionBucketMap;
+
+ /**
+ * Creates a mapping with only a default bucket count and no per-partition overrides.
+ *
+ * @param defaultBucketCount the default number of buckets for all partitions
+ */
+ public PartitionBucketMapping(int defaultBucketCount) {
+ this(defaultBucketCount, Collections.emptyMap());
+ }
+
+ /**
+ * Creates a mapping with a default bucket count and an explicit per-partition bucket map.
+ *
+ * @param defaultBucketCount the default number of buckets, used as a fallback
+ * @param partitionBucketMap a map from partition (as {@link BinaryRow}) to its bucket count
+ */
+ public PartitionBucketMapping(
+ int defaultBucketCount, Map<BinaryRow, Integer> partitionBucketMap) {
+ this.defaultBucketCount = defaultBucketCount;
+ this.partitionBucketMap = partitionBucketMap;
+ }
+
+ /**
+ * Loads a {@link PartitionBucketMapping} by scanning the manifest entries of the given table.
+ *
+ * <p>For non-partitioned tables, this returns a mapping with only the schema-defined default
+ * bucket count and an empty partition map.
+ *
+ * <p>For partitioned tables, the method reads {@link
+ * org.apache.paimon.manifest.PartitionEntry}s, which aggregate manifest entries per partition
+ * during the scan and therefore have a much smaller memory footprint than loading all data file
+ * entries. Any scan failure is propagated to the caller.
+ *
+ * @param table the {@link FileStoreTable} to load the mapping from
+ * @return a {@link PartitionBucketMapping} reflecting the current bucket layout of the table
+ */
+ public static PartitionBucketMapping loadFromTable(FileStoreTable table) {
+ int defaultBuckets = table.schema().numBuckets();
+ if (table.partitionKeys().isEmpty()) {
+ return new PartitionBucketMapping(defaultBuckets, Collections.emptyMap());
+ }
+ return loadFromScan(table.store().newScan(), defaultBuckets);
+ }
+
+ public static PartitionBucketMapping loadFromScan(FileStoreScan scan, int defaultBuckets) {
+ if (scan == null) {
+ return new PartitionBucketMapping(defaultBuckets, Collections.emptyMap());
+ }
+ List<PartitionEntry> partitionEntries = scan.readPartitionEntries();
+ Map<BinaryRow, Integer> partitionBucketMap = new HashMap<>();
+ for (PartitionEntry entry : partitionEntries) {
+ int totalBuckets = entry.totalBuckets();
+ // Only store partitions whose bucket count differs from the default.
+ // This keeps the map empty for partitions that have never been rescaled,
+ // avoiding per-partition BinaryRow copies and Integer allocations entirely.
+ if (totalBuckets > 0 && totalBuckets != defaultBuckets) {
+ partitionBucketMap.put(entry.partition().copy(), totalBuckets);
+ }
+ }
+ return new PartitionBucketMapping(defaultBuckets, partitionBucketMap);
+ }
+
+ /**
+ * Resolves the number of buckets for the given partition.
+ *
+ * <p>If the partition has an explicit entry in the partition-to-bucket map, that value is
+ * returned. Otherwise, the default bucket count is returned.
+ *
+ * @param partition the partition key as a {@link BinaryRow}
+ * @return the number of buckets for the given partition
+ */
+ public int resolveNumBuckets(BinaryRow partition) {
+ if (partitionBucketMap != null) {
+ Integer partitionBucketCount = partitionBucketMap.get(partition);
+ if (partitionBucketCount != null) {
+ return partitionBucketCount;
+ }
+ }
+ return defaultBucketCount;
+ }
+}
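Reviewer sketch (not part of the patch): the sparse-map idea in `loadFromScan` and the fallback in `resolveNumBuckets` can be illustrated in isolation. The class below is a hypothetical stand-in, not the Paimon class: it uses a `String` partition key instead of `BinaryRow`, and `fromObservedCounts` and `overrideCount` are names invented for illustration. It keeps only positive bucket counts that differ from the default, so partitions that were never rescaled cost no map entry.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for PartitionBucketMapping; a String key replaces BinaryRow. */
final class PartitionBucketSketch {

    private final int defaultBucketCount;
    private final Map<String, Integer> overrides;

    PartitionBucketSketch(int defaultBucketCount, Map<String, Integer> overrides) {
        this.defaultBucketCount = defaultBucketCount;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.overrides = new HashMap<>(overrides);
    }

    /** Builds the sparse map: only positive counts that differ from the default are stored. */
    static PartitionBucketSketch fromObservedCounts(
            int defaultBuckets, Map<String, Integer> observed) {
        Map<String, Integer> sparse = new HashMap<>();
        for (Map.Entry<String, Integer> e : observed.entrySet()) {
            int totalBuckets = e.getValue();
            if (totalBuckets > 0 && totalBuckets != defaultBuckets) {
                sparse.put(e.getKey(), totalBuckets);
            }
        }
        return new PartitionBucketSketch(defaultBuckets, sparse);
    }

    /** Returns the per-partition override if present, otherwise the default. */
    int resolveNumBuckets(String partition) {
        return overrides.getOrDefault(partition, defaultBucketCount);
    }

    /** Number of partitions that carry an explicit override. */
    int overrideCount() {
        return overrides.size();
    }
}
```

With a default of 32 and observed counts `{pt=1: 4, pt=2: 32}`, only `pt=1` is stored; `pt=2` and any unseen partition resolve to 32 through the fallback.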
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKeyExtractor.java
index 455aaa4aa5e9..697734ca10b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKeyExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKeyExtractor.java
@@ -22,18 +22,23 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
+import java.io.Serializable;
+
/** {@link KeyAndBucketExtractor} for {@link InternalRow}. */
-public abstract class RowKeyExtractor implements KeyAndBucketExtractor {
+public abstract class RowKeyExtractor implements KeyAndBucketExtractor, Serializable {
+
+ private static final long serialVersionUID = 1L;
- private final RowPartitionKeyExtractor partitionKeyExtractor;
+ private transient RowPartitionKeyExtractor partitionKeyExtractor;
+ protected final TableSchema schema;
protected InternalRow record;
private BinaryRow partition;
private BinaryRow trimmedPrimaryKey;
public RowKeyExtractor(TableSchema schema) {
- this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+ this.schema = schema;
}
@Override
@@ -46,7 +51,7 @@ public void setRecord(InternalRow record) {
@Override
public BinaryRow partition() {
if (partition == null) {
- partition = partitionKeyExtractor.partition(record);
+ partition = partitionKeyExtractor().partition(record);
}
return partition;
}
@@ -54,8 +59,15 @@ public BinaryRow partition() {
@Override
public BinaryRow trimmedPrimaryKey() {
if (trimmedPrimaryKey == null) {
- trimmedPrimaryKey = partitionKeyExtractor.trimmedPrimaryKey(record);
+ trimmedPrimaryKey = partitionKeyExtractor().trimmedPrimaryKey(record);
}
return trimmedPrimaryKey;
}
+
+ private RowPartitionKeyExtractor partitionKeyExtractor() {
+ if (partitionKeyExtractor == null) {
+ partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+ }
+ return partitionKeyExtractor;
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/PartitionEntryTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/PartitionEntryTest.java
new file mode 100644
index 000000000000..8cc07f65d024
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/PartitionEntryTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PartitionEntry#merge(PartitionEntry)}. */
+public class PartitionEntryTest {
+
+ private static final BinaryRow PARTITION = BinaryRow.EMPTY_ROW;
+
+ /**
+ * Creates a PartitionEntry with the given fileCount, totalBuckets, and creation time.
+ * recordCount and fileSizeInBytes are set to fileCount for simplicity.
+ */
+ private static PartitionEntry entry(long fileCount, int totalBuckets, long creationTime) {
+ return new PartitionEntry(
+ PARTITION, fileCount, fileCount, fileCount, creationTime, totalBuckets);
+ }
+
+ // -------------------------------------------------------------------------
+ // Tests for totalBuckets selection based on lastFileCreationTime
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testMergeTakesTotalBucketsFromNewerEntry() {
+ // Old files (2 buckets, earlier creation time) merged with new files (4 buckets, later).
+ // totalBuckets should come from the newer entry.
+ PartitionEntry old = entry(3, 2, 1000L);
+ PartitionEntry newer = entry(3, 4, 2000L);
+
+ PartitionEntry result = old.merge(newer);
+ assertThat(result.totalBuckets()).isEqualTo(4);
+ assertThat(result.lastFileCreationTime()).isEqualTo(2000L);
+ assertThat(result.fileCount()).isEqualTo(6);
+ }
+
+ @Test
+ public void testMergeOrderDoesNotAffectTotalBuckets() {
+ // Regardless of whether old.merge(newer) or newer.merge(old) is called,
+ // the result must always take totalBuckets from the entry with the later creation time.
+ PartitionEntry old = entry(3, 2, 1000L);
+ PartitionEntry newer = entry(3, 4, 2000L);
+
+ PartitionEntry result1 = old.merge(newer);
+ PartitionEntry result2 = newer.merge(old);
+
+ assertThat(result1.totalBuckets()).isEqualTo(4);
+ assertThat(result2.totalBuckets()).isEqualTo(4);
+ assertThat(result1.lastFileCreationTime()).isEqualTo(2000L);
+ assertThat(result2.lastFileCreationTime()).isEqualTo(2000L);
+ }
+
+ @Test
+ public void testMergeWithDeleteEntryPreservesNewerTotalBuckets() {
+ // Simulates the scenario after INSERT OVERWRITE with rescale:
+ // - original ADD entries (2 buckets, time=1000) still present in base manifest
+ // - DELETE entries for old files (2 buckets, time=1000) in delta manifest
+ // - new ADD entries (4 buckets, time=2000) in delta manifest
+ //
+ // The merged entry should have totalBuckets=4 (from the newest files).
+ PartitionEntry originalAdd = entry(3, 2, 1000L); // original ADD (base manifest)
+ PartitionEntry deleteOld = entry(-3, 2, 1000L); // DELETE old files (same timestamp)
+ PartitionEntry newAdd = entry(3, 4, 2000L); // new ADD after overwrite
+
+ // Simulate concurrent processing in any order (all 6 permutations produce same result)
+ PartitionEntry r1 = originalAdd.merge(deleteOld).merge(newAdd);
+ PartitionEntry r2 = originalAdd.merge(newAdd).merge(deleteOld);
+ PartitionEntry r3 = deleteOld.merge(originalAdd).merge(newAdd);
+ PartitionEntry r4 = deleteOld.merge(newAdd).merge(originalAdd);
+ PartitionEntry r5 = newAdd.merge(originalAdd).merge(deleteOld);
+ PartitionEntry r6 = newAdd.merge(deleteOld).merge(originalAdd);
+
+ for (PartitionEntry r : new PartitionEntry[] {r1, r2, r3, r4, r5, r6}) {
+ assertThat(r.totalBuckets())
+ .as("totalBuckets should be 4 regardless of merge order")
+ .isEqualTo(4);
+ assertThat(r.fileCount())
+ .as("net fileCount should be 3 (only the 3 new files remain visible)")
+ .isEqualTo(3);
+ assertThat(r.lastFileCreationTime()).isEqualTo(2000L);
+ }
+ }
+
+ @Test
+ public void testMergeWithEqualCreationTimeIsCommutative() {
+ // When creation times are equal, merge must be commutative: a.merge(b) == b.merge(a).
+ // The tie-break takes the larger totalBuckets so that the parallel, non-deterministic
+ // aggregation in readPartitionEntries() always produces the same result regardless of
+ // manifest processing order.
+ PartitionEntry a = entry(1, 2, 1000L);
+ PartitionEntry b = entry(1, 4, 1000L);
+
+ PartitionEntry ab = a.merge(b);
+ PartitionEntry ba = b.merge(a);
+
+ assertThat(ab.totalBuckets()).isEqualTo(4); // max(2, 4) = 4
+ assertThat(ba.totalBuckets()).isEqualTo(4); // max(4, 2) = 4, commutative
+ assertThat(ab.fileCount()).isEqualTo(2);
+ assertThat(ba.fileCount()).isEqualTo(2);
+ }
+
+ @Test
+ public void testMergeAggregatesCountsCorrectly() {
+ PartitionEntry a = entry(5, 4, 1000L);
+ PartitionEntry b = entry(3, 4, 2000L);
+
+ PartitionEntry result = a.merge(b);
+ assertThat(result.fileCount()).isEqualTo(8);
+ assertThat(result.recordCount()).isEqualTo(8);
+ assertThat(result.fileSizeInBytes()).isEqualTo(8);
+ assertThat(result.totalBuckets()).isEqualTo(4);
+ assertThat(result.lastFileCreationTime()).isEqualTo(2000L);
+ }
+}
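Reviewer sketch (not part of the patch): `PartitionEntry.merge` itself is not shown in this section, so the class below is only an inference from the assertions above, written as a hypothetical `PartitionStatsSketch`. It assumes counts are summed, the later creation time wins for `totalBuckets`, and a tie on creation time takes the larger bucket count. Choosing the maximum of the pair (creation time, bucket count) makes the merge commutative and associative, which is what the permutation test relies on.

```java
/** Hypothetical model of the merge rule the tests above expect; not the Paimon class. */
final class PartitionStatsSketch {

    final long fileCount;
    final int totalBuckets;
    final long lastFileCreationTime;

    PartitionStatsSketch(long fileCount, int totalBuckets, long lastFileCreationTime) {
        this.fileCount = fileCount;
        this.totalBuckets = totalBuckets;
        this.lastFileCreationTime = lastFileCreationTime;
    }

    PartitionStatsSketch merge(PartitionStatsSketch other) {
        int buckets;
        if (lastFileCreationTime > other.lastFileCreationTime) {
            // This entry holds the newer files, so its bucket count wins.
            buckets = totalBuckets;
        } else if (lastFileCreationTime < other.lastFileCreationTime) {
            buckets = other.totalBuckets;
        } else {
            // Tie on creation time: take the larger count so merge order is irrelevant.
            buckets = Math.max(totalBuckets, other.totalBuckets);
        }
        return new PartitionStatsSketch(
                fileCount + other.fileCount,
                buckets,
                Math.max(lastFileCreationTime, other.lastFileCreationTime));
    }
}
```

A DELETE is modeled here as a negative `fileCount`, mirroring the `entry(-3, 2, 1000L)` helper call in the test.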
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 71eb081de89f..e3c31b85dc46 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -1080,6 +1080,48 @@ public void testCommitRetryAfterFalseSuccessDoesNotCleanManifest() throws Except
assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(1);
}
+ @Test
+ public void testBucketCountConsistencyValidation() throws Exception {
+ TestFileStore store = createStore(false);
+
+ // Commit initial data
+ List<KeyValue> data = generateDataList(10);
+ store.commitData(data, gen::getPartition, kv -> 0);
+
+ // Re-commit the same data but with a different totalBuckets value.
+ // This simulates a stale writer that loaded an old bucket mapping.
+ assertThatThrownBy(
+ () ->
+ store.commitDataImpl(
+ data,
+ gen::getPartition,
+ kv -> 0,
+ false,
+ null,
+ null,
+ Collections.emptyList(),
+ (commit, committable) -> {
+ ManifestCommittable tampered =
+ new ManifestCommittable(
+ committable.identifier(),
+ committable.watermark());
+ for (CommitMessage msg :
+ committable.fileCommittables()) {
+ CommitMessageImpl impl = (CommitMessageImpl) msg;
+ tampered.addFileCommittable(
+ new CommitMessageImpl(
+ impl.partition(),
+ impl.bucket(),
+ 99,
+ impl.newFilesIncrement(),
+ impl.compactIncrement()));
+ }
+ commit.commit(tampered, true);
+ }))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("without overwrite");
+ }
+
private FileStoreCommitImpl newCommitWithSnapshotCommit(
TestFileStore store, String commitUser, SnapshotCommit snapshotCommit) {
String tableName = store.options().path().getName();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileSystemWriteRestoreTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileSystemWriteRestoreTest.java
new file mode 100644
index 000000000000..16ae71f6cdaa
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileSystemWriteRestoreTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link FileSystemWriteRestore}, covering the {@code totalBuckets} resolution logic for
+ * both empty and non-empty buckets across partitioned and unpartitioned tables.
+ *
+ * <p>When restoring files for a {@code (partition, bucket)} that has no existing data files, there
+ * are no manifest entries to derive {@code totalBuckets} from. For partitioned tables, {@link
+ * WriteRestore#extractTotalBuckets} falls back to {@link
+ * org.apache.paimon.table.sink.PartitionBucketMapping} to correctly return the per-partition bucket
+ * count (e.g. after a rescale). For unpartitioned tables, {@code null} is returned so the write
+ * path falls back to {@code numBuckets} and the committer-side mismatch check still fires.
+ */
+public class FileSystemWriteRestoreTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ @Test
+ public void testEmptyBucketUsesPartitionBucketMapping() throws Exception {
+ // Build a table with default bucket=4 and write two rows into partition 1.
+ // Two rows can populate at most two of the four buckets (which ones depends
+ // on the hash), so at least two buckets stay empty. Then
+ // "rescale" the table-level default to 32 (without rewriting partition 1)
+ // and ask the WriteRestore for an empty bucket. It must return
+ // totalBuckets=4 (the partition's actual bucket count), NOT 32 (the new
+ // table default).
+ FileStoreTable table = createPartitionedPkTable(4);
+
+ // Write enough rows to populate at least one bucket within partition 1.
+ commitOneRow(table, /* pt */ 1, /* k */ 1);
+ commitOneRow(table, /* pt */ 1, /* k */ 2);
+
+ // Find an empty bucket in partition 1 by inspecting the existing files.
+ int emptyBucket = findEmptyBucket(table, 1, /* totalBuckets */ 4);
+
+ // Simulate a rescale by raising the table-level default bucket count
+ // (without rewriting existing files). Existing manifest entries still
+ // carry totalBuckets=4.
+ table = withBucket(table, 32);
+
+ WriteRestore restore = newWriteRestore(table);
+
+ RestoreFiles restored = restore.restoreFiles(binaryRow(1), emptyBucket, false, false);
+
+ assertThat(restored.totalBuckets())
+ .as(
+ "Empty (partition 1, bucket %d): totalBuckets must be inferred from "
+ + "PartitionBucketMapping (4), not the new table default (32).",
+ emptyBucket)
+ .isEqualTo(4);
+ assertThat(restored.dataFiles()).isNullOrEmpty();
+ }
+
+ @Test
+ public void testEmptyBucketInUnseenPartitionUsesDefault() throws Exception {
+ // For an entirely unseen partition (no files anywhere), no per-partition
+ // mapping exists and PartitionBucketMapping.resolveNumBuckets falls back to
+ // the table's default bucket count.
+ FileStoreTable table = createPartitionedPkTable(8);
+ commitOneRow(table, 1, 100); // ensures the snapshot exists
+
+ WriteRestore restore = newWriteRestore(table);
+ RestoreFiles restored = restore.restoreFiles(binaryRow(/* unseen */ 999), 0, false, false);
+
+ assertThat(restored.totalBuckets()).isEqualTo(8);
+ assertThat(restored.dataFiles()).isNullOrEmpty();
+ }
+
+ @Test
+ public void testNonEmptyBucketReportsManifestTotalBuckets() throws Exception {
+ // Sanity test: when a bucket has files, totalBuckets must come from the
+ // manifest entries (not from the fallback path). This guards against
+ // accidentally always overriding totalBuckets via PartitionBucketMapping.
+ FileStoreTable table = createPartitionedPkTable(2);
+ commitOneRow(table, 1, 1);
+ commitOneRow(table, 1, 2);
+
+ // Locate a non-empty bucket within partition 1.
+ int nonEmptyBucket = findNonEmptyBucket(table, 1, 2);
+
+ // Change the table default to ensure the returned totalBuckets is from the
+ // manifest entry, not the schema.
+ table = withBucket(table, 32);
+
+ WriteRestore restore = newWriteRestore(table);
+ RestoreFiles restored = restore.restoreFiles(binaryRow(1), nonEmptyBucket, false, false);
+
+ assertThat(restored.totalBuckets()).isEqualTo(2);
+ assertThat(restored.dataFiles()).isNotEmpty();
+ }
+
+ // ------------------------------------------------------------------------
+ // helpers
+ // ------------------------------------------------------------------------
+
+ private FileStoreTable createPartitionedPkTable(int bucket) throws Exception {
+ Path path = new Path(tempDir.toString());
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path.toString());
+ options.set(CoreOptions.BUCKET, bucket);
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), path),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ options.toMap(),
+ ""));
+
+ return FileStoreTableFactory.create(
+ LocalFileIO.create(), path, tableSchema, CatalogEnvironment.empty());
+ }
+
+ private FileStoreTable withBucket(FileStoreTable table, int newBucket) {
+ Options options = new Options(table.options());
+ options.set(CoreOptions.BUCKET, newBucket);
+ return table.copy(table.schema().copy(options.toMap()));
+ }
+
+ private WriteRestore newWriteRestore(FileStoreTable table) {
+ return new FileSystemWriteRestore(
+ table.store().options(),
+ table.snapshotManager(),
+ table.store().newScan(),
+ table.store().newIndexFileHandler());
+ }
+
+ private void commitOneRow(FileStoreTable table, int pt, int k) throws Exception {
+ String user = UUID.randomUUID().toString();
+ Long latest = table.snapshotManager().latestSnapshotId();
+ long id = latest == null ? 0L : latest;
+ try (StreamTableWrite write = table.newWrite(user);
+ StreamTableCommit commit = table.newCommit(user)) {
+ write.write(GenericRow.of(pt, k, (long) k));
+ commit.commit(id, write.prepareCommit(true, id));
+ }
+ }
+
+ /** Returns a bucket id (0..totalBuckets-1) that has no data files within the partition. */
+ private int findEmptyBucket(FileStoreTable table, int pt, int totalBuckets) throws Exception {
+ BinaryRow partition = binaryRow(pt);
+ for (int b = 0; b < totalBuckets; b++) {
+ int bucket = b;
+ boolean nonEmpty =
+ table.newSnapshotReader()
+ .withPartitionFilter(Collections.singletonList(partition))
+ .withBucket(bucket).read().dataSplits().stream()
+ .anyMatch(s -> !s.dataFiles().isEmpty());
+ if (!nonEmpty) {
+ return bucket;
+ }
+ }
+ throw new IllegalStateException(
+ "Could not find an empty bucket in partition "
+ + pt
+ + " (every bucket has files); test scenario could not be set up.");
+ }
+
+ /** Returns a bucket id (0..totalBuckets-1) that has at least one data file. */
+ private int findNonEmptyBucket(FileStoreTable table, int pt, int totalBuckets)
+ throws Exception {
+ BinaryRow partition = binaryRow(pt);
+ for (int b = 0; b < totalBuckets; b++) {
+ int bucket = b;
+ boolean nonEmpty =
+ table.newSnapshotReader()
+ .withPartitionFilter(Collections.singletonList(partition))
+ .withBucket(bucket).read().dataSplits().stream()
+ .anyMatch(s -> !s.dataFiles().isEmpty());
+ if (nonEmpty) {
+ return bucket;
+ }
+ }
+ throw new IllegalStateException("Could not find a non-empty bucket in partition " + pt);
+ }
+
+ private static BinaryRow binaryRow(int pt) {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, pt);
+ writer.complete();
+ return row;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index c91dbb4c8e4f..cbd5fee57f87 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -99,6 +99,7 @@
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -155,16 +156,16 @@ public void testOverwriteSameFiles() throws Exception {
}
@Test
- public void testBucketedAppendTableWriteWithInit() throws Exception {
- innerTestBucketedAppendTableWriteInit(true);
+ public void testBucketedAppendOrderedSequenceNumbers() throws Exception {
+ innerTestBucketedAppendSequenceNumbers(true);
}
@Test
- public void testBucketedAppendTableWriteNoInit() throws Exception {
- innerTestBucketedAppendTableWriteInit(false);
+ public void testBucketedAppendUnorderedSequenceNumbers() throws Exception {
+ innerTestBucketedAppendSequenceNumbers(false);
}
- public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws Exception {
+ public void innerTestBucketedAppendSequenceNumbers(boolean ordered) throws Exception {
FileStoreTable table =
createFileStoreTable(
options -> {
@@ -176,31 +177,47 @@ public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws Except
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
- // 1. first write
+ // 1. first write - use a=1 so both batches land in the same bucket
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
write.write(rowData(1, 10, 100L));
commit.commit(write.prepareCommit());
}
- // 2. delete all manifests
- ManifestList manifestList = table.store().manifestListFactory().create();
- ManifestFile manifestFile = table.store().manifestFileFactory().create();
- List<ManifestFileMeta> manifests =
- manifestList.readAllManifests(table.latestSnapshot().get());
- for (ManifestFileMeta manifest : manifests) {
- manifestFile.delete(manifest.fileName());
+ // collect sequence numbers from batch 1
+ List<DataFileMeta> batch1Files =
+ table.newReadBuilder().newScan().plan().splits().stream()
+ .flatMap(s -> ((DataSplit) s).dataFiles().stream())
+ .collect(Collectors.toList());
+ long batch1MaxSeq =
+ batch1Files.stream().mapToLong(DataFileMeta::maxSequenceNumber).max().getAsLong();
+ Set<String> batch1FileNames =
+ batch1Files.stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
+
+ // 2. second write - same a=1 value ensures same bucket as batch 1
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(rowData(1, 20, 200L));
+ commit.commit(write.prepareCommit());
}
- // 3. check new write
- try (BatchTableWrite write = writeBuilder.newWrite()) {
- if (ordered) {
- assertThatThrownBy(() -> write.write(rowData(1, 10, 100L)))
- .hasMessageContaining("FileNotFoundException");
- } else {
- // no exception
- write.write(rowData(1, 10, 100L));
- }
+ // collect sequence numbers from batch 2 only (exclude batch 1 files by name)
+ List<DataFileMeta> batch2Files =
+ table.newReadBuilder().newScan().plan().splits().stream()
+ .flatMap(s -> ((DataSplit) s).dataFiles().stream())
+ .filter(f -> !batch1FileNames.contains(f.fileName()))
+ .collect(Collectors.toList());
+ long batch2MinSeq =
+ batch2Files.stream().mapToLong(DataFileMeta::minSequenceNumber).min().getAsLong();
+
+ if (ordered) {
+ // ordered mode always restores previous files and continues sequence numbers,
+ // so batch 2 sequence numbers are strictly greater than batch 1's
+ assertThat(batch2MinSeq).isGreaterThan(batch1MaxSeq);
+ } else {
+ // unordered+writeOnly mode skips restoring previous files (ignorePreviousFiles=true),
+ // so sequence numbers reset to 0 each session
+ assertThat(batch2MinSeq).isEqualTo(0L);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
index 5c551dac7004..82a1a53d0a41 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
@@ -101,6 +101,30 @@ public void testUnCompactDecimalAndTimestampNullValueBucketNumber() {
}
}
+ @Test
+ public void testPerPartitionBucketCount() {
+ int defaultBuckets = 100;
+ int partition1Buckets = 4;
+
+ // Build a BinaryRow for partition value = 1
+ BinaryRow partitionRow = BinaryRow.singleColumn(1);
+
+ Map<BinaryRow, Integer> partitionMap = new HashMap<>();
+ partitionMap.put(partitionRow, partition1Buckets);
+ PartitionBucketMapping mapping = new PartitionBucketMapping(defaultBuckets, partitionMap);
+
+ // Schema: partition key "a", bucket key "b", primary key "a,b"
+ FixedBucketRowKeyExtractor extractor = extractor("a", "b", "a,b", defaultBuckets, mapping);
+
+ // Same bucket key (b=456) in both partitions, different bucket counts produce
+ // different bucket assignments: hash(456) % 4 = 3, hash(456) % 100 = 47
+ GenericRow rowInMappedPartition = GenericRow.of(1, 456, 7);
+ assertThat(bucket(extractor, rowInMappedPartition)).isEqualTo(3);
+
+ GenericRow rowInDefaultPartition = GenericRow.of(99, 456, 7);
+ assertThat(bucket(extractor, rowInDefaultPartition)).isEqualTo(47);
+ }
+
private int bucket(FixedBucketRowKeyExtractor extractor, InternalRow row) {
extractor.setRecord(row);
return extractor.bucket();
@@ -125,8 +149,29 @@ private FixedBucketRowKeyExtractor extractor(
return extractor(rowType, partK, bk, pk, numBucket);
}
+ private FixedBucketRowKeyExtractor extractor(
+ String partK, String bk, String pk, int numBucket, PartitionBucketMapping mapping) {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", new IntType()),
+ new DataField(1, "b", new IntType()),
+ new DataField(2, "c", new IntType())));
+ return extractor(rowType, partK, bk, pk, numBucket, mapping);
+ }
+
private FixedBucketRowKeyExtractor extractor(
RowType rowType, String partK, String bk, String pk, int numBucket) {
+ return extractor(rowType, partK, bk, pk, numBucket, new PartitionBucketMapping(numBucket));
+ }
+
+ private FixedBucketRowKeyExtractor extractor(
+ RowType rowType,
+ String partK,
+ String bk,
+ String pk,
+ int numBucket,
+ PartitionBucketMapping mapping) {
 List<DataField> fields = TableSchema.newFields(rowType);
 Map<String, String> options = new HashMap<>();
options.put(BUCKET_KEY.key(), bk);
@@ -142,6 +187,6 @@ private FixedBucketRowKeyExtractor extractor(
"".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
options,
"");
- return new FixedBucketRowKeyExtractor(schema);
+ return new FixedBucketRowKeyExtractor(schema, mapping);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/PartitionBucketMappingTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/PartitionBucketMappingTest.java
new file mode 100644
index 000000000000..9fd85e348ecc
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/PartitionBucketMappingTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.operation.FileStoreScan;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link PartitionBucketMapping}. */
+public class PartitionBucketMappingTest {
+
+ @Test
+ public void testDefaultBucketCount() {
+ PartitionBucketMapping mapping = new PartitionBucketMapping(16);
+
+ // Any partition should resolve to the default
+ assertThat(mapping.resolveNumBuckets(BinaryRow.EMPTY_ROW)).isEqualTo(16);
+ assertThat(mapping.resolveNumBuckets(partition(1))).isEqualTo(16);
+ assertThat(mapping.resolveNumBuckets(partition(42))).isEqualTo(16);
+ }
+
+ @Test
+ public void testExplicitPartitionMapping() {
+ BinaryRow partA = partition(1);
+ BinaryRow partB = partition(2);
+ BinaryRow partC = partition(3);
+
+ Map<BinaryRow, Integer> partitionMap = new HashMap<>();
+ partitionMap.put(partA, 32);
+ partitionMap.put(partB, 64);
+
+ PartitionBucketMapping mapping = new PartitionBucketMapping(16, partitionMap);
+
+ // Mapped partitions return their specific bucket counts
+ assertThat(mapping.resolveNumBuckets(partA)).isEqualTo(32);
+ assertThat(mapping.resolveNumBuckets(partB)).isEqualTo(64);
+
+ // Unmapped partition falls back to the default
+ assertThat(mapping.resolveNumBuckets(partC)).isEqualTo(16);
+ }
+
+ @Test
+ public void testLoadFromScanPropagatesException() {
+ // Simulate a scan that throws (e.g. corrupted manifest, transient I/O error).
+ // loadFromScan must fail fast so the job does not silently write to wrong buckets.
+ FileStoreScan failingScan = Mockito.mock(FileStoreScan.class);
+ Mockito.when(failingScan.readPartitionEntries())
+ .thenThrow(new RuntimeException("simulated manifest scan failure"));
+
+ assertThatThrownBy(() -> PartitionBucketMapping.loadFromScan(failingScan, 8))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("simulated manifest scan failure");
+ }
+
+ private static BinaryRow partition(int value) {
+ return BinaryRow.singleColumn(value);
+ }
+}
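Reviewer note: the lookup these tests exercise is a default value with per-partition overrides. The sketch below is a minimal, self-contained illustration of that idea only. `BucketLookupSketch` and its string partition keys are hypothetical stand-ins and do not reproduce the real `PartitionBucketMapping` (which keys on `BinaryRow`).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a per-partition bucket lookup; not the Paimon class. */
final class BucketLookupSketch {
    private final int defaultNumBuckets;
    private final Map<String, Integer> perPartition;

    BucketLookupSketch(int defaultNumBuckets, Map<String, Integer> perPartition) {
        this.defaultNumBuckets = defaultNumBuckets;
        // Defensive copy so later changes to the caller's map do not affect routing.
        this.perPartition = Collections.unmodifiableMap(new HashMap<>(perPartition));
    }

    /** Returns the partition's own bucket count, or the default if none is recorded. */
    int resolveNumBuckets(String partition) {
        return perPartition.getOrDefault(partition, defaultNumBuckets);
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("dt=p1", 32);
        overrides.put("dt=p2", 64);
        BucketLookupSketch lookup = new BucketLookupSketch(16, overrides);
        System.out.println(lookup.resolveNumBuckets("dt=p1")); // mapped partition
        System.out.println(lookup.resolveNumBuckets("dt=p3")); // falls back to the default
    }
}
```

The defensive copy is a design choice of the sketch: a routing table that can change underneath a running writer would defeat the purpose of resolving the bucket count once per partition.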
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
index 802a3ea9d4cf..642ef76ec219 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -18,13 +18,15 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.flink.sink.RowDataKeyAndBucketExtractor;
+import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -65,7 +67,7 @@ public class CdcRecordKeyAndBucketExtractorTest {
public void testExtract() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableSchema schema = createTableSchema();
- RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema);
+ FixedBucketRowKeyExtractor expected = extractor(schema);
CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema);
int numTests = random.nextInt(1000) + 1;
@@ -85,7 +87,7 @@ public void testExtract() throws Exception {
v1,
StringData.fromString(k2),
StringData.fromString(v2));
- expected.setRecord(rowData);
+ expected.setRecord(new FlinkRowWrapper(rowData));
Map data = new HashMap<>();
data.put("pt1", pt1);
@@ -109,7 +111,7 @@ public void testExtract() throws Exception {
public void testNullPartition() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableSchema schema = createTableSchema();
- RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema);
+ FixedBucketRowKeyExtractor expected = extractor(schema);
CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema);
long k1 = random.nextLong();
@@ -120,7 +122,7 @@ public void testNullPartition() throws Exception {
GenericRowData rowData =
GenericRowData.of(
null, null, k1, v1, StringData.fromString(k2), StringData.fromString(v2));
- expected.setRecord(rowData);
+ expected.setRecord(new FlinkRowWrapper(rowData));
Map data = new HashMap<>();
data.put("pt1", null);
@@ -143,7 +145,7 @@ public void testNullPartition() throws Exception {
public void testEmptyPartition() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableSchema schema = createTableSchema();
- RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema);
+ FixedBucketRowKeyExtractor expected = extractor(schema);
CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema);
long k1 = random.nextLong();
@@ -159,7 +161,7 @@ public void testEmptyPartition() throws Exception {
v1,
StringData.fromString(k2),
StringData.fromString(v2));
- expected.setRecord(rowData);
+ expected.setRecord(new FlinkRowWrapper(rowData));
Map data = new HashMap<>();
data.put("pt1", "");
@@ -188,4 +190,9 @@ private TableSchema createTableSchema() throws Exception {
Collections.singletonMap("bucket", "1"),
""));
}
+
+ private FixedBucketRowKeyExtractor extractor(TableSchema schema) {
+ return new FixedBucketRowKeyExtractor(
+ schema, new PartitionBucketMapping(schema.numBuckets()));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 7c4676397523..4061336fe63a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -45,6 +45,7 @@
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SchemaBucketFileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
@@ -322,7 +323,7 @@ protected boolean buildForPostponeBucketCompaction(
String commitUser = CoreOptions.createCommitUser(options);
List> dataStreams = new ArrayList<>();
for (BinaryRow partition : partitions) {
- int bucketNum = defaultBucketNum;
+ int partitionBucketNum = defaultBucketNum;
Iterator<ManifestEntry> it =
table.newSnapshotReader()
@@ -330,12 +331,13 @@ protected boolean buildForPostponeBucketCompaction(
.onlyReadRealBuckets()
.readFileIterator();
if (it.hasNext()) {
- bucketNum = it.next().totalBuckets();
+ partitionBucketNum = it.next().totalBuckets();
}
bucketOptions = new HashMap<>(table.options());
- bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
- FileStoreTable realTable = table.copy(table.schema().copy(bucketOptions));
+ bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(partitionBucketNum));
+ FileStoreTable realTable =
+ new SchemaBucketFileStoreTable(table.copy(table.schema().copy(bucketOptions)));
LinkedHashMap partitionSpec =
partitionComputer.generatePartValues(partition);
@@ -352,7 +354,7 @@ protected boolean buildForPostponeBucketCompaction(
sourcePair.getLeft(),
realTable.rowType(),
table.catalogEnvironment().catalogContext()),
- new RowDataChannelComputer(realTable.schema()),
+ new RowDataChannelComputer(realTable.createRowKeyExtractor()),
null);
FixedBucketSink sink = new FixedBucketSink(realTable, null);
DataStream written =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index ae8013b7e709..d79751248f8f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -36,8 +36,10 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PostponeUtils;
+import org.apache.paimon.table.SchemaBucketFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.utils.BlobDescriptorUtils;
import org.apache.flink.api.common.functions.MapFunction;
@@ -300,9 +302,16 @@ protected DataStreamSink> buildForFixedBucket(DataStream input) {
+ " then the parallelism of writerOperator will be set to bucketNums.");
parallelism = bucketNums;
}
- DataStream partitioned =
- partition(input, new RowDataChannelComputer(table.schema()), parallelism);
- FixedBucketSink sink = new FixedBucketSink(table, overwritePartition);
+ // When overwriting a specific partition on a partitioned fixed-bucket table, wrap the
+ // table in SchemaBucketFileStoreTable so that createRowKeyExtractor() uses the new schema
+ // bucket count for row routing, instead of loading the old per-partition bucket
+ // mapping from the manifest.
+ FileStoreTable sinkTable =
+ overwritePartition != null ? new SchemaBucketFileStoreTable(table) : table;
+ RowDataChannelComputer channelComputer =
+ new RowDataChannelComputer(sinkTable.createRowKeyExtractor());
+ DataStream partitioned = partition(input, channelComputer, parallelism);
+ FixedBucketSink sink = new FixedBucketSink(sinkTable, overwritePartition);
return sink.sinkFrom(partitioned);
}
@@ -320,10 +329,13 @@ private DataStreamSink> buildPostponeBucketSink(DataStream input)
return sink.sinkFrom(partitioned);
} else {
Map<BinaryRow, Integer> knownNumBuckets = PostponeUtils.getKnownNumBuckets(table);
+ PartitionBucketMapping partitionBucketMapping =
+ PartitionBucketMapping.loadFromTable(table);
DataStream partitioned =
partition(
input,
- new PostponeFixedBucketChannelComputer(table.schema(), knownNumBuckets),
+ new PostponeFixedBucketChannelComputer(
+ table.schema(), knownNumBuckets, partitionBucketMapping),
parallelism);
FileStoreTable tableForWrite = PostponeUtils.tableForFixBucketWrite(table);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
index e1bf08d8434c..98cb9a07ee83 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
@@ -23,6 +23,7 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import java.util.Map;
@@ -36,20 +37,24 @@ public class PostponeFixedBucketChannelComputer implements ChannelComputer knownNumBuckets;
+ private final PartitionBucketMapping partitionBucketMapping;
private transient int numChannels;
private transient FixedBucketRowKeyExtractor keyExtractor;
public PostponeFixedBucketChannelComputer(
- TableSchema schema, Map<BinaryRow, Integer> knownNumBuckets) {
+ TableSchema schema,
+ Map<BinaryRow, Integer> knownNumBuckets,
+ PartitionBucketMapping partitionBucketMapping) {
this.schema = schema;
this.knownNumBuckets = knownNumBuckets;
+ this.partitionBucketMapping = partitionBucketMapping;
}
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.keyExtractor = new FixedBucketRowKeyExtractor(schema);
+ this.keyExtractor = new FixedBucketRowKeyExtractor(schema, partitionBucketMapping);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 1df93c82bcb1..f785bf564491 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -20,9 +20,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
/** {@link ChannelComputer} for {@link InternalRow}. */
@@ -30,19 +28,16 @@ public class RowDataChannelComputer implements ChannelComputer<InternalRow> {
private static final long serialVersionUID = 1L;
- private final TableSchema schema;
-
private transient int numChannels;
- private transient KeyAndBucketExtractor<InternalRow> extractor;
+ private final KeyAndBucketExtractor<InternalRow> extractor;
- public RowDataChannelComputer(TableSchema schema) {
- this.schema = schema;
+ public RowDataChannelComputer(KeyAndBucketExtractor<InternalRow> extractor) {
+ this.extractor = extractor;
}
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.extractor = new FixedBucketRowKeyExtractor(schema);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java
deleted file mode 100644
index 8082df005fe8..000000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-
-import org.apache.flink.table.data.RowData;
-
-/** {@link KeyAndBucketExtractor} for {@link RowData}. */
-public class RowDataKeyAndBucketExtractor implements KeyAndBucketExtractor<RowData> {
-
- private final FixedBucketRowKeyExtractor wrapped;
-
- public RowDataKeyAndBucketExtractor(TableSchema schema) {
- wrapped = new FixedBucketRowKeyExtractor(schema);
- }
-
- @Override
- public void setRecord(RowData record) {
- wrapped.setRecord(new FlinkRowWrapper(record));
- }
-
- @Override
- public BinaryRow partition() {
- return wrapped.partition();
- }
-
- @Override
- public int bucket() {
- return wrapped.bucket();
- }
-
- @Override
- public BinaryRow trimmedPrimaryKey() {
- return wrapped.trimmedPrimaryKey();
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
index 0f315880e932..a31079f1aa63 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
@@ -28,13 +28,13 @@
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -62,6 +62,7 @@ public class TableWriteCoordinator {
private final Cache pagedCoordination;
private volatile Snapshot snapshot;
+ private volatile PartitionBucketMapping partitionBucketMapping;
public TableWriteCoordinator(FileStoreTable table) {
this.table = table;
@@ -93,6 +94,7 @@ private synchronized void refresh() {
}
this.snapshot = latestSnapshot.get();
this.scan.withSnapshot(snapshot);
+ loadPartitionBucketMapping();
}
public synchronized PagedCoordinationResponse scan(PagedCoordinationRequest request)
@@ -148,9 +150,10 @@ public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest reques
BinaryRow partition = deserializeBinaryRow(request.partition());
int bucket = request.bucket();
- List<DataFileMeta> restoreFiles = new ArrayList<>();
List<ManifestEntry> entries = scan.withPartitionBucket(partition, bucket).plan().files();
- Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
+ List<DataFileMeta> restoreFiles = WriteRestore.extractDataFiles(entries);
+ Integer totalBuckets =
+ WriteRestore.extractTotalBuckets(entries, partition, partitionBucketMapping);
IndexFileMeta dynamicBucketIndex = null;
if (request.scanDynamicBucketIndex()) {
@@ -182,6 +185,7 @@ private synchronized long computeLatestIdentifier(String user) {
if (snapshot == null || latestSnapshotOfUser.id() > snapshot.id()) {
snapshot = latestSnapshotOfUser;
scan.withSnapshot(snapshot);
+ loadPartitionBucketMapping();
}
return latestSnapshotOfUser.commitIdentifier();
}
@@ -193,6 +197,16 @@ public void checkpoint() {
latestCommittedIdentifiers.clear();
}
+ private void loadPartitionBucketMapping() {
+ int defaultNumBuckets = table.schema().numBuckets();
+ // Note: `scan` is shared between this method (called from refresh() and
+ // computeLatestIdentifier()) and scan(ScanCoordinationRequest), which calls
+ // scan.withPartitionBucket(...). Both call sites invoke scan.withSnapshot(...) before
+ // the scan is used, so the shared state is safe. The partition-bucket filter set by
+ // withPartitionBucket is applied per plan() call and does not bleed across invocations.
+ this.partitionBucketMapping = PartitionBucketMapping.loadFromScan(scan, defaultNumBuckets);
+ }
+
private static class CoordinationKey {
private final byte[] content;
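Reviewer note: the coordinator change above resolves `totalBuckets` from the scanned entries together with the partition-bucket mapping. The sketch below illustrates one plausible fallback order (existing files first, then the per-partition value, then the table default). It is an assumption made for illustration: `TotalBucketsFallbackSketch`, its parameters, and the string partition keys are hypothetical, and the real `WriteRestore.extractTotalBuckets` may differ in signature and behaviour.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a total-bucket fallback; not Paimon code. */
final class TotalBucketsFallbackSketch {

    /**
     * @param existingFileTotals totalBuckets stamped on files already in this bucket (may be empty)
     * @param partition partition key, a plain string in this sketch
     * @param partitionBuckets bucket counts known per partition
     * @param tableDefault table-level default bucket count
     */
    static int totalBuckets(
            List<Integer> existingFileTotals,
            String partition,
            Map<String, Integer> partitionBuckets,
            int tableDefault) {
        if (!existingFileTotals.isEmpty()) {
            // Existing files in this bucket are the most direct evidence.
            return existingFileTotals.get(0);
        }
        // Empty bucket: use the partition's known count before the table default.
        return partitionBuckets.getOrDefault(partition, tableDefault);
    }

    public static void main(String[] args) {
        Map<String, Integer> known = Collections.singletonMap("dt=p1", 4);
        // Bucket with files stamped 4.
        System.out.println(totalBuckets(Arrays.asList(4, 4), "dt=p1", known, 2));
        // Empty bucket in a partition with a known count.
        System.out.println(totalBuckets(Collections.<Integer>emptyList(), "dt=p1", known, 2));
        // Empty bucket in a partition with no recorded count.
        System.out.println(totalBuckets(Collections.<Integer>emptyList(), "dt=p2", known, 2));
    }
}
```

The middle case is the one that matters for rescaled partitions: without the per-partition lookup, an empty bucket would silently adopt the table default.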
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 940dd95af0b0..50ea1b5261e5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1394,11 +1394,13 @@ public void testChangeBucketNumber() throws Exception {
insertInto(table, "('US Dollar', 102, '2022-06-20')");
- // increase bucket num from 2 to 3
- assertChangeBucketWithoutRescale(table, 3);
+ // For partitioned tables, changing the bucket num and writing is allowed.
+ // The partition's existing bucket count takes precedence, supporting
+ // per-partition rescale operations.
- // decrease bucket num from 3 to 1
- assertChangeBucketWithoutRescale(table, 1);
+ assertChangeBucketWithoutRescale(table, 3, 1);
+
+ assertChangeBucketWithoutRescale(table, 1, 2);
}
@Test
@@ -1938,7 +1940,7 @@ private void testSinkParallelism(
// 3. assert parallelism from transformation
DataStream mockSource =
- bExeEnv.fromCollection(Collections.singletonList(GenericRowData.of()));
+ bExeEnv.fromData(Collections.singletonList(GenericRowData.of()));
mockSource.getTransformation().setParallelism(mockSource.getParallelism(), false);
DataStreamSink> sink = sinkProvider.consumeDataStream(null, mockSource);
@@ -1987,21 +1989,15 @@ private static Stream testSinkParallelismParameters() {
return parameters.stream();
}
- private void assertChangeBucketWithoutRescale(String table, int bucketNum) throws Exception {
+ private void assertChangeBucketWithoutRescale(
+ String table, int bucketNum, int expectedRowsBefore) throws Exception {
bEnv.executeSql(String.format("ALTER TABLE `%s` SET ('bucket' = '%d')", table, bucketNum));
- // read is ok
- assertThat(
- BlockingIterator.of(bEnv.executeSql(buildSimpleQuery(table)).collect())
- .collect())
- .containsExactlyInAnyOrder(changelogRow("+I", "US Dollar", 102L, "2022-06-20"));
- assertThatThrownBy(() -> insertInto(table, "('US Dollar', 102, '2022-06-20')"))
- .rootCause()
- .isInstanceOf(RuntimeException.class)
- .hasMessage(
- String.format(
- "Try to write partition {dt=2022-06-20} with a new bucket num %d, but the previous bucket num is 2. "
- + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
- bucketNum));
+ // read existing data is ok after changing bucket num
+ List<Row> rows =
+ BlockingIterator.of(bEnv.executeSql(buildSimpleQuery(table)).collect()).collect();
+ assertThat(rows).hasSize(expectedRowsBefore);
+ // writing with a different bucket num is allowed for partitioned tables
+ insertInto(table, "('US Dollar', 102, '2022-06-20')");
}
private void validateSchemaOptionResult() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index 9b981102ec56..e33eef3fef3c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -20,8 +20,10 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.core.execution.JobClient;
@@ -33,6 +35,8 @@
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -229,6 +233,234 @@ private void innerTest(String catalogName, String tableName) {
.containsExactlyInAnyOrderElementsOf(expected);
}
+ @Test
+ public void testRescaleSinglePartitionViaInsertOverwrite() throws Exception {
+ // Create a partitioned primary-key table with 2 buckets.
+ // Primary-key tables are sensitive to bucket routing: each row must land in the
+ // bucket determined by hash(pk) % totalBuckets, otherwise merging during reads
+ // produces duplicates (LSM finds the same key in different buckets and returns both).
+ batchSql("USE CATALOG fs_catalog");
+ batchSql("DROP TABLE IF EXISTS `TP`");
+ batchSql(
+ "CREATE TABLE `TP` "
+ + "(dt STRING, f0 INT, PRIMARY KEY (dt, f0) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) "
+ + "WITH ('bucket' = '2')");
+
+ // Insert data into two partitions
+ batchSql("INSERT INTO TP VALUES ('p1', 1), ('p1', 2), ('p1', 3)");
+ batchSql("INSERT INTO TP VALUES ('p2', 4), ('p2', 5), ('p2', 6)");
+
+ // Alter bucket count to 4
+ batchSql(alterTableSql, "TP", 4);
+
+ // INSERT OVERWRITE only partition p1 via the SQL path.
+ // Without the fix, this path does not wrap the table in SchemaBucketFileStoreTable,
+ // so table.createRowKeyExtractor() calls PartitionBucketMapping.loadFromTable(),
+ // which reads the old per-partition bucket count from the manifest (2 for p1) and
+ // uses it for row routing (hashing). Rows then land in buckets [0,1] while the files
+ // are stamped totalBuckets=4. A subsequent upsert into p1 routes rows using the new
+ // 4-bucket mapping, so the same key may appear in two different files with
+ // different bucket assignments, causing duplicate reads. With the fix, the overwrite
+ // routes rows using the new schema bucket count (4).
+ batchSql("INSERT OVERWRITE TP PARTITION (dt = 'p1') SELECT f0 FROM TP WHERE dt = 'p1'");
+
+ // Verify the overwrite preserved data correctly
+ assertThat(batchSql("SELECT * FROM TP WHERE dt = 'p1'"))
+ .containsExactlyInAnyOrder(Row.of("p1", 1), Row.of("p1", 2), Row.of("p1", 3));
+ assertThat(batchSql("SELECT * FROM TP WHERE dt = 'p2'"))
+ .containsExactlyInAnyOrder(Row.of("p2", 4), Row.of("p2", 5), Row.of("p2", 6));
+
+ // Now upsert (update) existing rows in p1 — this uses the new 4-bucket mapping.
+ // If the overwrite used the old 2-bucket hashing, the original row for key (p1,1)
+ // lands in bucket hash(1)%2, but the update lands in bucket hash(1)%4.
+ // The LSM reader finds two files for key (p1,1) in different buckets and returns
+ // both — causing duplicate rows.
+ batchSql("INSERT INTO TP VALUES ('p1', 1), ('p1', 2), ('p1', 3)");
+ assertThat(batchSql("SELECT * FROM TP WHERE dt = 'p1'"))
+ .as(
+ "After rescale overwrite + upsert of same keys, each key must appear "
+ + "exactly once. If INSERT OVERWRITE used the old 2-bucket "
+ + "routing instead of the new 4-bucket routing (via "
+ + "SchemaBucketFileStoreTable), the same key will exist in two "
+ + "different buckets causing duplicate rows on read.")
+ .containsExactlyInAnyOrder(Row.of("p1", 1), Row.of("p1", 2), Row.of("p1", 3));
+
+ // Also verify the files written by INSERT OVERWRITE are stamped with the new bucket count
+ FileStoreTable fileStoreTable = paimonTable("TP");
+ Iterator<ManifestEntry> it =
+ fileStoreTable
+ .newSnapshotReader()
+ .withPartitionFilter(Collections.singletonMap("dt", "p1"))
+ .onlyReadRealBuckets()
+ .readFileIterator();
+ assertThat(it.hasNext()).isTrue();
+ while (it.hasNext()) {
+ ManifestEntry entry = it.next();
+ assertThat(entry.totalBuckets())
+ .as("Files in partition p1 must be stamped with the new bucket count (4)")
+ .isEqualTo(4);
+ assertThat(entry.bucket()).as("Bucket index must be in range [0, 3]").isBetween(0, 3);
+ }
+
+ batchSql("USE CATALOG default_catalog");
+ }
+
+ @Test
+ public void testWriteToEmptyBucketAfterRescaleKeepsPartitionBucketCount() throws Exception {
+ // End-to-end reproduction of the FileSystemWriteRestore empty-bucket scenario.
+ //
+ // The scenario occurs when a partition has a bucket count that differs from
+ // the table-level default, AND at least one of that partition's buckets
+ // has NO existing data files. A subsequent write that lands in the
+ // empty bucket must keep the partition's bucket count, not silently
+ // adopt the table default.
+ //
+ // Final state we want before the bug-triggering write (Step 5):
+ // - table default bucket : 2
+ // - partition p1 bucket : 4 (held over from a prior rescale+overwrite)
+ // - partition p1 bucket=X: has data files, totalBuckets = 4
+ // - partition p1 bucket=Y: NO data files
+ //
+ // The Step 5 write fills bucket=Y in p1. Post-fix, the new file MUST
+ // be stamped totalBuckets=4. Before the fix, FileSystemWriteRestore
+ // returned null for the empty bucket, the writer fell back to the
+ // table default (2), and the new file was stamped totalBuckets=2,
+ // corrupting p1's bucket layout.
+
+ // Step 1: create a partitioned PK table with bucket=2 and seed one row
+ // into p1.
+ batchSql(
+ "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`TPE` "
+ + "(dt STRING, f0 INT, PRIMARY KEY (dt, f0) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) "
+ + "WITH ('bucket' = '2')");
+ batchSql("USE CATALOG fs_catalog");
+ batchSql("INSERT INTO TPE VALUES ('p1', 1)");
+ // Also seed partition p2 — this acts as a positive control: p2 is
+ // never rescaled and must always be stamped with the current table
+ // default bucket count.
+ batchSql("INSERT INTO TPE VALUES ('p2', 1)");
+
+ // Step 2: rescale the table default to 4.
+ batchSql(alterTableSql, "TPE", 4);
+
+ // Step 3: INSERT OVERWRITE p1 — this rewrites p1's files using the
+ // current table default (4), so afterwards p1's files are stamped
+ // totalBuckets=4. With only a single row in p1, at least one of p1's
+ // four buckets is guaranteed to be empty.
+ batchSql("INSERT OVERWRITE TPE PARTITION (dt = 'p1') SELECT f0 FROM TPE WHERE dt = 'p1'");
+
+ // Step 4: rescale the table default BACK to 2, while p1 keeps
+ // its 4-bucket layout. Now the per-partition bucket count (4) diverges
+ // from the table default (2) — the precondition for the bug.
+ batchSql(alterTableSql, "TPE", 2);
+
+ // Step 5: insert more rows into p1. PartitionBucketMapping reads the
+ // manifest and routes these rows using p1's actual bucket count (4).
+ // Several rows are inserted so that — with high probability — at
+ // least one lands in the bucket that was previously empty, exercising
+ // the FileSystemWriteRestore empty-bucket code path.
+ // Also insert another row into p2 — p2 has never been rescaled and
+ // must remain stamped with the current table default bucket count (2).
+ batchSql(
+ "INSERT INTO TPE VALUES "
+ + "('p1', 2), ('p1', 3), ('p1', 4), ('p1', 5), ('p1', 6), "
+ + "('p2', 2), ('p2', 3), ('p2', 4), ('p2', 5), ('p2', 6)");
+
+ // Sanity check: each PK appears exactly once. Duplicates would indicate
+ // that the new rows landed in an inconsistent bucket layout (e.g. some
+ // files stamped totalBuckets=2 and others stamped totalBuckets=4),
+ // causing the LSM reader to find the same PK in two different buckets.
+ assertThat(batchSql("SELECT * FROM TPE"))
+ .as(
+ "Each PK must appear exactly once across all partitions. Duplicates "
+ + "indicate rows landed in inconsistent bucket layouts.")
+ .containsExactlyInAnyOrder(
+ Row.of("p1", 1),
+ Row.of("p1", 2),
+ Row.of("p1", 3),
+ Row.of("p1", 4),
+ Row.of("p1", 5),
+ Row.of("p1", 6),
+ Row.of("p2", 1),
+ Row.of("p2", 2),
+ Row.of("p2", 3),
+ Row.of("p2", 4),
+ Row.of("p2", 5),
+ Row.of("p2", 6));
+
+ // Strong assertion (p1 — the rescaled partition): every file must be
+ // stamped totalBuckets=4 (the partition's actual bucket count), NOT 2
+ // (the new table default). This is the precise condition the fix
+ // guarantees for partitions whose bucket count differs from the table
+ // default.
+ FileStoreTable fileStoreTable = paimonTable("TPE");
+ Iterator<ManifestEntry> p1Iter =
+ fileStoreTable
+ .newSnapshotReader()
+ .withPartitionFilter(Collections.singletonMap("dt", "p1"))
+ .onlyReadRealBuckets()
+ .readFileIterator();
+ assertThat(p1Iter.hasNext()).isTrue();
+ while (p1Iter.hasNext()) {
+ ManifestEntry entry = p1Iter.next();
+ assertThat(entry.totalBuckets())
+ .as(
+ "Files in partition p1 must keep the partition's bucket count (4). "
+ + "If 2, FileSystemWriteRestore failed to fall back to "
+ + "PartitionBucketMapping for buckets that had no existing "
+ + "files at restore time, and stamped new files with the "
+ + "table-level default bucket count instead.")
+ .isEqualTo(4);
+ assertThat(entry.bucket()).as("Bucket index must be in range [0, 3]").isBetween(0, 3);
+ }
+
+ // Positive control (p2 — never rescaled): every file must be stamped
+ // with the current table default (2). This guards against an
+ // over-eager fix that would erroneously route ALL writes through
+ // PartitionBucketMapping even when no per-partition override exists.
+ Iterator<ManifestEntry> p2Iter =
+ fileStoreTable
+ .newSnapshotReader()
+ .withPartitionFilter(Collections.singletonMap("dt", "p2"))
+ .onlyReadRealBuckets()
+ .readFileIterator();
+ assertThat(p2Iter.hasNext()).isTrue();
+ java.util.List<ManifestEntry> p2Entries = new java.util.ArrayList<>();
+ while (p2Iter.hasNext()) {
+ p2Entries.add(p2Iter.next());
+ }
+ // Diagnostic: dump every manifest entry in p2 so we can see what each
+ // file is stamped with (FileKind, bucket, totalBuckets, file name,
+ // snapshot/sequence info via the file meta).
+ System.out.println("=== p2 manifest entries (" + p2Entries.size() + " total) ===");
+ for (int i = 0; i < p2Entries.size(); i++) {
+ ManifestEntry entry = p2Entries.get(i);
+ System.out.printf(
+ " [%d] kind=%s level=%d bucket=%d totalBuckets=%d file=%s "
+ + "minSeq=%d maxSeq=%d rowCount=%d%n",
+ i,
+ entry.kind(),
+ entry.level(),
+ entry.bucket(),
+ entry.totalBuckets(),
+ entry.file().fileName(),
+ entry.file().minSequenceNumber(),
+ entry.file().maxSequenceNumber(),
+ entry.file().rowCount());
+ }
+ for (ManifestEntry entry : p2Entries) {
+ assertThat(entry.totalBuckets())
+ .as(
+ "Files in partition p2 must use the current table default bucket "
+ + "count (2). p2 was never rescaled, so its files should "
+ + "always reflect the table-level default.")
+ .isEqualTo(2);
+ assertThat(entry.bucket()).as("Bucket index must be in range [0, 1]").isBetween(0, 1);
+ }
+ }
+
private void executeBoth(List sqlList) {
sqlList.forEach(
sql -> {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RescaleActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RescaleActionITCase.java
new file mode 100644
index 000000000000..4c698d23ff03
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RescaleActionITCase.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link RescaleAction}. */
+public class RescaleActionITCase extends ActionITCaseBase {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()},
+ new String[] {"k", "v", "pt"});
+
+ private FileStoreTable prepareTable(int initialBuckets) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), String.valueOf(initialBuckets));
+ FileStoreTable table =
+ createFileStoreTable(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ Collections.emptyList(),
+ options);
+
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+ return table;
+ }
+
+ @Test
+ @Timeout(120)
+ public void testRescaleAllPartitions() throws Exception {
+ int initialBuckets = 2;
+ int newBuckets = 4;
+ prepareTable(initialBuckets);
+
+ // Write data to two partitions
+ writeData(
+ rowData(1, 100, BinaryString.fromString("p1")),
+ rowData(2, 200, BinaryString.fromString("p1")),
+ rowData(3, 300, BinaryString.fromString("p2")),
+ rowData(4, 400, BinaryString.fromString("p2")));
+
+ // Rescale all partitions (no --partition argument)
+ createAction(
+ RescaleAction.class,
+ "rescale",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--bucket_num",
+ String.valueOf(newBuckets))
+ .run();
+
+ // Verify both partitions now have the new bucket count
+ FileStoreTable table = getFileStoreTable(tableName);
+ Map<BinaryRow, Integer> partitionBuckets = getPartitionBucketCounts(table);
+ for (Map.Entry<BinaryRow, Integer> entry : partitionBuckets.entrySet()) {
+ assertThat(entry.getValue()).isEqualTo(newBuckets);
+ }
+
+ // Verify data is preserved
+ List rows = getData(tableName);
+ assertThat(rows).hasSize(4);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testRescaleSinglePartition() throws Exception {
+ int initialBuckets = 2;
+ int rescaledBuckets = 4;
+ prepareTable(initialBuckets);
+
+ // Write data to two partitions
+ writeData(
+ rowData(1, 100, BinaryString.fromString("p1")),
+ rowData(2, 200, BinaryString.fromString("p1")),
+ rowData(3, 300, BinaryString.fromString("p2")),
+ rowData(4, 400, BinaryString.fromString("p2")));
+
+ // Rescale only partition p1 to 4 buckets, leaving p2 at 2 buckets
+ createAction(
+ RescaleAction.class,
+ "rescale",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--bucket_num",
+ String.valueOf(rescaledBuckets),
+ "--partition",
+ "pt=p1")
+ .run();
+
+ // Verify the table now has different bucket counts per partition
+ FileStoreTable table = getFileStoreTable(tableName);
+ Map<BinaryRow, Integer> partitionBuckets = getPartitionBucketCounts(table);
+
+ assertThat(partitionBuckets).hasSize(2);
+
+ // Partition keys are BinaryRows, so match partitions by bucket count instead of by
+ // value: one should report rescaledBuckets (p1) and the other initialBuckets (p2).
+ int p1Buckets = -1;
+ int p2Buckets = -1;
+ for (Map.Entry<BinaryRow, Integer> entry : partitionBuckets.entrySet()) {
+ int buckets = entry.getValue();
+ // One partition should have rescaledBuckets, the other initialBuckets
+ if (buckets == rescaledBuckets) {
+ p1Buckets = buckets;
+ } else if (buckets == initialBuckets) {
+ p2Buckets = buckets;
+ }
+ }
+ assertThat(p1Buckets)
+ .as("Rescaled partition p1 should have %d buckets", rescaledBuckets)
+ .isEqualTo(rescaledBuckets);
+ assertThat(p2Buckets)
+ .as("Non-rescaled partition p2 should still have %d buckets", initialBuckets)
+ .isEqualTo(initialBuckets);
+
+ // Verify data is preserved across both partitions
+ List rows = getData(tableName);
+ assertThat(rows).hasSize(4);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testRescaleAppendOnlyTable() throws Exception {
+ int initialBuckets = 2;
+ int newBuckets = 4;
+
+ // Create an append-only table (no primary keys)
+ RowType appendRowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()},
+ new String[] {"a", "b", "pt"});
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), String.valueOf(initialBuckets));
+ FileStoreTable table =
+ createFileStoreTable(
+ appendRowType,
+ Collections.singletonList("pt"),
+ Collections.emptyList(), // no primary keys = append-only
+ Collections.singletonList("a"), // bucket key required for append-only
+ options);
+
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+
+ // Write data
+ writeData(
+ rowData(1, 100, BinaryString.fromString("p1")),
+ rowData(2, 200, BinaryString.fromString("p1")),
+ rowData(3, 300, BinaryString.fromString("p1")),
+ rowData(4, 400, BinaryString.fromString("p1")));
+
+ // Rescale the append-only table
+ createAction(
+ RescaleAction.class,
+ "rescale",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--bucket_num",
+ String.valueOf(newBuckets))
+ .run();
+
+ // Verify the new bucket count
+ FileStoreTable rescaledTable = getFileStoreTable(tableName);
+ Map<BinaryRow, Integer> partitionBuckets = getPartitionBucketCounts(rescaledTable);
+ for (Map.Entry<BinaryRow, Integer> entry : partitionBuckets.entrySet()) {
+ assertThat(entry.getValue()).isEqualTo(newBuckets);
+ }
+
+ // Verify data is preserved
+ List rows = getData(tableName);
+ assertThat(rows).hasSize(4);
+ }
+
+ /**
+ * Reads all manifest entries and builds a map from partition to totalBuckets. This reflects the
+ * actual bucket layout on disk.
+ */
+ private Map<BinaryRow, Integer> getPartitionBucketCounts(FileStoreTable table) {
+ List<ManifestEntry> entries = table.store().newScan().plan().files(FileKind.ADD);
+ Map<BinaryRow, Integer> result = new HashMap<>();
+ for (ManifestEntry entry : entries) {
+ int totalBuckets = entry.totalBuckets();
+ if (totalBuckets > 0) {
+ result.putIfAbsent(entry.partition().copy(), totalBuckets);
+ }
+ }
+ return result;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
index 991ab5d2ddbd..e2d1ffc27719 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
@@ -26,6 +26,7 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -72,8 +73,10 @@ public void testRecordsDistributedAcrossChannels() throws Exception {
int numChannels = 8;
Map knownNumBuckets = new HashMap<>();
+ PartitionBucketMapping bucketMapping =
+ new PartitionBucketMapping(numChannels, Collections.emptyMap());
PostponeFixedBucketChannelComputer computer =
- new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+ new PostponeFixedBucketChannelComputer(schema, knownNumBuckets, bucketMapping);
computer.setup(numChannels);
Set channels = new HashSet<>();
@@ -113,8 +116,10 @@ public void testNoPartitionDistribution() throws Exception {
int numChannels = 8;
Map knownNumBuckets = new HashMap<>();
+ PartitionBucketMapping bucketMapping =
+ new PartitionBucketMapping(numChannels, Collections.emptyMap());
PostponeFixedBucketChannelComputer computer =
- new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+ new PostponeFixedBucketChannelComputer(schema, knownNumBuckets, bucketMapping);
computer.setup(numChannels);
Set channels = new HashSet<>();
@@ -155,8 +160,10 @@ public void testSameKeyGoesToSameChannel() throws Exception {
int numChannels = 8;
Map knownNumBuckets = new HashMap<>();
+ PartitionBucketMapping bucketMapping =
+ new PartitionBucketMapping(numChannels, Collections.emptyMap());
PostponeFixedBucketChannelComputer computer =
- new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+ new PostponeFixedBucketChannelComputer(schema, knownNumBuckets, bucketMapping);
computer.setup(numChannels);
// Same key should always route to the same channel
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
index 687407141bf1..a4cf677481b8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
@@ -27,6 +27,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
+import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -108,10 +109,13 @@ public void testSchemaNoPartition() throws Exception {
private void testImpl(TableSchema schema, List input) {
ThreadLocalRandom random = ThreadLocalRandom.current();
- FixedBucketRowKeyExtractor extractor = new FixedBucketRowKeyExtractor(schema);
+ PartitionBucketMapping partitionBucketMapping =
+ new PartitionBucketMapping(schema.numBuckets());
+ FixedBucketRowKeyExtractor extractor =
+ new FixedBucketRowKeyExtractor(schema, partitionBucketMapping);
int numChannels = random.nextInt(10) + 1;
- RowDataChannelComputer channelComputer = new RowDataChannelComputer(schema);
+ RowDataChannelComputer channelComputer = new RowDataChannelComputer(extractor);
channelComputer.setup(numChannels);
// assert that channel(record) and channel(partition, bucket) gives the same result
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
index 8bc952f9baf4..c810e860bc5d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
@@ -18,18 +18,30 @@
package org.apache.paimon.flink.sink.coordinator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
@@ -76,4 +88,97 @@ public void testNoManifestCache() throws Exception {
assertThatThrownBy(() -> new TableWriteCoordinator(table))
.isInstanceOf(NullPointerException.class);
}
+
+ /**
+ * Tests that when a partition has been rescaled (different bucket count than the table
+ * default), the coordinator returns the correct per-partition bucket count even for buckets
+ * with no data files.
+ *
+ * <p>Scenario:
+ *
+ * <ul>
+ *   <li>Table bucket (default): 32
+ *   <li>Partition A bucket: 2 (rescaled)
+ *   <li>Partition A bucket=0: has data files, totalBuckets=2
+ *   <li>Partition A bucket=1: no data files
+ * </ul>
+ *
+ * <p>When scanning bucket=1 of partition A (empty bucket), the response {@code totalBuckets}
+ * must be 2 (from the per-partition mapping), not 32 (the table default).
+ */
+ @Test
+ public void testEmptyBucketUsesPartitionBucketCount() throws Exception {
+ // Table default: 32 buckets; partition A was rescaled to 2
+ int tableBuckets = 32;
+ int partitionBuckets = 2;
+
+ Identifier identifier = new Identifier("db2", "table");
+ catalog.createDatabase("db2", false);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("k", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .primaryKey("pt", "k")
+ .partitionKeys("pt")
+ .option(CoreOptions.BUCKET.key(), String.valueOf(tableBuckets))
+ .build();
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = getTable(identifier);
+
+ // Write to partition A, bucket=0 with totalBuckets=2 (rescaled partition)
+ String commitUser = UUID.randomUUID().toString();
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+ List<CommitMessage> messages;
+ try (StreamTableWrite write = writeBuilder.newWrite()) {
+ write.write(GenericRow.of(1, 1, 100), 0);
+ messages = write.prepareCommit(false, 0);
+ }
+
+ // Override totalBuckets to simulate partition rescale to 2
+ CommitMessageImpl original = (CommitMessageImpl) messages.get(0);
+ CommitMessageImpl rescaled =
+ new CommitMessageImpl(
+ original.partition(),
+ original.bucket(),
+ partitionBuckets,
+ original.newFilesIncrement(),
+ original.compactIncrement());
+ try (TableCommitImpl commit = table.newCommit(commitUser)) {
+ commit.commit(0, Collections.singletonList(rescaled));
+ }
+
+ // Reload the table and create coordinator
+ FileStoreTable freshTable = getTable(identifier);
+ TableWriteCoordinator coordinator = new TableWriteCoordinator(freshTable);
+
+ BinaryRow partitionA = partitionRow(1);
+
+ // Scan bucket=0 (has files): totalBuckets should be 2
+ ScanCoordinationRequest requestBucket0 =
+ new ScanCoordinationRequest(serializeBinaryRow(partitionA), 0, false, false);
+ ScanCoordinationResponse responseBucket0 = coordinator.scan(requestBucket0);
+ assertThat(responseBucket0.totalBuckets())
+ .as("bucket=0 (has files) should use per-partition bucket count 2")
+ .isEqualTo(partitionBuckets);
+ assertThat(responseBucket0.extractDataFiles()).isNotEmpty();
+
+ // Scan bucket=1 (empty): totalBuckets must be 2, not the table default 32
+ ScanCoordinationRequest requestBucket1 =
+ new ScanCoordinationRequest(serializeBinaryRow(partitionA), 1, false, false);
+ ScanCoordinationResponse responseBucket1 = coordinator.scan(requestBucket1);
+ assertThat(responseBucket1.totalBuckets())
+ .as(
+ "bucket=1 (empty bucket) must use per-partition bucket count 2, not table default 32")
+ .isEqualTo(partitionBuckets);
+ assertThat(responseBucket1.extractDataFiles()).isEmpty();
+ }
+
+ private BinaryRow partitionRow(int partitionValue) {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, partitionValue);
+ writer.complete();
+ return row;
+ }
}
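
Reviewer note: the tests above all check one rule. A partition that has been rescaled keeps its own bucket count, and every other partition falls back to the table default. The sketch below illustrates that fallback lookup in isolation. It is not Paimon's `PartitionBucketMapping`: the class name, the string partition keys, and the `floorMod` hashing are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a per-partition bucket lookup with a table-level fallback.
 * This is NOT Paimon's PartitionBucketMapping; all names here are illustrative.
 */
final class PartitionBucketLookupSketch {

    private final int defaultNumBuckets;
    private final Map<String, Integer> overrides;

    PartitionBucketLookupSketch(int defaultNumBuckets, Map<String, Integer> overrides) {
        if (defaultNumBuckets <= 0) {
            throw new IllegalArgumentException("defaultNumBuckets must be positive");
        }
        this.defaultNumBuckets = defaultNumBuckets;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.overrides = new HashMap<>(overrides);
    }

    /** Returns the partition's own bucket count, or the table default if none is recorded. */
    int resolveNumBuckets(String partition) {
        return overrides.getOrDefault(partition, defaultNumBuckets);
    }

    /**
     * Maps a key hash to a bucket index within the partition's resolved bucket count.
     * floorMod is an assumption of this sketch; it keeps the index non-negative.
     */
    int bucketFor(String partition, int keyHash) {
        return Math.floorMod(keyHash, resolveNumBuckets(partition));
    }
}
```

With a default of 2 and an override of 4 for `p1`, `resolveNumBuckets("p1")` yields 4 and `resolveNumBuckets("p2")` yields 2. That mirrors the strong assertion and the positive control in the test above: an empty bucket in a rescaled partition still resolves through the override rather than the table default.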