diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index ebd8a5aca7d1..ac105664faea 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -919,7 +919,25 @@
manifest.merge-min-count
30 Integer - To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge. + To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.
Note: when 'manifest-sort.enabled' is true, this minimum-count gate is only applied to the trailing sub-segment of a section that exceeds 'manifest-sort.max-rewrite-size'. Small under-budget sections are sorted and rewritten directly, so two small manifest files may be merged into one even when their count is below this threshold and full compaction is not triggered. + + +
manifest-sort.enabled
+ false + Boolean + Whether to invoke manifest sort rewrite during commit.
Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered. + + +
manifest-sort.partition-field
+ (none) + String + Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field. + + +
manifest-sort.max-rewrite-size
+ 256 mb + MemorySize + Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it.
manifest.target-file-size
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index f09a6edb4a74..2518fc75643f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -468,8 +468,61 @@ public InlineElement getDescription() { .intType() .defaultValue(30) .withDescription( - "To avoid frequent manifest merges, this parameter specifies the minimum number " - + "of ManifestFileMeta to merge."); + Description.builder() + .text( + "To avoid frequent manifest merges, this parameter specifies the minimum number " + + "of ManifestFileMeta to merge.") + .linebreak() + .text( + "Note: when '" + + "manifest-sort.enabled" + + "' is true, this minimum-count gate is only " + + "applied to the trailing sub-segment of a " + + "section that exceeds '" + + "manifest-sort.max-rewrite-size" + + "'. Small under-budget sections are sorted " + + "and rewritten directly, so two small manifest " + + "files may be merged into one even when their " + + "count is below this threshold and full " + + "compaction is not triggered.") + .build()); + + public static final ConfigOption MANIFEST_SORT_ENABLED = + key("manifest-sort.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text("Whether to invoke manifest sort rewrite during commit.") + .linebreak() + .text( + "Note: enabling this changes the semantics of '" + + "manifest.merge-min-count" + + "'. 
In the sort rewrite path, small manifest " + + "files within the rewrite budget are sorted " + + "and merged directly, so the minimum-count " + + "gate no longer prevents merging a small " + + "number of under-budget manifest files when " + + "full compaction is not triggered.") + .build()); + + public static final ConfigOption MANIFEST_SORT_PARTITION_FIELD = + key("manifest-sort.partition-field") + .stringType() + .noDefaultValue() + .withDescription( + "Partition field name to sort manifest entries by. Validated by" + + " schema validation, if not configured, defaults to the first partition field."); + + public static final ConfigOption MANIFEST_SORT_MAX_REWRITE_SIZE = + key("manifest-sort.max-rewrite-size") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(256)) + .withDescription( + "Maximum total size of manifest files to rewrite in a single" + + " sort rewrite pass. Sections exceeding this limit are" + + " skipped. Set to a larger value to allow more aggressive" + + " sort rewriting. 
The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it."); public static final ConfigOption UPSERT_KEY = key("upsert-key") @@ -2603,6 +2656,19 @@ public MemorySize manifestFullCompactionThresholdSize() { return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE); } + public boolean manifestSortEnabled() { + return options.get(MANIFEST_SORT_ENABLED); + } + + @Nullable + public String manifestSortPartitionField() { + return options.get(MANIFEST_SORT_PARTITION_FIELD); + } + + public long manifestSortMaxRewriteSize() { + return options.get(MANIFEST_SORT_MAX_REWRITE_SIZE).getBytes(); + } + public String partitionDefaultName() { return options.get(PARTITION_DEFAULT_NAME); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3f9fdb9f1c0c..10c9b20a0467 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -55,6 +55,8 @@ import org.apache.paimon.operation.commit.SuccessCommitResult; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.operation.metrics.CommitStats; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.partition.PartitionStatistics; import org.apache.paimon.predicate.Predicate; @@ -964,13 +966,7 @@ CommitResult tryCommitOnce( // try to merge old manifest files to create base manifest list mergeAfterManifests = ManifestFileMerger.merge( - mergeBeforeManifests, - manifestFile, - options.manifestTargetSize().getBytes(), - options.manifestMergeMinCount(), - options.manifestFullCompactionThresholdSize().getBytes(), - partitionType, - options.scanManifestParallelism()); + mergeBeforeManifests, manifestFile, partitionType, options); 
baseManifestList = manifestList.write(mergeAfterManifests); if (options.rowTrackingEnabled()) { @@ -1190,16 +1186,16 @@ private boolean compactManifestOnce() { manifestList.readDataManifests(latestSnapshot); List mergeAfterManifests; - // the fist trial + // the first trial: use a copy of the options with forced full compaction settings + Options compactOptions = Options.fromMap(options.toMap()); + compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); + compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); mergeAfterManifests = ManifestFileMerger.merge( mergeBeforeManifests, manifestFile, - options.manifestTargetSize().getBytes(), - 1, - 1, partitionType, - options.scanManifestParallelism()); + new CoreOptions(compactOptions)); if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) { // no need to commit this snapshot, because no compact were happened diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestAdjacentSortedRun.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestAdjacentSortedRun.java new file mode 100644 index 000000000000..ca0797c2139c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestAdjacentSortedRun.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.manifest.ManifestFileMeta; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * A {@code ManifestAdjacentSortedRun} is a list of {@link ManifestFileMeta}s sorted by a single + * partition field (the configured manifest sort field). The intervals {@code + * [partitionStats.minValues[k], partitionStats.maxValues[k]]} of these manifests do not overlap on + * field {@code k}, where {@code k} is the configured sort field index. + * + *

Boundary Equality: Files with boundary-touching intervals (min == previous.max) are + * considered non-overlapping and can be placed in the same SortedRun. This reduces the number of + * runs and improves compaction efficiency. However, such files may be separated into different + * Sections during splitIntoSections to avoid merge-sort overhead. + */ +public class ManifestAdjacentSortedRun { + + private int level; + private final List files; + private final long totalSize; + + private ManifestAdjacentSortedRun(List files) { + this.level = -1; + this.files = Collections.unmodifiableList(files); + long size = 0L; + for (ManifestFileMeta file : files) { + size += file.fileSize(); + } + this.totalSize = size; + } + + /** + * Build a {@code ManifestAdjacentSortedRun} from an already-sorted list. The caller MUST + * guarantee that {@code sortedFiles} is sorted ascending on the configured sort field's min + * value, and that intervals do not overlap on that field. + */ + public static ManifestAdjacentSortedRun fromSorted(List sortedFiles) { + return new ManifestAdjacentSortedRun(sortedFiles); + } + + public List files() { + return files; + } + + public long totalSize() { + return totalSize; + } + + public int level() { + return level; + } + + public void setLevel(int level) { + this.level = level; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ManifestAdjacentSortedRun)) { + return false; + } + ManifestAdjacentSortedRun that = (ManifestAdjacentSortedRun) o; + return level == that.level && files.equals(that.files); + } + + @Override + public int hashCode() { + return Objects.hash(level, files); + } + + @Override + public String toString() { + return "ManifestAdjacentSortedRun{level=" + + level + + ", files=[" + + files.stream().map(ManifestFileMeta::fileName).collect(Collectors.joining(", ")) + + "]}"; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index cdcad1ed3e84..f899aa71786f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.manifest.FileEntry; @@ -48,7 +49,7 @@ import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; import static org.apache.paimon.utils.Preconditions.checkArgument; -/** Util for merging manifest files. */ +/** Manifest file merger with standard merge logic and optional sort rewrite. */ public class ManifestFileMerger { private static final Logger LOG = LoggerFactory.getLogger(ManifestFileMerger.class); @@ -62,33 +63,44 @@ public class ManifestFileMerger { public static List merge( List input, ManifestFile manifestFile, - long suggestedMetaSize, - int suggestedMinMetaCount, - long manifestFullCompactionSize, RowType partitionType, - @Nullable Integer manifestReadParallelism) { + CoreOptions options) { + // Extract configuration from options + long suggestedMetaSize = options.manifestTargetSize().getBytes(); + int suggestedMinMetaCount = options.manifestMergeMinCount(); + long manifestFullCompactionSize = options.manifestFullCompactionThresholdSize().getBytes(); + Integer manifestReadParallelism = options.scanManifestParallelism(); + // these are the newly created manifest files, clean them up if exception occurs List newFilesForAbort = new ArrayList<>(); try { - Optional> fullCompacted = - tryFullCompaction( - input, - newFilesForAbort, - manifestFile, - suggestedMetaSize, - manifestFullCompactionSize, - partitionType, - manifestReadParallelism); - return fullCompacted.orElseGet( - () -> - tryMinorCompaction( - input, - newFilesForAbort, - manifestFile, 
- suggestedMetaSize, - suggestedMinMetaCount, - manifestReadParallelism)); + // If 'manifest-sort.enabled' is true and there are partition fields, delegate to + // ManifestFileSorter.trySortCompaction + if (options.manifestSortEnabled() && partitionType.getFieldCount() > 0) { + return ManifestFileSorter.trySortCompaction( + input, newFilesForAbort, manifestFile, partitionType, options); + } else { + // Otherwise try full compaction first, then minor compaction if needed + Optional> fullCompacted = + tryFullCompaction( + input, + newFilesForAbort, + manifestFile, + suggestedMetaSize, + manifestFullCompactionSize, + partitionType, + manifestReadParallelism); + return fullCompacted.orElseGet( + () -> + tryMinorCompaction( + input, + newFilesForAbort, + manifestFile, + suggestedMetaSize, + suggestedMinMetaCount, + manifestReadParallelism)); + } } catch (Throwable e) { // exception occurs, clean up and rethrow for (ManifestFileMeta manifest : newFilesForAbort) { @@ -234,7 +246,6 @@ public static Optional> tryFullCompaction( } // 2.2. 
merge - if (toBeMerged.size() <= 1) { return Optional.empty(); } @@ -295,7 +306,7 @@ private static FullCompactionReadResult readForFullCompaction( return new FullCompactionReadResult(file, requireChange, entries); } - private static Set computeDeletePartitions(Set deleteEntries) { + static Set computeDeletePartitions(Set deleteEntries) { Set partitions = new HashSet<>(); for (FileEntry.Identifier identifier : deleteEntries) { partitions.add(identifier.partition); @@ -303,13 +314,13 @@ private static Set computeDeletePartitions(Set return partitions; } - private static class FullCompactionReadResult { + static class FullCompactionReadResult { - private final ManifestFileMeta file; - private final boolean requireChange; - private final List entries; + final ManifestFileMeta file; + final boolean requireChange; + final List entries; - private FullCompactionReadResult( + FullCompactionReadResult( ManifestFileMeta file, boolean requireChange, List entries) { this.file = file; this.requireChange = requireChange; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java new file mode 100644 index 000000000000..39ef0bab5299 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java @@ -0,0 +1,1149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Collections.singletonList; +import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; + +/** Manifest file sorter that sorts and rewrites manifest files by a configured partition field. 
*/ +public class ManifestFileSorter { + + private static final Logger LOG = LoggerFactory.getLogger(ManifestFileSorter.class); + + /** Context object that carries shared state across compaction methods. */ + static class CompactionContext { + final boolean fullCompaction; + final RecordComparator fieldComparator; + final Set deleteEntries; + final Map defaultCompactionMap; + final List levelRuns; + final List pickedRuns; + + CompactionContext( + boolean fullCompaction, + RecordComparator fieldComparator, + Set deleteEntries, + Map defaultCompactionMap, + List levelRuns, + List pickedRuns) { + this.fullCompaction = fullCompaction; + this.fieldComparator = fieldComparator; + this.deleteEntries = deleteEntries; + this.defaultCompactionMap = defaultCompactionMap; + this.levelRuns = levelRuns; + this.pickedRuns = pickedRuns; + } + } + + /** Result of classifying manifest files. */ + private static class ClassifyResult { + final List lsmFiles; + final Set deleteEntries; + final Map defaultCompactionMap; + + ClassifyResult( + List lsmFiles, + Set deleteEntries, + Map defaultCompactionMap) { + this.lsmFiles = lsmFiles; + this.deleteEntries = deleteEntries; + this.defaultCompactionMap = defaultCompactionMap; + } + } + + /** + * Try to sort-rewrite the merged manifest list by a configured partition field. If the sort + * field cannot be resolved, an {@link IllegalArgumentException} is thrown. + * + *

Dispatches to {@link #tryFullCompaction} when totalDeltaFileSize >= sizeTrigger, or {@link + * #tryMinorCompaction} otherwise. + */ + static List trySortCompaction( + List input, + List newFilesForAbort, + ManifestFile manifestFile, + RowType partitionType, + CoreOptions options) + throws Exception { + String sortPartitionField = options.manifestSortPartitionField(); + long suggestedMetaSize = options.manifestTargetSize().getBytes(); + int suggestedMinMetaCount = options.manifestMergeMinCount(); + long fullCompactionThreshold = options.manifestFullCompactionThresholdSize().getBytes(); + long maxRewriteSize = options.manifestSortMaxRewriteSize(); + int maxSizeAmplificationPercent = options.maxSizeAmplificationPercent(); + int sortedRunSizeRatio = options.sortedRunSizeRatio(); + Integer manifestReadParallelism = options.scanManifestParallelism(); + + Optional> fullCompacted = + tryFullCompaction( + input, + newFilesForAbort, + manifestFile, + partitionType, + sortPartitionField, + suggestedMetaSize, + suggestedMinMetaCount, + fullCompactionThreshold, + maxRewriteSize, + maxSizeAmplificationPercent, + sortedRunSizeRatio, + manifestReadParallelism); + if (fullCompacted.isPresent()) { + return fullCompacted.get(); + } + return tryMinorCompaction( + input, + newFilesForAbort, + manifestFile, + partitionType, + sortPartitionField, + suggestedMetaSize, + suggestedMinMetaCount, + maxRewriteSize, + maxSizeAmplificationPercent, + sortedRunSizeRatio, + manifestReadParallelism); + } + + /** + * Full compaction path: totalDeltaFileSize >= sizeTrigger. + * + *

Does not build index mapping. sortAndRewriteSection writes all entries (ADD+DELETE merged) + * together without separating them. + */ + private static Optional> tryFullCompaction( + List input, + List newFilesForAbort, + ManifestFile manifestFile, + RowType partitionType, + String sortPartitionField, + long suggestedMetaSize, + int suggestedMinMetaCount, + long fullCompactionThreshold, + long maxRewriteSize, + int maxSizeAmplificationPercent, + int sortedRunSizeRatio, + @Nullable Integer manifestReadParallelism) + throws Exception { + // Step 1: Check if full compaction threshold is met + long totalDeltaFileSize = 0; + for (ManifestFileMeta file : input) { + if (file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize) { + totalDeltaFileSize += file.fileSize(); + } + } + if (totalDeltaFileSize < fullCompactionThreshold) { + return Optional.empty(); + } + // Step 2: Prepare compaction context + CompactionContext ctx = + prepareCompaction( + input, + true, + manifestFile, + partitionType, + sortPartitionField, + suggestedMetaSize, + maxSizeAmplificationPercent, + sortedRunSizeRatio, + manifestReadParallelism); + List levelRuns = ctx.levelRuns; + List pickedRuns = ctx.pickedRuns; + + if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) { + LOG.debug( + "Manifest sort full compact skipped: no runs picked and no defaultCompaction files."); + return Optional.empty(); + } + + LOG.info( + "Manifest sort full compact: input={} files, lsm={} runs, picked={} runs, " + + "defaultCompaction={} files.", + input.size(), + levelRuns.size(), + pickedRuns.size(), + ctx.defaultCompactionMap.size()); + + // Step 3: Collect reused files (not picked) and picked files + Set pickedSet = new HashSet<>(pickedRuns); + List result = new ArrayList<>(); + for (ManifestAdjacentSortedRun run : levelRuns) { + if (!pickedSet.contains(run)) { + result.addAll(run.files()); + } + } + List pickedFiles = new ArrayList<>(); + for (ManifestAdjacentSortedRun run : pickedRuns) { + 
pickedFiles.addAll(run.files()); + } + pickedFiles.addAll(ctx.defaultCompactionMap.keySet()); + + // Step 4: Split into sections and merge small adjacent sections + List

sections = + splitIntoSections(pickedFiles, ctx.fieldComparator, ctx.defaultCompactionMap); + sections = mergeSmallAdjacentSections(sections, suggestedMetaSize); + + LOG.info( + "Manifest sort full compact: pickedFiles={}, sections={}.", + pickedFiles.size(), + sections.size()); + + // Step 5: Rewrite sections + FullCompactOutput output = new FullCompactOutput(result); + rewriteSections( + sections, + output, + newFilesForAbort, + ctx, + manifestFile, + suggestedMetaSize, + suggestedMinMetaCount, + maxRewriteSize, + manifestReadParallelism); + + LOG.info( + "Manifest sort full compact completed: input={}, resultFiles={}.", + input.size(), + result.size()); + return Optional.of(result); + } + + /** + * Minor compaction path: totalDeltaFileSize < sizeTrigger. + * + *

Builds index mapping to preserve original positions. sortAndRewriteSection separates ADD + * and DELETE entries, placing ADD at result[minIdx] and DELETE at result[maxIdx]. + */ + private static List tryMinorCompaction( + List input, + List newFilesForAbort, + ManifestFile manifestFile, + RowType partitionType, + String sortPartitionField, + long suggestedMetaSize, + int suggestedMinMetaCount, + long maxRewriteSize, + int maxSizeAmplificationPercent, + int sortedRunSizeRatio, + @Nullable Integer manifestReadParallelism) + throws Exception { + // Step 1: Prepare compaction context (early-return if nothing to compact) + CompactionContext ctx = + prepareCompaction( + input, + false, + manifestFile, + partitionType, + sortPartitionField, + suggestedMetaSize, + maxSizeAmplificationPercent, + sortedRunSizeRatio, + manifestReadParallelism); + List levelRuns = ctx.levelRuns; + List pickedRuns = ctx.pickedRuns; + + if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) { + LOG.debug( + "Manifest sort minor compact skipped: no runs picked and no defaultCompaction files."); + return input; + } + + LOG.info( + "Manifest sort minor compact: input={} files, lsm={} runs, picked={} runs, " + + "defaultCompaction={} files.", + input.size(), + levelRuns.size(), + pickedRuns.size(), + ctx.defaultCompactionMap.size()); + + // Step 2: Build fileName -> index mapping and initialize 2D result + Map fileNameToIndex = new HashMap<>(); + List> result = new ArrayList<>(input.size()); + for (int i = 0; i < input.size(); i++) { + fileNameToIndex.put(input.get(i).fileName(), i); + result.add(new ArrayList<>()); + } + + // Step 3: Collect reused files and picked files + Set pickedSet = new HashSet<>(pickedRuns); + for (ManifestAdjacentSortedRun run : levelRuns) { + if (!pickedSet.contains(run)) { + for (ManifestFileMeta file : run.files()) { + Integer idx = fileNameToIndex.get(file.fileName()); + if (idx != null) { + result.get(idx).add(file); + } + } + } + } + + List pickedFiles = new 
ArrayList<>(); + for (ManifestAdjacentSortedRun run : pickedRuns) { + pickedFiles.addAll(run.files()); + } + pickedFiles.addAll(ctx.defaultCompactionMap.keySet()); + + // Step 4: Compute index range + int minIdx = Integer.MAX_VALUE; + int maxIdx = Integer.MIN_VALUE; + for (ManifestFileMeta meta : pickedFiles) { + Integer idx = fileNameToIndex.get(meta.fileName()); + if (idx != null) { + minIdx = Math.min(minIdx, idx); + maxIdx = Math.max(maxIdx, idx); + } + } + Pair indexRange = Pair.of(minIdx, maxIdx); + + // Step 5: Split into sections and merge small adjacent sections + List

sections = + splitIntoSections(pickedFiles, ctx.fieldComparator, ctx.defaultCompactionMap); + sections = mergeSmallAdjacentSections(sections, suggestedMetaSize); + + LOG.info( + "Manifest sort minor compact: pickedFiles={}, sections={}.", + pickedFiles.size(), + sections.size()); + + // Step 6: Rewrite sections + MinorCompactOutput output = new MinorCompactOutput(result, indexRange, fileNameToIndex); + rewriteSections( + sections, + output, + newFilesForAbort, + ctx, + manifestFile, + suggestedMetaSize, + suggestedMinMetaCount, + maxRewriteSize, + manifestReadParallelism); + + // Step 7: Flatten 2D result into a single list + List flatResult = new ArrayList<>(); + for (List subList : result) { + flatResult.addAll(subList); + } + + LOG.info( + "Manifest sort minor compact completed: input={}, resultFiles={}.", + input.size(), + flatResult.size()); + return flatResult; + } + + /** + * Prepare compaction context: resolve sort field, classify manifests, build level runs, and + * pick runs for compaction. + * + * @return CompactionContext containing all shared state + */ + private static CompactionContext prepareCompaction( + List input, + boolean fullCompaction, + ManifestFile manifestFile, + RowType partitionType, + String sortPartitionField, + long suggestedMetaSize, + int maxSizeAmplificationPercent, + int sortedRunSizeRatio, + @Nullable Integer manifestReadParallelism) { + + // Step 1: Resolve sort field and build comparator for partition ordering. + String sortField = resolveSortField(sortPartitionField, partitionType); + if (sortField == null) { + throw new IllegalArgumentException( + "Cannot resolve sort field for manifest sort rewrite."); + } + int sortFieldIndex = partitionType.getFieldNames().indexOf(sortField); + RecordComparator fieldComparator = + CodeGenUtils.newRecordComparator( + partitionType.getFieldTypes(), new int[] {sortFieldIndex}); + + // Step 2: Classify manifests into LSM files and collect delete entries. 
+ ClassifyResult classifyResult = + classifyManifests( + input, + fullCompaction, + manifestFile, + partitionType, + suggestedMetaSize, + manifestReadParallelism); + List lsmFiles = classifyResult.lsmFiles; + + // Step 3: Build level-sorted runs from LSM files based on partition order. + List levelRuns = + lsmFiles.isEmpty() + ? new ArrayList<>() + : buildLevelSortedRuns(lsmFiles, fieldComparator); + + // Step 4: Pick runs for compaction using size amplification and ratio strategy. + ManifestPickStrategy pickStrategy = + new ManifestPickStrategy(maxSizeAmplificationPercent, sortedRunSizeRatio); + List pickedRuns = pickStrategy.pick(levelRuns); + + return new CompactionContext( + fullCompaction, + fieldComparator, + classifyResult.deleteEntries, + classifyResult.defaultCompactionMap, + levelRuns, + pickedRuns); + } + + /** + * Classify manifest files into default-compaction group and LSM group. + * + *

Full compaction: small files and files overlapping delete partitions go into + * defaultCompactionMap; the rest are returned as lsmFiles. + * + *

Non-full compaction: small files go to defaultCompactionMap for minor-style merge; the + * rest are returned as lsmFiles. + * + * @return ClassifyResult containing lsmFiles, deleteEntries, and defaultCompactionMap + */ + private static ClassifyResult classifyManifests( + List input, + boolean fullCompaction, + ManifestFile manifestFile, + RowType partitionType, + long suggestedMetaSize, + @Nullable Integer manifestReadParallelism) { + // Initialize classification containers and read delete entries + Map classifiedDefaultMap = new LinkedHashMap<>(); + List lsmFiles = new LinkedList<>(input); + Set classifiedDeleteEntries = Collections.emptySet(); + PartitionPredicate predicate = null; + if (fullCompaction) { + classifiedDeleteEntries = + FileEntry.readDeletedEntries(manifestFile, input, manifestReadParallelism); + + // Build partition predicate from delete entries for overlap detection + if (classifiedDeleteEntries.isEmpty()) { + predicate = PartitionPredicate.ALWAYS_FALSE; + } else { + if (partitionType.getFieldCount() > 0) { + Set deletePartitions = + ManifestFileMerger.computeDeletePartitions(classifiedDeleteEntries); + predicate = PartitionPredicate.fromMultiple(partitionType, deletePartitions); + } else { + predicate = PartitionPredicate.ALWAYS_TRUE; + } + } + } + + // Classify each file based on size and delete-partition overlap + Iterator iterator = lsmFiles.iterator(); + while (iterator.hasNext()) { + ManifestFileMeta file = iterator.next(); + boolean small = file.fileSize() < suggestedMetaSize; + boolean inDeleteRange = + predicate != null + && predicate.test( + file.numAddedFiles() + file.numDeletedFiles(), + file.partitionStats().minValues(), + file.partitionStats().maxValues(), + file.partitionStats().nullCounts()); + if (small || inDeleteRange) { + iterator.remove(); + classifiedDefaultMap.put(file, inDeleteRange); + } + } + + return new ClassifyResult(lsmFiles, classifiedDeleteEntries, classifiedDefaultMap); + } + + /** + * Build level-sorted runs from 
a list of manifest files. Sorts files by min partition value, + * greedy-scans to build non-overlapping SortedRuns, then assigns levels by totalSize (Top-4 + * largest to level 1~4, rest to level 0). + */ + static List buildLevelSortedRuns( + List input, RecordComparator fieldComparator) { + // Step 1: Sort by min value (if equal, then by max value) + input.sort( + (a, b) -> { + int cmp = + fieldComparator.compare( + a.partitionStats().minValues(), b.partitionStats().minValues()); + if (cmp != 0) { + return cmp; + } + return fieldComparator.compare( + a.partitionStats().maxValues(), b.partitionStats().maxValues()); + }); + + // Step 2: Interval graph coloring algorithm - assign files to runs + // Use priority queue to track runs by their max values + PriorityQueue> runs = + new PriorityQueue<>( + (r1, r2) -> { + ManifestFileMeta last1 = r1.get(r1.size() - 1); + ManifestFileMeta last2 = r2.get(r2.size() - 1); + return fieldComparator.compare( + last1.partitionStats().maxValues(), + last2.partitionStats().maxValues()); + }); + + for (ManifestFileMeta file : input) { + List earliestRun = runs.poll(); + if (earliestRun == null) { + // No existing runs, create a new one + List newRun = new ArrayList<>(); + newRun.add(file); + runs.offer(newRun); + } else if (fieldComparator.compare( + file.partitionStats().minValues(), + earliestRun.get(earliestRun.size() - 1).partitionStats().maxValues()) + >= 0) { + // Current file's min >= run's max, append to this run + // Note: When min == max (boundary equality), files are considered + // non-overlapping and can be placed in the same SortedRun. This allows + // building fewer SortedRuns, improving compaction efficiency while + // maintaining correct sort order. However, these files may later be separated + // into different Sections during splitIntoSections to avoid merge-sort overhead. + // + // See ManifestAdjacentSortedRun class comment for the full boundary equality + // semantics. 
+ earliestRun.add(file); + runs.offer(earliestRun); + } else { + // Overlap detected, put the run back and create a new one + runs.offer(earliestRun); + List newRun = new ArrayList<>(); + newRun.add(file); + runs.offer(newRun); + } + } + + // Step 3: Convert to ManifestAdjacentSortedRun list + List result = new ArrayList<>(); + while (!runs.isEmpty()) { + result.add(ManifestAdjacentSortedRun.fromSorted(runs.poll())); + } + + // Step 4: Sort by totalSize and assign levels + result.sort(Comparator.comparingLong(ManifestAdjacentSortedRun::totalSize)); + int n = result.size(); + int maxLevel = ManifestPickStrategy.MAX_LEVEL; + for (int i = 0; i < n; i++) { + if (i >= n - maxLevel) { + result.get(i).setLevel(i - (n - maxLevel) + 1); + } else { + result.get(i).setLevel(0); + } + } + return result; + } + + /** + * Split picked files into sections. Files with overlapping sort-key intervals go into the same + * section. Each section is built with pre-computed totalSize and hasDefaultCompactMeta. + */ + static List

splitIntoSections( + List pickedFiles, + RecordComparator fieldComparator, + Map defaultCompactionMap) { + pickedFiles.sort( + (a, b) -> { + int cmp = + fieldComparator.compare( + a.partitionStats().minValues(), b.partitionStats().minValues()); + if (cmp != 0) { + return cmp; + } + return fieldComparator.compare( + a.partitionStats().maxValues(), b.partitionStats().maxValues()); + }); + + List
sections = new ArrayList<>(); + List currentFiles = new ArrayList<>(); + long currentTotalSize = 0; + boolean currentHasDefault = false; + ManifestFileMeta first = pickedFiles.get(0); + currentFiles.add(first); + currentTotalSize += first.fileSize(); + currentHasDefault = defaultCompactionMap.containsKey(first); + BinaryRow sectionMaxBound = first.partitionStats().maxValues(); + + for (int i = 1; i < pickedFiles.size(); i++) { + ManifestFileMeta file = pickedFiles.get(i); + // Note: Boundary equality (file.min == sectionMaxBound) results in separate + // sections. This design choice balances three factors: + // 1. Avoid merge-sort overhead: Files with non-overlapping boundaries can be processed + // independently without merge-sort, improving performance. + // 2. Maintain partition filtering capability: Each section has a distinct key range, + // enabling efficient partition pruning during queries. + // 3. Preserve ordering invariant: Separating boundary-touching files into different + // sections + // does not break the global sort order, as they are still processed in ascending + // order. + // + // IMPORTANT: While boundary-touching files are separated into different Sections here, + // they may be placed in the same SortedRun during buildLevelSortedRuns (which uses >= 0 + // comparison). This dual behavior is intentional and documented in class comments. 
+ if (fieldComparator.compare(file.partitionStats().minValues(), sectionMaxBound) >= 0) { + sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault)); + currentFiles = new ArrayList<>(); + currentTotalSize = 0; + currentFiles.add(file); + currentTotalSize += file.fileSize(); + currentHasDefault = defaultCompactionMap.containsKey(file); + sectionMaxBound = file.partitionStats().maxValues(); + } else { + currentFiles.add(file); + currentTotalSize += file.fileSize(); + if (!currentHasDefault && defaultCompactionMap.containsKey(file)) { + currentHasDefault = true; + } + if (fieldComparator.compare(file.partitionStats().maxValues(), sectionMaxBound) + > 0) { + sectionMaxBound = file.partitionStats().maxValues(); + } + } + } + sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault)); + return sections; + } + + /** + * Merge small adjacent sections to avoid producing too many small rewrite batches. If either + * the pending section or the current section total size is smaller than {@code + * suggestedMetaSize}, they are combined into a single section. + */ + private static List
mergeSmallAdjacentSections( + List
sections, long suggestedMetaSize) { + List
merged = new ArrayList<>(); + Section pending = null; + + for (Section section : sections) { + if (pending == null) { + pending = section; + } else { + if (pending.totalSize < suggestedMetaSize + || section.totalSize < suggestedMetaSize) { + pending = Section.merge(pending, section); + } else { + merged.add(pending); + pending = section; + } + } + } + if (pending != null) { + merged.add(pending); + } + return merged; + } + + /** + * Rewrite sections with budget control. + * + *

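Semantics of manifest-sort.max-rewrite-size: This budget applies only to the sorted + * rewrite portion of multi-file sections; single-file sections are handled directly and + * are not counted against it. When adding a section would push the cumulative size past + * the limit: + * + *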

    + *
  • First overflow: The current section is split. Files that still fit in the remaining + * budget are sorted and rewritten. The remaining files form a new section that is + * appended to the end of the section list for later processing. + *
  • Subsequent overflows: If the section has files in defaultCompactionMap (needs default + * compaction), rewriteSubSegments is called to process it in smaller chunks. Otherwise, + * the section's files are kept unchanged in the output. + *
+ * + *

This design ensures that the budget only limits the aggressive sort rewrite, while still + * allowing necessary cleanup operations (delete entry elimination, small file merge) through + * the rewriteSubSegments fallback path. + */ + private static void rewriteSections( + List

sections, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + long suggestedMetaSize, + int suggestedMinMetaCount, + long maxRewriteSize, + @Nullable Integer manifestReadParallelism) + throws Exception { + long processedSize = 0; + boolean reachedLimit = false; + + for (int i = 0; i < sections.size(); i++) { + Section section = sections.get(i); + if (section.files.size() == 1) { + sortAndRewriteSection( + section.files, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + continue; + } + + if (processedSize + section.totalSize <= maxRewriteSize) { + processedSize += section.totalSize; + sortAndRewriteSection( + section.files, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + } else if (!reachedLimit) { + long rewriteTotalSize = maxRewriteSize - processedSize; + processedSize += section.totalSize; + List rewriteFiles = new ArrayList<>(); + List remainingFiles = new ArrayList<>(); + long rewriteSize = 0; + long remainingSize = 0; + boolean remainingHasDefault = false; + + for (ManifestFileMeta file : section.files) { + if (rewriteSize + file.fileSize() <= rewriteTotalSize) { + rewriteFiles.add(file); + rewriteSize += file.fileSize(); + } else { + remainingFiles.add(file); + remainingSize += file.fileSize(); + if (ctx.defaultCompactionMap.containsKey(file)) { + remainingHasDefault = true; + } + } + } + + sortAndRewriteSection( + rewriteFiles, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + + if (!remainingFiles.isEmpty()) { + Section remainingSection = + new Section(remainingFiles, remainingSize, remainingHasDefault); + // global manifest file metas order by sort key is not a required invariant + sections.add(remainingSection); + } + reachedLimit = true; + } else if (section.hasDefaultCompactMeta) { + rewriteSubSegments( + section.files, + output, + sortNewFiles, + ctx, + manifestFile, + suggestedMetaSize, + suggestedMinMetaCount, + 
manifestReadParallelism); + } else { + output.addAllUnchanged(section.files); + } + } + } + + /** + * Rewrite a section in smaller sub-segments when it exceeds the sort rewrite budget. + * + *

Semantic difference from the old minor merge: In the old ManifestFileMerger path, the + * trailing candidates are kept unchanged when their count is below manifest.merge-min-count. In + * this sort path, rewriteSubSegments is triggered whenever an over-budget section contains at + * least one file from defaultCompactionMap, regardless of the manifest count. This is because + * files in defaultCompactionMap either: + * + *

    + *
  • Are small files needing consolidation + *
  • Have partition stats that match partitions with delete entries that must be eliminated + *
+ * + *

The manifest.merge-min-count threshold is still applied to the final sub-segment's tail, + * acting as a conservative gate to avoid unnecessary rewrite when there are no delete entries + * and the tail is too small. + */ + private static void rewriteSubSegments( + List section, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + long suggestedMetaSize, + int suggestedMinMetaCount, + @Nullable Integer manifestReadParallelism) + throws Exception { + List subSegment = new ArrayList<>(); + long subSegmentSize = 0; + for (ManifestFileMeta m : section) { + subSegmentSize += m.fileSize(); + subSegment.add(m); + + if (subSegmentSize >= suggestedMetaSize) { + sortAndRewriteSection( + subSegment, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + subSegment.clear(); + subSegmentSize = 0; + } + } + // Flush tail only if delete entries exist or file count >= minCount. + if (!subSegment.isEmpty()) { + if (!ctx.deleteEntries.isEmpty() || subSegment.size() >= suggestedMinMetaCount) { + sortAndRewriteSection( + subSegment, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + } else { + output.addAllUnchanged(subSegment); + } + } + } + + /** + * Sort and rewrite a section. Dispatches to full or minor compact path. + * + *

sortNewFiles is the same reference as newFilesForAbort, ensuring newly written files are + * cleaned up on exception by the caller's catch block. + */ + private static void sortAndRewriteSection( + List section, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + @Nullable Integer manifestReadParallelism) + throws Exception { + // Skip rewrite for single file not in delete-range. + if (section.size() == 1 && !ctx.defaultCompactionMap.getOrDefault(section.get(0), false)) { + output.addUnchanged(section.get(0)); + return; + } + + if (ctx.fullCompaction) { + sortAndRewriteFull( + section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); + } else { + sortAndRewriteMinor( + section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); + } + } + + /** + * Full compaction path: read all surviving entries (ADD merged with DELETE), sort them + * together, and write to output as a single sorted stream. + */ + private static void sortAndRewriteFull( + List section, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + @Nullable Integer manifestReadParallelism) + throws Exception { + // Read surviving ADD entries: filter out entries cancelled by deleteEntries. 
+ Function> reader = + meta -> { + List batch = new ArrayList<>(); + for (ManifestEntry entry : + manifestFile.read( + meta.fileName(), + meta.fileSize(), + FileEntry.addFilter(), + Filter.alwaysTrue())) { + if (!ctx.deleteEntries.contains(entry.identifier())) { + batch.add(entry); + } + } + return batch; + }; + + List entries = new ArrayList<>(); + for (ManifestEntry entry : + sequentialBatchedExecute(reader, section, manifestReadParallelism)) { + entries.add(entry); + } + + if (!entries.isEmpty()) { + List sorted = + sortAndWriteEntries(entries, ctx.fieldComparator, manifestFile); + output.addSortedFiles(sorted); + sortNewFiles.addAll(sorted); + } + } + + /** + * Minor compaction path: read entries with ADD/DELETE classified in a single pass per file, + * then sort each group independently and write them to output. + * + *

Each file is read in parallel (via sequentialBatchedExecute). The reader classifies + * entries into ADD and DELETE within each file, returning a Pair. Results are merged in the + * main thread. + */ + private static void sortAndRewriteMinor( + List section, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + @Nullable Integer manifestReadParallelism) + throws Exception { + // Read and classify ADD/DELETE in one pass per file. + Function, List>>> reader = + meta -> { + List addBatch = new ArrayList<>(); + List deleteBatch = new ArrayList<>(); + for (ManifestEntry entry : + manifestFile.read(meta.fileName(), meta.fileSize())) { + if (entry.kind() == FileKind.ADD) { + addBatch.add(entry); + } else { + deleteBatch.add(entry); + } + } + return singletonList(Pair.of(addBatch, deleteBatch)); + }; + + Map addMap = new HashMap<>(); + List minorDeleteEntries = new ArrayList<>(); + for (Pair, List> pair : + sequentialBatchedExecute(reader, section, manifestReadParallelism)) { + for (ManifestEntry entry : pair.getLeft()) { + addMap.put(entry.identifier(), entry); + } + minorDeleteEntries.addAll(pair.getRight()); + } + + // Cancel out ADD+DELETE pairs with the same identifier within the section. + minorDeleteEntries.removeIf( + manifestEntry -> addMap.remove(manifestEntry.identifier()) != null); + List addEntries = new ArrayList<>(addMap.values()); + + if (!addEntries.isEmpty()) { + List sorted = + sortAndWriteEntries(addEntries, ctx.fieldComparator, manifestFile); + output.addSortedFiles(sorted); + sortNewFiles.addAll(sorted); + } + + if (!minorDeleteEntries.isEmpty()) { + List sorted = + sortAndWriteEntries(minorDeleteEntries, ctx.fieldComparator, manifestFile); + output.addDeleteFiles(sorted); + sortNewFiles.addAll(sorted); + } + } + + /** Sort entries and write them to a new manifest file with proper error handling. 
*/ + private static List sortAndWriteEntries( + List entries, + RecordComparator fieldComparator, + ManifestFile manifestFile) + throws Exception { + entries.sort((a, b) -> compareSortKey(a, b, fieldComparator)); + RollingFileWriter writer = + manifestFile.createRollingWriter(); + Exception exception = null; + try { + writer.write(entries); + } catch (Exception e) { + exception = e; + } finally { + if (exception != null) { + writer.abort(); + throw exception; + } + writer.close(); + } + return writer.result(); + } + + /** + * Compare two {@link ManifestEntry}s by the composite key {@code (sort-field, kind, fileName)}. + * {@code fileName} is used as the tie-breaker so that all entries sharing the same sort-field + * value AND the same data file are emitted contiguously. + */ + static int compareSortKey(ManifestEntry a, ManifestEntry b, RecordComparator fieldComparator) { + int c = fieldComparator.compare(a.partition(), b.partition()); + if (c != 0) { + return c; + } + // ADD before DELETE + int kindCmp = a.kind().compareTo(b.kind()); + if (kindCmp != 0) { + return kindCmp; + } + return a.file().fileName().compareTo(b.file().fileName()); + } + + /** + * Resolve the partition field to sort manifests by. + * + *

Resolution rules: + * + *

    + *
  1. If {@code manifest-sort.partition-field} is configured, return that value. + *
  2. Otherwise, default to the first partition field. + *
+ */ + static String resolveSortField(String sortPartitionField, RowType partitionType) { + if (sortPartitionField != null && !sortPartitionField.isEmpty()) { + return sortPartitionField; + } + return partitionType.getFieldNames().get(0); + } + + /** Strategy interface for writing compaction results. */ + interface RewriteOutput { + void addUnchanged(ManifestFileMeta file); + + void addAllUnchanged(List files); + + void addSortedFiles(List files); + + void addDeleteFiles(List files); + } + + private static class FullCompactOutput implements RewriteOutput { + private final List result; + + FullCompactOutput(List result) { + this.result = result; + } + + @Override + public void addUnchanged(ManifestFileMeta file) { + result.add(file); + } + + @Override + public void addAllUnchanged(List files) { + result.addAll(files); + } + + @Override + public void addSortedFiles(List files) { + result.addAll(files); + } + + @Override + public void addDeleteFiles(List files) { + result.addAll(files); + } + } + + private static class MinorCompactOutput implements RewriteOutput { + private final List> result; + private final Pair indexRange; + private final Map fileNameToIndex; + + MinorCompactOutput( + List> result, + Pair indexRange, + Map fileNameToIndex) { + this.result = result; + this.indexRange = indexRange; + this.fileNameToIndex = fileNameToIndex; + } + + @Override + public void addUnchanged(ManifestFileMeta file) { + Integer idx = fileNameToIndex.get(file.fileName()); + result.get(idx).add(file); + } + + @Override + public void addAllUnchanged(List files) { + for (ManifestFileMeta file : files) { + addUnchanged(file); + } + } + + @Override + public void addSortedFiles(List files) { + result.get(indexRange.getLeft()).addAll(files); + } + + @Override + public void addDeleteFiles(List files) { + result.get(indexRange.getRight()).addAll(files); + } + } + + /** A section of manifest files with pre-computed metadata. 
*/ + static class Section { + final List files; + final long totalSize; + final boolean hasDefaultCompactMeta; + + Section(List files, long totalSize, boolean hasDefaultCompactMeta) { + this.files = files; + this.totalSize = totalSize; + this.hasDefaultCompactMeta = hasDefaultCompactMeta; + } + + /** Create a merged section from two sections. */ + static Section merge(Section a, Section b) { + List merged = new ArrayList<>(a.files); + merged.addAll(b.files); + return new Section( + merged, + a.totalSize + b.totalSize, + a.hasDefaultCompactMeta || b.hasDefaultCompactMeta); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java new file mode 100644 index 000000000000..519c49676ce3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.utils.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Pick strategy for manifest LSM Tree compaction. + * + *

Strategy priority: + * + *

    + *
  1. SizeAmp: if the total size of all lower-level runs exceeds {@code sizeAmpThreshold} + * percent of the highest-level run's size, trigger full compaction (pick all runs). + *
  2. SizeRatio: from low to high, pick a run when the cumulative picked size, amplified by + * {@code sizeRatioThreshold} percent, is at least that run's size. + *
  3. Forced pick: level0 and level1 runs are always picked. + *
+ */ +public class ManifestPickStrategy { + + public static final int MAX_LEVEL = 4; + + private final int sizeAmpThreshold; + private final int sizeRatioThreshold; + + public ManifestPickStrategy(int sizeAmpThreshold, int sizeRatioThreshold) { + Preconditions.checkArgument(sizeAmpThreshold > 0, "sizeAmpThreshold must be positive"); + Preconditions.checkArgument(sizeRatioThreshold > 0, "sizeRatioThreshold must be positive"); + this.sizeAmpThreshold = sizeAmpThreshold; + this.sizeRatioThreshold = sizeRatioThreshold; + } + + /** + * Pick runs that need compaction from the given level runs. + * + * @param levelRuns runs with assigned levels (level 0~4) + * @return list of picked runs to compact + */ + public List pick(List levelRuns) { + if (levelRuns.isEmpty() || levelRuns.size() <= MAX_LEVEL) { + return new ArrayList<>(); + } + + // Try SizeAmp first + List sizeAmpResult = pickForSizeAmp(levelRuns); + if (sizeAmpResult != null) { + return sizeAmpResult; + } + + // SizeRatio + forced pick + return pickForSizeRatioAndForce(levelRuns); + } + + /** + * SizeAmp check: if all lower-level (0~3) runs' total size exceeds the highest-level run's size + * by more than {@code sizeAmpThreshold} percent, pick all runs for full compaction. + * + *

Formula (consistent with {@code UniversalCompaction#pickForSizeAmp}): {@code + * lowerLevelTotalSize * 100 > sizeAmpThreshold * highestRunSize} + */ + private List pickForSizeAmp( + List levelRuns) { + if (levelRuns.isEmpty()) { + return null; + } + + // The last run has the highest level (set by buildLevelSortedRuns) + ManifestAdjacentSortedRun highestRun = levelRuns.get(levelRuns.size() - 1); + int maxLevel = highestRun.level(); + + if (maxLevel <= 0) { + return null; + } + + long lowerLevelTotalSize = 0; + for (ManifestAdjacentSortedRun run : levelRuns) { + if (run.level() < maxLevel) { + lowerLevelTotalSize += run.totalSize(); + } + } + + // size amplification = percentage of additional size + if (lowerLevelTotalSize * 100 > (long) sizeAmpThreshold * highestRun.totalSize()) { + return new ArrayList<>(levelRuns); + } + return null; + } + + /** + * SizeRatio + forced pick. + * + *

    + *
  • Level0 and level1 are always picked. + *
  • From low to high, if the cumulative picked size with ratio amplification covers the + * next run's size, continue picking. + *
+ * + *

Formula (consistent with {@code UniversalCompaction#pickForSizeRatio}): {@code pickedSize + * * (100.0 + sizeRatioThreshold) / 100.0 >= nextRunSize} + */ + private List pickForSizeRatioAndForce( + List levelRuns) { + // levelRuns is already sorted by level ascending (set by buildLevelSortedRuns) + List picked = new ArrayList<>(); + + // Always pick the first run to guarantee a non-empty result. + picked.add(levelRuns.get(0)); + long pickedSize = levelRuns.get(0).totalSize(); + + // From the second run onward: forced pick level0/level1, then SizeRatio for the rest. + for (int i = 1; i < levelRuns.size(); i++) { + ManifestAdjacentSortedRun run = levelRuns.get(i); + if (run.level() <= 1) { + picked.add(run); + pickedSize += run.totalSize(); + } else { + long nextRunSize = run.totalSize(); + if (pickedSize * (100 + sizeRatioThreshold) >= nextRunSize * 100L) { + picked.add(run); + pickedSize += nextRunSize; + } + } + } + if (picked.size() == 1) { + return new ArrayList<>(); + } + return picked; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index d07ad2581944..50228385a90a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -319,6 +319,8 @@ public static void validateTableSchema(TableSchema schema) { validateChangelogReadSequenceNumber(schema, options); validatePkClusteringOverride(options); + + validateManifestSort(schema, options); } public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) { @@ -1032,4 +1034,22 @@ public static void validatePkClusteringOverride(CoreOptions options) { } } } + + private static void validateManifestSort(TableSchema schema, CoreOptions options) { + if (options.manifestSortEnabled()) { + checkArgument( + !schema.partitionKeys().isEmpty(), + "Cannot enable '%s' for 
non-partition table.", + CoreOptions.MANIFEST_SORT_ENABLED.key()); + String sortPartitionField = options.manifestSortPartitionField(); + if (sortPartitionField != null && !sortPartitionField.isEmpty()) { + checkArgument( + schema.partitionKeys().contains(sortPartitionField), + "'%s' = '%s' is not a partition field. Available partition fields: %s.", + CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), + sortPartitionField, + schema.partitionKeys()); + } + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 36b0d15f114f..75a1ab0a84df 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -18,16 +18,26 @@ package org.apache.paimon.manifest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.fs.SeekableInputStreamWrapper; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.operation.ManifestFileMerger; +import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.stats.StatsTestUtils; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -42,6 +52,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import 
java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -84,9 +95,16 @@ public void testMergeWithoutFullCompaction(int numLastBits) { createData(numLastBits, input, expected); // no trigger Full Compaction + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "9223372036854775807B"); List actual = ManifestFileMerger.merge( - input, manifestFile, 500, 3, Long.MAX_VALUE, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertThat(actual).hasSameSizeAs(expected); // these two manifest files are merged from the input @@ -118,14 +136,16 @@ private void testCleanUp(List input, long fullCompactionThresh ManifestFile failingManifestFile = createManifestFile(FailingFileIO.getFailingPath(failingName, tempDir.toString())); try { + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set( + "manifest.full-compaction-threshold-size", fullCompactionThreshold + "B"); ManifestFileMerger.merge( input, failingManifestFile, - 500, - 3, - fullCompactionThreshold, getPartitionType(), - null); + CoreOptions.fromMap(testOptions.toMap())); } catch (Throwable e) { assertThat(e).hasRootCauseExactlyInstanceOf(FailingFileIO.ArtificialException.class); // old files should be kept untouched, while new files should be cleaned up @@ -156,9 +176,16 @@ public void testMerge() { // delta with delete apply partition 1,2 addDeltaManifests(input, true); // trigger full compaction + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = 
ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); // 1st Manifest don't need to Merge assertSameContent(input.get(0), merged.get(0), manifestFile); @@ -173,9 +200,16 @@ public void testMergeWithoutDelta() { // base List input = createBaseManifestFileMetas(true); + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertEquivalentEntries(input, merged); assertThat(merged).hasSameElementsAs(input); @@ -186,9 +220,16 @@ public void testMergeWithoutDelta() { ManifestFileMeta delta = makeManifest(makeEntry(true, "A", 1), makeEntry(false, "A", 1)); input1.add(delta); + Options testOptions1 = new Options(); + testOptions1.set("manifest.target-file-size", "500B"); + testOptions1.set("manifest.merge-min-count", "3"); + testOptions1.set("manifest.full-compaction-threshold-size", "200B"); List merged1 = ManifestFileMerger.merge( - input1, manifestFile, 500, 3, 200, getPartitionType(), null); + input1, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions1.toMap())); assertThat(base).hasSameElementsAs(merged1); assertEquivalentEntries(input1, merged1); @@ -198,9 +239,16 @@ public void testMergeWithoutDelta() { public void testMergeWithoutBase() { List input = new ArrayList<>(); addDeltaManifests(input, true); + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = ManifestFileMerger.merge( - input, 
manifestFile, 500, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertEquivalentEntries(input, merged); } @@ -225,9 +273,16 @@ public void testMergeWithoutDeleteFile() { input.add(makeManifest(makeEntry(true, "F"))); input.add(makeManifest(makeEntry(true, "G"))); + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertEquivalentEntries(input, merged); } @@ -489,9 +544,16 @@ public void testMergeFullCompactionWithoutDeleteFile() { input.add(makeManifest(makeEntry(true, "F"))); input.add(makeManifest(makeEntry(true, "G"))); + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", threshold + "B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = ManifestFileMerger.merge( - input, manifestFile, threshold, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertEquivalentEntries( input.stream() .filter(f -> !baseFiles.contains(f.fileName())) @@ -819,4 +881,621 @@ private void beforeFirstRead() throws IOException { } } } + + // ==================== Manifest Sort Tests ==================== + + /** + * Test manifest sort with overlapping partition ranges. Each manifest contains entries spanning + * multiple partitions, creating overlapping intervals that require sort rewrite to resolve. + * After sort rewrite, all surviving ADD entries should be sorted by partition field. 
+ */ + @Test + public void testManifestSortWithOverlappingPartitions() { + List input = new ArrayList<>(); + + // manifest-A: partitions [5, 13] + List entriesA = new ArrayList<>(); + for (int p = 5; p <= 13; p++) { + entriesA.add(makeEntry(true, String.format("A-p%d", p), p)); + } + input.add(makeManifest(entriesA.toArray(new ManifestEntry[0]))); + + // manifest-B: partitions [0, 9] + List entriesB = new ArrayList<>(); + for (int p = 0; p <= 9; p++) { + entriesB.add(makeEntry(true, String.format("B-p%d", p), p)); + } + input.add(makeManifest(entriesB.toArray(new ManifestEntry[0]))); + + // manifest-C: partitions [3, 7] -- overlaps with A and B + List entriesC = new ArrayList<>(); + for (int p = 3; p <= 7; p++) { + entriesC.add(makeEntry(true, String.format("C-p%d", p), p)); + } + input.add(makeManifest(entriesC.toArray(new ManifestEntry[0]))); + + // manifest-D: partitions [8, 12] -- overlaps with A + List entriesD = new ArrayList<>(); + for (int p = 8; p <= 12; p++) { + entriesD.add(makeEntry(true, String.format("D-p%d", p), p)); + } + input.add(makeManifest(entriesD.toArray(new ManifestEntry[0]))); + + // manifest-E: partitions [1, 6] -- overlaps with B and C + List entriesE = new ArrayList<>(); + for (int p = 1; p <= 6; p++) { + entriesE.add(makeEntry(true, String.format("E-p%d", p), p)); + } + input.add(makeManifest(entriesE.toArray(new ManifestEntry[0]))); + + // manifest-F: partitions [4, 14] -- overlaps with D + List entriesF = new ArrayList<>(); + for (int p = 4; p <= 14; p++) { + entriesF.add(makeEntry(true, String.format("F-p%d", p), p)); + } + input.add(makeManifest(entriesF.toArray(new ManifestEntry[0]))); + + Options testOptions = new Options(); + testOptions.set("manifest-sort.enabled", "true"); + List merged = + ManifestFileMerger.merge( + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); + + // Verify entries are equivalent (no data loss) + assertEquivalentEntries(input, merged); + + // Verify all entries 
within each output manifest are sorted by partition + for (ManifestFileMeta meta : merged) { + List entries = manifestFile.read(meta.fileName(), meta.fileSize()); + for (int i = 1; i < entries.size(); i++) { + int prevPartition = entries.get(i - 1).partition().getInt(0); + int currPartition = entries.get(i).partition().getInt(0); + assertThat(currPartition) + .as("Entries within a manifest should be sorted by partition") + .isGreaterThanOrEqualTo(prevPartition); + } + } + } + + /** + * Test that sort rewrite correctly eliminates DELETE entries and their corresponding ADD + * entries. The key condition is that totalDeltaFileSize must reach manifestFullCompactionSize + * to trigger the full compaction path inside trySortRewrite, which reads deleteEntries and + * passes them to sortAndRewriteSection for elimination. + * + *

Design: + * + *

+     *   - Base manifests with overlapping partitions (all ADD, small enough to be "mustChange"
+     *     since fileSize < suggestedMetaSize):
+     *     manifest-A: partitions [0, 4] with entries A-p0..A-p4
+     *     manifest-B: partitions [2, 6] with entries B-p2..B-p6 (overlaps A)
+     *     manifest-C: partitions [5, 9] with entries C-p5..C-p9 (overlaps B)
+     *   - Delta manifests with DELETE entries (cancel some ADD entries):
+     *     manifest-D: DELETE A-p2, DELETE B-p4, ADD new-p2, ADD new-p4
+     *     manifest-E: DELETE C-p7, ADD new-p7
+     *   - After sort rewrite: A-p2, B-p4, C-p7 should be eliminated,
+     *     replaced by new-p2, new-p4, new-p7. Output should only contain ADD entries,
+     *     sorted by partition.
+     * 
+ */ + @Test + public void testManifestSortEliminatesDeleteEntries() { + List input = new ArrayList<>(); + + // manifest-A: partitions [0, 4] + List entriesA = new ArrayList<>(); + for (int p = 0; p <= 4; p++) { + entriesA.add(makeEntry(true, String.format("A-p%d", p), p)); + } + input.add(makeManifest(entriesA.toArray(new ManifestEntry[0]))); + + // manifest-B: partitions [2, 6] -- overlaps A + List entriesB = new ArrayList<>(); + for (int p = 2; p <= 6; p++) { + entriesB.add(makeEntry(true, String.format("B-p%d", p), p)); + } + input.add(makeManifest(entriesB.toArray(new ManifestEntry[0]))); + + // manifest-C: partitions [5, 9] -- overlaps B + List entriesC = new ArrayList<>(); + for (int p = 5; p <= 9; p++) { + entriesC.add(makeEntry(true, String.format("C-p%d", p), p)); + } + input.add(makeManifest(entriesC.toArray(new ManifestEntry[0]))); + + // manifest-D: DELETE A-p2, DELETE B-p4, ADD new-p2, ADD new-p4 + input.add( + makeManifest( + makeEntry(false, "A-p2", 2), + makeEntry(false, "B-p4", 4), + makeEntry(true, "new-p2", 2), + makeEntry(true, "new-p4", 4))); + + // manifest-E: DELETE C-p7, ADD new-p7 + input.add(makeManifest(makeEntry(false, "C-p7", 7), makeEntry(true, "new-p7", 7))); + + Options testOptions = new Options(); + testOptions.set("manifest-sort.enabled", "true"); + testOptions.set("manifest.full-compaction-threshold-size", "10B"); + + List merged = + ManifestFileMerger.merge( + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); + + // Collect all output entries + List allOutputEntries = new ArrayList<>(); + for (ManifestFileMeta meta : merged) { + allOutputEntries.addAll(manifestFile.read(meta.fileName(), meta.fileSize())); + } + + // Verify: no DELETE entries in output (all DELETE pairs eliminated) + long deleteCount = + allOutputEntries.stream().filter(e -> e.kind() == FileKind.DELETE).count(); + assertThat(deleteCount).as("Sort rewrite should eliminate all DELETE entries").isEqualTo(0); + + // Verify: the 
deleted ADD entries (A-p2, B-p4, C-p7) are NOT in output + Set outputFileNames = + allOutputEntries.stream().map(e -> e.file().fileName()).collect(Collectors.toSet()); + assertThat(outputFileNames).doesNotContain("A-p2", "B-p4", "C-p7"); + + // Verify: the replacement entries (new-p2, new-p4, new-p7) ARE in output + assertThat(outputFileNames).contains("new-p2", "new-p4", "new-p7"); + + // Verify: all surviving entries match what FileEntry.mergeEntries would produce + assertEquivalentEntries(input, merged); + + // Verify entries within each output manifest are sorted by partition + for (ManifestFileMeta meta : merged) { + List entries = manifestFile.read(meta.fileName(), meta.fileSize()); + for (int i = 1; i < entries.size(); i++) { + int prevPartition = entries.get(i - 1).partition().getInt(0); + int currPartition = entries.get(i).partition().getInt(0); + assertThat(currPartition) + .as("Entries within manifest should be sorted by partition") + .isGreaterThanOrEqualTo(prevPartition); + } + } + } + + /** + * Test manifest sort with a multi-field partition type. + * + *

Setup: partition=(region INT, dt INT, hour INT), sort by dt (field index=1). 9 manifest + * files form 6 overlapping sorted runs by dt range: + * + *

+     *   Run1: 3 files, dt=[0,15],[3,5],[6,8]
+     *   Run2: 2 files, dt=[1,8],[5,7]
+     *   Run3: 1 file,  dt=[0,9]
+     *   Run4: 1 file,  dt=[5,14]
+     *   Run5: 1 file,  dt=[8,15]
+     *   Run6: 1 file,  dt=[4,12]
+     * 
+ * + *

Verifies: 1) no data loss after sort-rewrite, 2) entries within each output manifest are + * sorted by dt. + */ + @Test + public void testManifestSortWithMultiplePartitions() { + // Use a 3-field partition type: (region INT, dt INT, hour INT) + RowType multiPartitionType = RowType.of(new IntType(), new IntType(), new IntType()); + + // Create a dedicated ManifestFile for the 3-field partition type + Path path = new Path(tempDir.toString()); + FileIO fileIO = FileIOFinder.find(path); + ManifestFile multiPartManifestFile = + new ManifestFile.Factory( + fileIO, + new SchemaManager(fileIO, path), + multiPartitionType, + avro, + "zstd", + new FileStorePathFactory( + path, + multiPartitionType, + "default", + CoreOptions.FILE_FORMAT.defaultValue(), + CoreOptions.DATA_FILE_PREFIX.defaultValue(), + CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), + CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(), + CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), + CoreOptions.FILE_COMPRESSION.defaultValue(), + null, + null, + CoreOptions.ExternalPathStrategy.NONE, + null, + false, + null), + Long.MAX_VALUE, + null) + .create(); + + List input = new ArrayList<>(); + + // Run1 + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r1a-p0", 10, 0, 1), + makeMultiPartEntry(true, "r1a-p1", 20, 1, 2), + makeMultiPartEntry(true, "r1a-p2", 30, 15, 3))) + .get(0)); + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r1b-p3", 10, 3, 4), + makeMultiPartEntry(true, "r1b-p4", 20, 4, 5), + makeMultiPartEntry(true, "r1b-p5", 30, 5, 6))) + .get(0)); + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r1c-p6", 10, 6, 7), + makeMultiPartEntry(true, "r1c-p7", 20, 7, 8), + makeMultiPartEntry(true, "r1c-p8", 30, 8, 9))) + .get(0)); + + // Run2 + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r2a-p1", 5, 1, 10), + 
makeMultiPartEntry(true, "r2a-p2", 15, 2, 11), + makeMultiPartEntry(true, "r2a-p3", 25, 3, 12), + makeMultiPartEntry(true, "r2a-p4", 35, 8, 13))) + .get(0)); + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r2b-p5", 5, 5, 14), + makeMultiPartEntry(true, "r2b-p6", 15, 6, 15), + makeMultiPartEntry(true, "r2b-p7", 25, 7, 16))) + .get(0)); + + // Run3 + List run3Entries = new ArrayList<>(); + for (int p = 0; p <= 9; p++) { + run3Entries.add(makeMultiPartEntry(true, String.format("r3-p%d", p), 99, p, p + 20)); + } + input.add(multiPartManifestFile.write(run3Entries).get(0)); + + // Run4 + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r4a-p10", 10, 5, 30), + makeMultiPartEntry(true, "r4a-p11", 20, 11, 31), + makeMultiPartEntry(true, "r4a-p12", 30, 12, 32), + makeMultiPartEntry(true, "r4a-p13", 40, 13, 33), + makeMultiPartEntry(true, "r4a-p14", 50, 14, 34))) + .get(0)); + + // Run5 + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r5a-p11", 11, 8, 40), + makeMultiPartEntry(true, "r5a-p12", 21, 12, 41), + makeMultiPartEntry(true, "r5a-p13", 31, 13, 42), + makeMultiPartEntry(true, "r5a-p14", 41, 14, 43), + makeMultiPartEntry(true, "r5a-p15", 51, 15, 44))) + .get(0)); + + // Run6 + input.add( + multiPartManifestFile + .write( + Arrays.asList( + makeMultiPartEntry(true, "r6a-p7", 7, 4, 50), + makeMultiPartEntry(true, "r6a-p8", 17, 8, 51), + makeMultiPartEntry(true, "r6a-p9", 27, 9, 52), + makeMultiPartEntry(true, "r6a-p10", 37, 10, 53), + makeMultiPartEntry(true, "r6a-p11", 47, 11, 54), + makeMultiPartEntry(true, "r6a-p12", 57, 12, 55))) + .get(0)); + + Options testOptions = new Options(); + testOptions.set("manifest-sort.enabled", "true"); + // Sort by the second partition field "f1" (dt) + testOptions.set("manifest-sort.partition-field", "f1"); + List merged = + ManifestFileMerger.merge( + input, + multiPartManifestFile, + multiPartitionType, + 
CoreOptions.fromMap(testOptions.toMap())); + + // Verify no data loss + List inputEntries = + input.stream() + .flatMap( + f -> + multiPartManifestFile.read(f.fileName(), f.fileSize()) + .stream()) + .collect(Collectors.toList()); + List entryBeforeMerge = + FileEntry.mergeEntries(inputEntries).stream() + .filter(entry -> entry.kind() == FileKind.ADD) + .map(entry -> entry.kind() + "-" + entry.file().fileName()) + .collect(Collectors.toList()); + List entryAfterMerge = new ArrayList<>(); + for (ManifestFileMeta meta : merged) { + for (ManifestEntry entry : + multiPartManifestFile.read(meta.fileName(), meta.fileSize())) { + entryAfterMerge.add(entry.kind() + "-" + entry.file().fileName()); + } + } + assertThat(entryBeforeMerge).hasSameElementsAs(entryAfterMerge); + + // Verify entries within each output manifest are sorted by the second field (dt) + for (ManifestFileMeta meta : merged) { + List entries = + multiPartManifestFile.read(meta.fileName(), meta.fileSize()); + for (int i = 1; i < entries.size(); i++) { + int prevDt = entries.get(i - 1).partition().getInt(1); + int currDt = entries.get(i).partition().getInt(1); + assertThat(currDt) + .as("Entries within manifest should be sorted by partition") + .isGreaterThanOrEqualTo(prevDt); + } + } + } + + /** + * Test that when manifest-sort.max-rewrite-size budget is exceeded in the middle of a section, + * the remaining files are appended to the tail and the final manifest order is preserved. + * + *

Design: + * + *

+     *   - Create a large section with overlapping partition ranges that exceeds the budget
+     *   - Set a small manifest-sort.max-rewrite-size to force budget split
+     *   - Verify that after merge, all manifests are globally sorted by partition field
+     *   - Verify that entries are equivalent (no data loss)
+     * 
+ */ + @Test + public void testManifestSortBudgetSplitPreservesOrder() { + // Create manifests with overlapping ranges, large enough to exceed budget + List input = new ArrayList<>(); + + // Manifest A: partitions [0, 10] - large size + List entriesA = new ArrayList<>(); + for (int p = 0; p <= 10; p++) { + entriesA.add(makeEntry(true, String.format("A-p%d", p), p)); + } + ManifestFileMeta manifestA = makeManifest(entriesA.toArray(new ManifestEntry[0])); + // Manually increase file size to simulate large manifest + input.add( + new ManifestFileMeta( + manifestA.fileName(), + 100, + manifestA.numAddedFiles(), + manifestA.numDeletedFiles(), + manifestA.partitionStats(), + manifestA.schemaId(), + manifestA.minBucket(), + manifestA.maxBucket(), + manifestA.minLevel(), + manifestA.maxLevel(), + manifestA.minRowId(), + manifestA.maxRowId())); + + // Manifest B: partitions [5, 15] - overlaps with A + List entriesB = new ArrayList<>(); + for (int p = 5; p <= 15; p++) { + entriesB.add(makeEntry(true, String.format("B-p%d", p), p)); + } + ManifestFileMeta manifestB = makeManifest(entriesB.toArray(new ManifestEntry[0])); + input.add( + new ManifestFileMeta( + manifestB.fileName(), + 100, + manifestB.numAddedFiles(), + manifestB.numDeletedFiles(), + manifestB.partitionStats(), + manifestB.schemaId(), + manifestB.minBucket(), + manifestB.maxBucket(), + manifestB.minLevel(), + manifestB.maxLevel(), + manifestB.minRowId(), + manifestB.maxRowId())); + + // Manifest C: partitions [10, 20] - overlaps with B + List entriesC = new ArrayList<>(); + for (int p = 10; p <= 20; p++) { + entriesC.add(makeEntry(true, String.format("C-p%d", p), p)); + } + ManifestFileMeta manifestC = makeManifest(entriesC.toArray(new ManifestEntry[0])); + input.add( + new ManifestFileMeta( + manifestC.fileName(), + 100, + manifestC.numAddedFiles(), + manifestC.numDeletedFiles(), + manifestC.partitionStats(), + manifestC.schemaId(), + manifestC.minBucket(), + manifestC.maxBucket(), + manifestC.minLevel(), + 
manifestC.maxLevel(), + manifestC.minRowId(), + manifestC.maxRowId())); + + // Set small budget to force split + Options testOptions = new Options(); + testOptions.set("manifest-sort.enabled", "true"); + testOptions.set("manifest-sort.max-rewrite-size", "150B"); // Total input size is 300B + + List merged = + ManifestFileMerger.merge( + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); + + // Verify entries are equivalent + assertEquivalentEntries(input, merged); + + // Verify global ordering: all manifests sorted by partition min value + for (int i = 1; i < merged.size(); i++) { + BinaryRow prevMin = merged.get(i - 1).partitionStats().minValues(); + BinaryRow currMin = merged.get(i).partitionStats().minValues(); + assertThat(currMin.getInt(0)) + .as("Manifests should be globally sorted by partition field") + .isGreaterThanOrEqualTo(prevMin.getInt(0)); + } + + // Verify entries within each manifest are sorted + for (ManifestFileMeta meta : merged) { + List entries = manifestFile.read(meta.fileName(), meta.fileSize()); + for (int i = 1; i < entries.size(); i++) { + int prevPartition = entries.get(i - 1).partition().getInt(0); + int currPartition = entries.get(i).partition().getInt(0); + assertThat(currPartition) + .as("Entries within manifest should be sorted by partition") + .isGreaterThanOrEqualTo(prevPartition); + } + } + } + + /** + * Test boundary equality (min == previous.max) handling in both SortedRun construction and + * Section splitting. Boundary-touching files should be allowed in the same SortedRun but may be + * separated into different Sections. + * + *

Design: + * + *

+     *   - Create manifests with boundary-touching partition ranges
+     *   - Manifest A: [0, 5]
+     *   - Manifest B: [5, 10] (min == A.max, boundary touching)
+     *   - Manifest C: [10, 15] (min == B.max, boundary touching)
+     *   - Verify they can be in the same SortedRun (>= comparison)
+     *   - Verify they may be split into different Sections (>= comparison with comment)
+     * 
+ */ + @Test + public void testBoundaryEqualityHandling() { + List input = new ArrayList<>(); + + // Manifest A: partitions [0, 5] + List entriesA = new ArrayList<>(); + for (int p = 0; p <= 5; p++) { + entriesA.add(makeEntry(true, String.format("A-p%d", p), p)); + } + input.add(makeManifest(entriesA.toArray(new ManifestEntry[0]))); + + // Manifest B: partitions [5, 10] - boundary touches A (min == A.max) + List entriesB = new ArrayList<>(); + for (int p = 5; p <= 10; p++) { + entriesB.add(makeEntry(true, String.format("B-p%d", p), p)); + } + input.add(makeManifest(entriesB.toArray(new ManifestEntry[0]))); + + // Manifest C: partitions [10, 15] - boundary touches B (min == B.max) + List entriesC = new ArrayList<>(); + for (int p = 10; p <= 15; p++) { + entriesC.add(makeEntry(true, String.format("C-p%d", p), p)); + } + input.add(makeManifest(entriesC.toArray(new ManifestEntry[0]))); + + Options testOptions = new Options(); + testOptions.set("manifest-sort.enabled", "true"); + + List merged = + ManifestFileMerger.merge( + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); + + // Verify entries are equivalent + assertEquivalentEntries(input, merged); + + // Verify all manifests maintain global sort order + for (int i = 1; i < merged.size(); i++) { + BinaryRow prevMin = merged.get(i - 1).partitionStats().minValues(); + BinaryRow prevMax = merged.get(i - 1).partitionStats().maxValues(); + BinaryRow currMin = merged.get(i).partitionStats().minValues(); + + // Boundary-touching is allowed: currMin >= prevMin + assertThat(currMin.getInt(0)) + .as("Global order should be maintained with boundary-touching allowed") + .isGreaterThanOrEqualTo(prevMin.getInt(0)); + + // Log boundary equality cases for documentation + if (currMin.getInt(0) == prevMax.getInt(0)) { + System.out.println( + String.format( + "Boundary equality detected: manifest[%d].min=%d == manifest[%d].max=%d", + i, currMin.getInt(0), i - 1, prevMax.getInt(0))); + } + } + + 
// Verify entries within each manifest are sorted + for (ManifestFileMeta meta : merged) { + List entries = manifestFile.read(meta.fileName(), meta.fileSize()); + for (int i = 1; i < entries.size(); i++) { + int prevPartition = entries.get(i - 1).partition().getInt(0); + int currPartition = entries.get(i).partition().getInt(0); + assertThat(currPartition) + .as("Entries within manifest should be sorted by partition") + .isGreaterThanOrEqualTo(prevPartition); + } + } + } + + /** Create a ManifestEntry with a 3-field partition row (region, dt, hour). */ + private ManifestEntry makeMultiPartEntry( + boolean isAdd, String fileName, int region, int dt, int hour) { + BinaryRow binaryRow = new BinaryRow(3); + BinaryRowWriter writer = new BinaryRowWriter(binaryRow); + writer.writeInt(0, region); + writer.writeInt(1, dt); + writer.writeInt(2, hour); + writer.complete(); + + return ManifestEntry.create( + isAdd ? FileKind.ADD : FileKind.DELETE, + binaryRow, + 0, + 0, + DataFileMeta.create( + fileName, + 0, + 0, + binaryRow, + binaryRow, + StatsTestUtils.newEmptySimpleStats(), + StatsTestUtils.newEmptySimpleStats(), + 0, + 0, + 0, + 0, + Collections.emptyList(), + Timestamp.fromEpochMillis(200000), + 0L, + null, + FileSource.APPEND, + null, + null, + null, + null)); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java index 591b3206518d..66465f1e7531 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java @@ -18,7 +18,9 @@ package org.apache.paimon.manifest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.operation.ManifestFileMerger; +import org.apache.paimon.options.Options; import org.apache.paimon.types.RowType; import org.junit.jupiter.api.BeforeEach; @@ -49,9 +51,16 @@ 
public void testMerge() { List input = createBaseManifestFileMetas(false); addDeltaManifests(input, false); + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", "500B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertEquivalentEntries(input, merged); // the first one is not deleted, it should not be merged @@ -89,9 +98,16 @@ public void testMergeFullCompactionWithoutDeleteFile() { input.add(makeManifest(makeEntry(true, "F", null))); input.add(makeManifest(makeEntry(true, "G", null))); + Options testOptions = new Options(); + testOptions.set("manifest.target-file-size", threshold + "B"); + testOptions.set("manifest.merge-min-count", "3"); + testOptions.set("manifest.full-compaction-threshold-size", "200B"); List merged = ManifestFileMerger.merge( - input, manifestFile, threshold, 3, 200, getPartitionType(), null); + input, + manifestFile, + getPartitionType(), + CoreOptions.fromMap(testOptions.toMap())); assertEquivalentEntries( input.stream() .filter(f -> !baseFiles.contains(f.fileName())) diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 14652ec883c2..beb4bfd37680 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -511,6 +511,68 @@ public void testFileFormatPerLevelAcceptsCompatibleSchema() { new TableSchema(1, fields, 10, emptyList(), singletonList("k"), options, "")); } + @Test + void testManifestSortValidation() { + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", 
DataTypes.INT())); + + // Test 1: manifest-sort.enabled on non-partition table should fail + Map options1 = new HashMap<>(); + options1.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true"); + options1.put(BUCKET.key(), String.valueOf(-1)); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options1, + ""))) + .hasMessageContaining( + "Cannot enable 'manifest-sort.enabled' for non-partition table."); + + // Test 2: manifest-sort-partition-field not in partition keys should fail + Map options2 = new HashMap<>(); + options2.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true"); + options2.put(CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), "f1"); + options2.put(BUCKET.key(), String.valueOf(-1)); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + singletonList("f0"), + emptyList(), + options2, + ""))) + .hasMessageContaining("is not a partition field"); + + // Test 3: valid manifest-sort config should pass + Map options3 = new HashMap<>(); + options3.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true"); + options3.put(CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), "f0"); + options3.put(BUCKET.key(), String.valueOf(-1)); + assertThatNoException() + .isThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + singletonList("f0"), + emptyList(), + options3, + ""))); + } + @Test public void testMergeOnReadCoexistsWithVisibilityCallback() { Map options = new HashMap<>();