Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/iceberg/test/expire_snapshots_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "iceberg/update/expire_snapshots.h"

#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <vector>
Expand All @@ -33,6 +35,7 @@
#include "iceberg/snapshot.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/executor.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"

Expand Down Expand Up @@ -444,6 +447,45 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
expired_manifest_list_path));
}

TEST_F(ExpireSnapshotsCleanupTest, ExecutorDispatchesDeletesConcurrently) {
  // Fixture layout: an expired snapshot that owns one data file, one data manifest
  // and one manifest list, followed by a current snapshot whose manifest list
  // references no manifests at all.
  const auto data_file = table_location_ + "/data/expired-data.parquet";
  const auto data_manifest_file = table_location_ + "/metadata/expired-data.avro";
  const auto expired_list_file =
      table_location_ + "/metadata/expired-manifest-list.avro";
  const auto current_list_file =
      table_location_ + "/metadata/current-manifest-list.avro";

  auto data_manifest = WriteDataManifest(
      data_manifest_file, kExpiredSnapshotId,
      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
                 MakeDataFile(data_file))});
  WriteManifestList(expired_list_file, kExpiredSnapshotId,
                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {data_manifest});
  WriteManifestList(current_list_file, kCurrentSnapshotId, kExpiredSnapshotId,
                    kCurrentSequenceNumber, {});
  RewriteTableWithManifestLists(expired_list_file, current_list_file);

  test::ThreadExecutor executor;
  std::mutex seen_mu;
  std::vector<std::string> seen_paths;

  // The callback may run on executor threads, so every append is done under the lock.
  auto record_delete = [&](const std::string& path) {
    std::scoped_lock guard(seen_mu);
    seen_paths.push_back(path);
  };

  ICEBERG_UNWRAP_OR_FAIL(auto expire, table_->NewExpireSnapshots());
  expire->ExpireSnapshotId(kExpiredSnapshotId);
  expire->ExecuteDeleteWith(std::ref(executor)).DeleteWith(record_delete);

  EXPECT_THAT(expire->Commit(), IsOk());

  // Every file owned solely by the expired snapshot is reported exactly once.
  EXPECT_THAT(seen_paths,
              testing::UnorderedElementsAre(data_file, data_manifest_file,
                                            expired_list_file));
  // One submission per file: the executor saw real work.
  EXPECT_EQ(executor.submit_count(), 3);
}

TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) {
const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
const auto expired_delete_manifest_path =
Expand Down
82 changes: 67 additions & 15 deletions src/iceberg/update/expire_snapshots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
#include "iceberg/table_metadata.h"
#include "iceberg/transaction.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/executor.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/retry_util.h"
#include "iceberg/util/snapshot_util_internal.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/task_group.h"

namespace iceberg {

Expand All @@ -65,8 +68,11 @@ Result<std::unique_ptr<ManifestReader>> MakeManifestReader(
class FileCleanupStrategy {
public:
FileCleanupStrategy(std::shared_ptr<FileIO> file_io,
std::function<void(const std::string&)> delete_func)
: file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}
std::function<void(const std::string&)> delete_func,
OptionalExecutor executor)
: file_io_(std::move(file_io)),
delete_func_(std::move(delete_func)),
executor_(std::move(executor)) {}

virtual ~FileCleanupStrategy() = default;

Expand Down Expand Up @@ -99,19 +105,48 @@ class FileCleanupStrategy {
}

/// \brief Delete files at the given locations.
///
/// Best-effort: errors are suppressed to mirror Java's suppressFailureWhenFinished.
/// When a custom delete function was provided, it is invoked once per path,
/// optionally parallelized via the strategy's executor. Otherwise the FileIO bulk
/// `DeleteFiles` API is called with the whole batch and retried a bounded number of
/// times, stopping early on kNotFound.
void DeleteFiles(const std::unordered_set<std::string>& paths) {
try {
if (delete_func_) {
for (const auto& path : paths) {
if (paths.empty()) return;
std::vector<std::string> path_list(paths.begin(), paths.end());

if (!delete_func_) {
// Bulk path: rely on FileIO::DeleteFiles. The tight retry helps atomic-bulk
// implementations (e.g. an S3 DeleteObjects-backed FileIO) ride out a
// transient throttle or network blip on the single round trip.
//
// Caveat: for the default fail-fast iterative impl, a retried attempt
// re-submits the full path_list, so files already deleted on a prior
// attempt come back as kNotFound and short-circuit the retry (kNotFound is
// in StopRetryOn). That is best-effort cleanup -- still no worse than the
// prior behaviour of a single un-retried call -- and we discard the final
// Status to match Java's suppressFailureWhenFinished semantics.
RetryRunner<retry::StopRetryOn<ErrorKind::kNotFound>> runner(kDeleteRetryConfig);
std::ignore = runner.Run([&]() { return file_io_->DeleteFiles(path_list); });
return;
}

// Custom callback path: invoke one path at a time, optionally on a worker thread
// pulled from the configured executor. Without an executor TaskGroup runs the
// callbacks synchronously on the calling thread.
TaskGroup<> group;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use TaskGroup<retry::StopRetryOn<ErrorKind::kNotFound>> group(kDeleteRetryConfig) to simplify code.

group.SetExecutor(executor_);
for (auto& path : path_list) {
group.Submit([this, path = std::move(path)]() -> Status {
try {
delete_func_(path);
} catch (...) {
// Suppress all exceptions during file cleanup to match Java's
// suppressFailureWhenFinished behavior.
}
} else {
std::vector<std::string> path_list(paths.begin(), paths.end());
std::ignore = file_io_->DeleteFiles(path_list);
}
} catch (...) {
// TODO(shangxinli): add retry
return {};
});
}
std::ignore = std::move(group).Run();
}

bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
Expand Down Expand Up @@ -153,6 +188,18 @@ class FileCleanupStrategy {

std::shared_ptr<FileIO> file_io_;
std::function<void(const std::string&)> delete_func_;
OptionalExecutor executor_;

private:
/// Retry budget for the FileIO bulk `DeleteFiles` path. Tight on purpose: file
/// cleanup is best-effort and runs after a successful commit, so we'd rather give
/// up than block the caller for minutes on a flaky storage layer.
///
/// Passed to the `RetryRunner` that wraps `file_io_->DeleteFiles(...)` in
/// DeleteFiles().
///
/// NOTE(review): the exact meaning of each field is defined by `RetryConfig`, which
/// is not shown here; the `_ms` suffixes suggest milliseconds -- confirm against
/// iceberg/util/retry_util.h before tuning.
static constexpr RetryConfig kDeleteRetryConfig{
.num_retries = 2,
.min_wait_ms = 100,
.max_wait_ms = 1000,
.total_timeout_ms = 5000,
};
};

/// \brief File cleanup strategy that determines safe deletions via full reachability.
Expand All @@ -161,7 +208,7 @@ class FileCleanupStrategy {
/// still referenced by retained snapshots, then deletes orphaned manifests, data
/// files, and manifest lists.
///
/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
/// TODO(shangxinli): Add multi-threaded manifest reading support.
class ReachableFileCleanup : public FileCleanupStrategy {
public:
using FileCleanupStrategy::FileCleanupStrategy;
Expand Down Expand Up @@ -366,7 +413,7 @@ class ReachableFileCleanup : public FileCleanupStrategy {
/// logically introduced by a snapshot whose changes are still present in the
/// current state under a different id.
///
/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
/// TODO(shangxinli): Add multi-threaded manifest reading support.
class IncrementalFileCleanup : public FileCleanupStrategy {
public:
using FileCleanupStrategy::FileCleanupStrategy;
Expand Down Expand Up @@ -703,6 +750,11 @@ ExpireSnapshots& ExpireSnapshots::CleanExpiredMetadata(bool clean) {
return *this;
}

/// Records the executor to use for best-effort file deletion.
///
/// Nothing is scheduled here: the value is only stored in `executor_`, which
/// Finalize() later hands to the selected cleanup strategy (IncrementalFileCleanup
/// or ReachableFileCleanup).
///
/// \param executor Executor to store; replaces any previously configured value.
/// \return Reference to this for method chaining.
ExpireSnapshots& ExpireSnapshots::ExecuteDeleteWith(OptionalExecutor executor) {
// Sink parameter: taken by value and moved into the member.
executor_ = std::move(executor);
return *this;
}

Result<std::unordered_set<int64_t>> ExpireSnapshots::ComputeBranchSnapshotsToRetain(
int64_t snapshot_id, TimePointMs expire_snapshot_older_than,
int32_t min_snapshots_to_keep) const {
Expand Down Expand Up @@ -934,11 +986,11 @@ Status ExpireSnapshots::Finalize(Result<const TableMetadata*> commit_result) {
!HasNonMainSnapshots(metadata_after_expiration);

if (can_use_incremental) {
return IncrementalFileCleanup(ctx_->table->io(), delete_func_)
return IncrementalFileCleanup(ctx_->table->io(), delete_func_, executor_)
.CleanFiles(metadata_before_expiration, metadata_after_expiration,
cleanup_level_);
}
return ReachableFileCleanup(ctx_->table->io(), delete_func_)
return ReachableFileCleanup(ctx_->table->io(), delete_func_, executor_)
.CleanFiles(metadata_before_expiration, metadata_after_expiration, cleanup_level_);
}

Expand Down
19 changes: 19 additions & 0 deletions src/iceberg/update/expire_snapshots.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/util/executor.h"
#include "iceberg/util/timepoint.h"

/// \file iceberg/update/expire_snapshots.h
Expand Down Expand Up @@ -115,6 +116,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
/// If this method is not called, unnecessary manifests and data files will still be
/// deleted.
///
/// \note When an executor is configured via ExecuteDeleteWith(), this callback may
/// be invoked concurrently from worker threads; implementations must be thread-safe.
///
/// \param delete_func A function that will be called to delete manifests and data files
/// \return Reference to this for method chaining.
ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func);
Expand All @@ -140,6 +144,20 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
/// \return Reference to this for method chaining.
ExpireSnapshots& CleanExpiredMetadata(bool clean);

/// \brief Configure an executor used to parallelize best-effort file deletion.
///
/// Only meaningful in combination with DeleteWith(): when both are set the custom
/// delete callback is invoked concurrently for each path through the supplied
/// executor. Without DeleteWith(), file deletion uses FileIO's bulk DeleteFiles
/// API and the executor is unused. The caller retains ownership and must keep the
/// executor alive until Finalize() returns.
///
/// Named after Java's `ExpireSnapshots.executeDeleteWith(ExecutorService)`.
///
/// \param executor An executor reference, or std::nullopt for serial deletion.
/// \return Reference to this for method chaining.
ExpireSnapshots& ExecuteDeleteWith(OptionalExecutor executor);

Kind kind() const final { return Kind::kExpireSnapshots; }
bool IsRetryable() const override { return true; }

Expand Down Expand Up @@ -184,6 +202,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
enum CleanupLevel cleanup_level_ { CleanupLevel::kAll };
bool clean_expired_metadata_{false};
bool specified_snapshot_id_{false};
OptionalExecutor executor_;

/// Cached result from Apply(), consumed by Finalize() and cleared after use.
std::optional<ApplyResult> apply_result_;
Expand Down
Loading