Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/iceberg/arrow/arrow_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <arrow/io/interfaces.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/uri.h>

#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_io_util.h"
Expand Down Expand Up @@ -473,11 +474,26 @@ class ArrowOutputFile : public OutputFile {
} // namespace

/// \brief Convert a file location into the path handed to the wrapped Arrow
/// filesystem.
///
/// Resolution order:
///   1. A location without "://" is returned unchanged.
///   2. Otherwise the wrapped filesystem's PathFromUri() is tried and its result
///      is returned on success.
///   3. If PathFromUri() fails, the location is validated with Arrow's URI parser.
///      A parse failure is reported as an Error built from the Arrow status;
///      otherwise the text after "://" (up to the first '?' or '#') is
///      percent-decoded and returned.
///
/// \param file_location Plain path or URI identifying the file.
/// \return The resolved path, or an Error when the location is not a parsable URI.
Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) {
  const auto pos = file_location.find("://");
  if (pos == std::string::npos) {
    // No scheme separator, so there is nothing to translate.
    return file_location;
  }

  auto path = arrow_fs_->PathFromUri(file_location);
  if (path.ok()) {
    return std::move(path).ValueOrDie();
  }

  // Foreign alias (s3a/s3n): validate via Arrow's parser, then percent-decode the
  // scheme-less key (substring keeps a Windows drive letter's ':' that host() drops).
  if (auto parsed = ::arrow::util::Uri::FromString(file_location); !parsed.ok()) {
    const auto& status = parsed.status();
    return std::unexpected<Error>{
        {.kind = ToErrorKind(status), .message = status.ToString()}};
  }
  // Skip the three characters of "://", then drop any query or fragment suffix.
  std::string bucket_key = file_location.substr(pos + 3);
  bucket_key = bucket_key.substr(0, bucket_key.find_first_of("?#"));
  return ::arrow::util::UriUnescape(bucket_key);
}

Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenArrowInputStream(
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/catalog/rest/rest_file_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Result<BuiltinFileIOKind> DetectBuiltinFileIO(std::string_view location) {
if (scheme == "file") {
return BuiltinFileIOKind::kArrowLocal;
}
if (scheme == "s3") {
if (scheme == "s3" || scheme == "s3a" || scheme == "s3n") {
return BuiltinFileIOKind::kArrowS3;
}

Expand Down
34 changes: 34 additions & 0 deletions src/iceberg/test/arrow_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ namespace iceberg {

namespace {

// Builds an "x-store://" URI from a local path, turning every backslash into a
// forward slash first.
std::string ForeignSchemeUri(std::string local_path) {
  for (char& ch : local_path) {
    if (ch == '\\') {
      ch = '/';
    }
  }
  std::string uri = "x-store://";
  uri += local_path;
  return uri;
}

// Holds a single flag that starts out false.
// NOTE(review): presumably set by a test double when it is closed; confirm against
// the users of this struct.
struct CloseState {
  bool closed{false};
};
Expand Down Expand Up @@ -405,6 +410,35 @@ TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) {
EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3)));
}

// Reading through an "x-store://" URI must return the contents written to the
// underlying path, with and without a trailing query string.
TEST_F(LocalFileIOTest, ResolvesForeignSchemeToUnderlyingPath) {
  // Seed the file through its plain path.
  ASSERT_THAT(file_io_->WriteFile(temp_filepath_, "hello world"), IsOk());

  // Bare URI form.
  const std::string bare_uri = ForeignSchemeUri(temp_filepath_);
  auto bare_contents = file_io_->ReadFile(bare_uri, std::nullopt);
  EXPECT_THAT(bare_contents, IsOk());
  EXPECT_THAT(bare_contents, HasValue(::testing::Eq("hello world")));

  // Same URI with a query component appended.
  const std::string versioned_uri = ForeignSchemeUri(temp_filepath_) + "?versionId=42";
  auto versioned_contents = file_io_->ReadFile(versioned_uri, std::nullopt);
  EXPECT_THAT(versioned_contents, IsOk());
  EXPECT_THAT(versioned_contents, HasValue(::testing::Eq("hello world")));
}

// A "file://" URI containing an invalid percent escape must fail with
// kUnknownError and a "Cannot parse URI" message.
TEST_F(LocalFileIOTest, PropagatesNonSchemeMismatchUriError) {
  const std::string malformed_uri = "file:///tmp/%ZZ";

  auto outcome = file_io_->ReadFile(malformed_uri, std::nullopt);

  EXPECT_THAT(outcome, IsError(ErrorKind::kUnknownError));
  EXPECT_THAT(outcome, HasErrorMessage("Cannot parse URI"));
}

// A "%20" in an "x-store://" URI must be decoded to a space, so the read reaches
// the file whose name ends in " x".
TEST_F(LocalFileIOTest, FallbackDecodesPercentEncodingInKey) {
  // Create the file under its literal name, which contains a space.
  const std::string path_with_space = temp_filepath_ + " x";
  ASSERT_THAT(file_io_->WriteFile(path_with_space, "raw"), IsOk());

  // Address the same file through the percent-encoded URI.
  const std::string encoded_uri = ForeignSchemeUri(temp_filepath_ + "%20x");
  auto contents = file_io_->ReadFile(encoded_uri, std::nullopt);

  EXPECT_THAT(contents, IsOk());
  EXPECT_THAT(contents, HasValue(::testing::Eq("raw")));
}

TEST(FileIOAdapterTest, InputAdapterRejectsReadsAfterClose) {
auto state = std::make_shared<PermissiveReadState>();
state->data = "abc";
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/test/rest_file_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class MockFileIO : public FileIO {
TEST(RestFileIOTest, DetectBuiltinKindFromScheme) {
EXPECT_THAT(DetectBuiltinFileIO("s3://bucket/path"),
HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3)));
EXPECT_THAT(DetectBuiltinFileIO("s3a://bucket/path"),
HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3)));
EXPECT_THAT(DetectBuiltinFileIO("s3n://bucket/path"),
HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3)));
EXPECT_THAT(DetectBuiltinFileIO("/tmp/warehouse"),
HasValue(::testing::Eq(BuiltinFileIOKind::kArrowLocal)));
EXPECT_THAT(DetectBuiltinFileIO("file:///tmp/warehouse"),
Expand Down
Loading