From 293868e5805ece8d98ba2c1a847d5d1f8314a64c Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 7 Jun 2026 18:06:28 +0800 Subject: [PATCH 1/4] feat(arrow): resolve S3-compatible schemes in ArrowFileSystemFileIO --- src/iceberg/arrow/arrow_io.cc | 11 ++++++++--- src/iceberg/test/arrow_io_test.cc | 8 ++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc index 45ad4259e..8da5edeeb 100644 --- a/src/iceberg/arrow/arrow_io.cc +++ b/src/iceberg/arrow/arrow_io.cc @@ -473,9 +473,14 @@ class ArrowOutputFile : public OutputFile { } // namespace Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) { - if (file_location.find("://") != std::string::npos) { - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, arrow_fs_->PathFromUri(file_location)); - return path; + if (auto pos = file_location.find("://"); pos != std::string::npos) { + auto path = arrow_fs_->PathFromUri(file_location); + if (path.ok()) { + return path.ValueOrDie(); + } + // PathFromUri rejects S3-compatible schemes (s3a/s3n, gs://, oss://); + // fall back to the scheme-less bucket/key. + return file_location.substr(pos + 3); } return file_location; } diff --git a/src/iceberg/test/arrow_io_test.cc b/src/iceberg/test/arrow_io_test.cc index 7edaf0756..ad0bd93f6 100644 --- a/src/iceberg/test/arrow_io_test.cc +++ b/src/iceberg/test/arrow_io_test.cc @@ -405,6 +405,14 @@ TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) { EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3))); } +TEST_F(LocalFileIOTest, ResolvesForeignSchemeToUnderlyingPath) { + ASSERT_THAT(file_io_->WriteFile(temp_filepath_, "hello world"), IsOk()); + + auto read_res = file_io_->ReadFile("x-store://" + temp_filepath_, std::nullopt); + EXPECT_THAT(read_res, IsOk()); + EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world"))); +} + TEST(FileIOAdapterTest, InputAdapterRejectsReadsAfterClose) { auto state = std::make_shared(); state->data = "abc"; From 5c7656bf24582df2b1e6140efe37fb0e939becfb Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Mon, 8 Jun 2026 15:55:03 +0800 Subject: [PATCH 2/4] address review: scheme-mismatch-only fallback, strip query/fragment, accept s3a/s3n --- src/iceberg/arrow/arrow_io.cc | 28 ++++++++++++++++-------- src/iceberg/catalog/rest/rest_file_io.cc | 2 +- src/iceberg/test/arrow_io_test.cc | 27 ++++++++++++++++++++++- src/iceberg/test/rest_file_io_test.cc | 4 ++++ 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc index 8da5edeeb..fb76a92ac 100644 --- a/src/iceberg/arrow/arrow_io.cc +++ b/src/iceberg/arrow/arrow_io.cc @@ -473,16 +473,26 @@ class ArrowOutputFile : public OutputFile { } // namespace Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) { - if (auto pos = file_location.find("://"); pos != std::string::npos) { - auto path = arrow_fs_->PathFromUri(file_location); - if (path.ok()) { - return path.ValueOrDie(); - } - // PathFromUri rejects S3-compatible schemes (s3a/s3n, gs://, oss://); - // fall back to the scheme-less bucket/key. - return file_location.substr(pos + 3); + const auto pos = file_location.find("://"); + if (pos == std::string::npos) { + return file_location; + } + + auto path = arrow_fs_->PathFromUri(file_location); + if (path.ok()) { + return std::move(path).ValueOrDie(); + } + + // Only fall back for Arrow's scheme-mismatch error; propagate anything else. + const auto& status = path.status(); + if (status.message().find("expected a URI with one of the schemes") == + std::string::npos) { + return std::unexpected{ + {.kind = ToErrorKind(status), .message = status.ToString()}}; } - return file_location; + // Scheme-less bucket/key, dropping any ?query / #fragment. + std::string bucket_key = file_location.substr(pos + 3); + return bucket_key.substr(0, bucket_key.find_first_of("?#")); } Result> OpenArrowInputStream( diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc index f08a03353..5fadca1ac 100644 --- a/src/iceberg/catalog/rest/rest_file_io.cc +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -45,7 +45,7 @@ Result DetectBuiltinFileIO(std::string_view location) { if (scheme == "file") { return BuiltinFileIOKind::kArrowLocal; } - if (scheme == "s3") { + if (scheme == "s3" || scheme == "s3a" || scheme == "s3n") { return BuiltinFileIOKind::kArrowS3; } diff --git a/src/iceberg/test/arrow_io_test.cc b/src/iceberg/test/arrow_io_test.cc index ad0bd93f6..751404f3f 100644 --- a/src/iceberg/test/arrow_io_test.cc +++ b/src/iceberg/test/arrow_io_test.cc @@ -37,6 +37,11 @@ namespace iceberg { namespace { +std::string ForeignSchemeUri(std::string local_path) { + std::ranges::replace(local_path, '\\', '/'); + return "x-store://" + local_path; +} + struct CloseState { bool closed = false; }; @@ -408,9 +413,29 @@ TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) { TEST_F(LocalFileIOTest, ResolvesForeignSchemeToUnderlyingPath) { ASSERT_THAT(file_io_->WriteFile(temp_filepath_, "hello world"), IsOk()); - auto read_res = file_io_->ReadFile("x-store://" + temp_filepath_, std::nullopt); + auto read_res = file_io_->ReadFile(ForeignSchemeUri(temp_filepath_), std::nullopt); EXPECT_THAT(read_res, IsOk()); EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world"))); + + auto with_query = file_io_->ReadFile(ForeignSchemeUri(temp_filepath_) + "?versionId=42", + std::nullopt); + EXPECT_THAT(with_query, IsOk()); + EXPECT_THAT(with_query, HasValue(::testing::Eq("hello world"))); +} + +TEST_F(LocalFileIOTest, PropagatesNonSchemeMismatchUriError) { + auto read_res = file_io_->ReadFile("file:///tmp/%ZZ", std::nullopt); + EXPECT_THAT(read_res, IsError(ErrorKind::kUnknownError)); + EXPECT_THAT(read_res, HasErrorMessage("Cannot parse URI")); +} + +TEST_F(LocalFileIOTest, FallbackPreservesPercentEncodingInKey) { + std::string encoded_path = temp_filepath_ + "%20x"; + ASSERT_THAT(file_io_->WriteFile(encoded_path, "raw"), IsOk()); + + auto read_res = file_io_->ReadFile(ForeignSchemeUri(encoded_path), std::nullopt); + EXPECT_THAT(read_res, IsOk()); + EXPECT_THAT(read_res, HasValue(::testing::Eq("raw"))); } TEST(FileIOAdapterTest, InputAdapterRejectsReadsAfterClose) { diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc index c86df2753..b1193d9f8 100644 --- a/src/iceberg/test/rest_file_io_test.cc +++ b/src/iceberg/test/rest_file_io_test.cc @@ -49,6 +49,10 @@ class MockFileIO : public FileIO { TEST(RestFileIOTest, DetectBuiltinKindFromScheme) { EXPECT_THAT(DetectBuiltinFileIO("s3://bucket/path"), HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3))); + EXPECT_THAT(DetectBuiltinFileIO("s3a://bucket/path"), + HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3))); + EXPECT_THAT(DetectBuiltinFileIO("s3n://bucket/path"), + HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3))); EXPECT_THAT(DetectBuiltinFileIO("/tmp/warehouse"), HasValue(::testing::Eq(BuiltinFileIOKind::kArrowLocal))); EXPECT_THAT(DetectBuiltinFileIO("file:///tmp/warehouse"), From ef6c505d6c8efd9cc3469bc70f11c34f758a08e3 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Mon, 8 Jun 2026 21:31:17 +0800 Subject: [PATCH 3/4] address review: percent-decode s3a/s3n fallback to match native s3 parsing --- src/iceberg/arrow/arrow_io.cc | 6 ++++-- src/iceberg/test/arrow_io_test.cc | 9 +++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc index fb76a92ac..2dd1b701a 100644 --- a/src/iceberg/arrow/arrow_io.cc +++ b/src/iceberg/arrow/arrow_io.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_io_util.h" @@ -490,9 +491,10 @@ Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_l return std::unexpected{ {.kind = ToErrorKind(status), .message = status.ToString()}}; } - // Scheme-less bucket/key, dropping any ?query / #fragment. + // Scheme-less bucket/key: drop ?query / #fragment and percent-decode to match s3://. std::string bucket_key = file_location.substr(pos + 3); - return bucket_key.substr(0, bucket_key.find_first_of("?#")); + bucket_key = bucket_key.substr(0, bucket_key.find_first_of("?#")); + return ::arrow::util::UriUnescape(bucket_key); } Result> OpenArrowInputStream( diff --git a/src/iceberg/test/arrow_io_test.cc b/src/iceberg/test/arrow_io_test.cc index 751404f3f..4ac83469c 100644 --- a/src/iceberg/test/arrow_io_test.cc +++ b/src/iceberg/test/arrow_io_test.cc @@ -429,11 +429,12 @@ TEST_F(LocalFileIOTest, PropagatesNonSchemeMismatchUriError) { EXPECT_THAT(read_res, HasErrorMessage("Cannot parse URI")); } -TEST_F(LocalFileIOTest, FallbackPreservesPercentEncodingInKey) { - std::string encoded_path = temp_filepath_ + "%20x"; - ASSERT_THAT(file_io_->WriteFile(encoded_path, "raw"), IsOk()); +TEST_F(LocalFileIOTest, FallbackDecodesPercentEncodingInKey) { + std::string decoded_path = temp_filepath_ + " x"; + ASSERT_THAT(file_io_->WriteFile(decoded_path, "raw"), IsOk()); - auto read_res = file_io_->ReadFile(ForeignSchemeUri(encoded_path), std::nullopt); + auto read_res = + file_io_->ReadFile(ForeignSchemeUri(temp_filepath_ + "%20x"), std::nullopt); EXPECT_THAT(read_res, IsOk()); EXPECT_THAT(read_res, HasValue(::testing::Eq("raw"))); } From 277d8ba037b5e5437d7f90922e9fc4b95350948f Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Tue, 9 Jun 2026 09:50:16 +0800 Subject: [PATCH 4/4] address review: parse URI via arrow::util::Uri instead of matching error text --- src/iceberg/arrow/arrow_io.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc index 2dd1b701a..6b159ca89 100644 --- a/src/iceberg/arrow/arrow_io.cc +++ b/src/iceberg/arrow/arrow_io.cc @@ -484,14 +484,13 @@ Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_l return std::move(path).ValueOrDie(); } - // Only fall back for Arrow's scheme-mismatch error; propagate anything else. - const auto& status = path.status(); - if (status.message().find("expected a URI with one of the schemes") == - std::string::npos) { + // Foreign alias (s3a/s3n): validate via Arrow's parser, then percent-decode the + // scheme-less key (substring keeps a Windows drive letter's ':' that host() drops). + if (auto parsed = ::arrow::util::Uri::FromString(file_location); !parsed.ok()) { + const auto& status = parsed.status(); return std::unexpected{ {.kind = ToErrorKind(status), .message = status.ToString()}}; } - // Scheme-less bucket/key: drop ?query / #fragment and percent-decode to match s3://. std::string bucket_key = file_location.substr(pos + 3); bucket_key = bucket_key.substr(0, bucket_key.find_first_of("?#")); return ::arrow::util::UriUnescape(bucket_key);