diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 6c166df53..bc7572591 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -861,7 +861,7 @@ Result> ManifestReaderImpl::LiveEntries() { } Result> ManifestReaderImpl::ReadEntries(bool only_live) { - ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_)); + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->RawPartitionType(*schema_)); auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema(); std::shared_ptr projected_data_file_schema; diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index c00eab7d2..ed5aa831a 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -74,24 +74,16 @@ Result> PartitionSpec::PartitionType( std::vector partition_fields; for (const auto& partition_field : fields_) { - // Get the source field from the original schema by source_id ICEBERG_ASSIGN_OR_RAISE(auto source_field, schema.FindFieldById(partition_field.source_id())); - if (!source_field.has_value()) { - // TODO(xiao.dong) when source field is missing, - // should return an error or just use UNKNOWN type - return InvalidSchema("Cannot find source field for partition field:{}", - partition_field.field_id()); + std::shared_ptr result_type; + if (source_field.has_value()) { + auto source_field_type = source_field.value().get().type(); + result_type = partition_field.transform()->ResultType(std::move(source_field_type)); + } else { + result_type = unknown(); } - auto source_field_type = source_field.value().get().type(); - // Bind the transform to the source field type to get the result type - ICEBERG_ASSIGN_OR_RAISE(auto transform_function, - partition_field.transform()->Bind(source_field_type)); - - auto result_type = transform_function->ResultType(); - // Create the partition field with the transform result type - // Partition fields are always optional (can be null) partition_fields.emplace_back(partition_field.field_id(), std::string(partition_field.name()), std::move(result_type), @@ -101,6 +93,29 @@ Result> PartitionSpec::PartitionType( return std::make_unique(std::move(partition_fields)); } +Result> PartitionSpec::RawPartitionType( + const Schema& schema) const { + const auto& ids_to_original = schema.IdsToOriginal(); + if (ids_to_original.empty()) { + return PartitionType(schema); + } + + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, PartitionType(schema)); + std::vector raw_partition_fields; + raw_partition_fields.reserve(partition_type->fields().size()); + for (const auto& field : partition_type->fields()) { + auto original_id = ids_to_original.find(field.field_id()); + if (original_id == ids_to_original.end()) { + return InvalidSchema("Cannot find original field ID for reassigned field ID: {}", + field.field_id()); + } + raw_partition_fields.emplace_back(original_id->second, std::string(field.name()), + field.type(), field.optional(), + std::string(field.doc())); + } + return std::make_unique(std::move(raw_partition_fields)); +} + Result PartitionSpec::PartitionPath(const PartitionValues& data) const { ICEBERG_PRECHECK(fields_.size() == data.num_fields(), "Partition spec and data mismatch, expected field num {}, got {}", diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 0fb8814b8..2ed0ddc8a 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -64,6 +64,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Get the partition type binding to the input schema. Result> PartitionType(const Schema& schema) const; + /// \brief Get the partition type as physically written in manifest files. + Result> RawPartitionType(const Schema& schema) const; + /// \brief Get the partition path for the given partition data. Result PartitionPath(const PartitionValues& data) const; diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 5e60b551f..fcac43c78 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -35,8 +35,103 @@ namespace iceberg { +struct SchemaReassignIdContext { + Schema::IdMap ids_to_reassigned; + Schema::IdMap ids_to_original; +}; + namespace { +const Schema::IdMap& EmptyIdMap() { + static const Schema::IdMap kEmpty; + return kEmpty; +} + +void RecordIdReassignment(int32_t old_id, int32_t new_id, + Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original) { + if (new_id != old_id) { + ids_to_reassigned[old_id] = new_id; + ids_to_original[new_id] = old_id; + } +} + +SchemaField ReassignField(const SchemaField& field, int32_t new_id, + const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original); + +std::shared_ptr ReassignTypeIds(const std::shared_ptr& type, + const Schema::GetId& get_id, + Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original) { + switch (type->type_id()) { + case TypeId::kStruct: { + const auto& struct_type = static_cast(*type); + const auto& fields = struct_type.fields(); + std::vector new_ids; + new_ids.reserve(fields.size()); + for (const auto& field : fields) { + const auto new_id = get_id(field.field_id()); + RecordIdReassignment(field.field_id(), new_id, ids_to_reassigned, + ids_to_original); + new_ids.push_back(new_id); + } + + std::vector reassigned_fields; + reassigned_fields.reserve(fields.size()); + for (size_t i = 0; i < fields.size(); ++i) { + reassigned_fields.emplace_back(ReassignField(fields[i], new_ids[i], get_id, + ids_to_reassigned, ids_to_original)); + } + return std::make_shared(std::move(reassigned_fields)); + } + case TypeId::kList: { + const auto& list_type = static_cast(*type); + const auto& element = list_type.element(); + const auto new_id = get_id(element.field_id()); + RecordIdReassignment(element.field_id(), new_id, ids_to_reassigned, + ids_to_original); + return std::make_shared( + ReassignField(element, new_id, get_id, ids_to_reassigned, ids_to_original)); + } + case TypeId::kMap: { + const auto& map_type = static_cast(*type); + const auto& key = map_type.key(); + const auto& value = map_type.value(); + const auto new_key_id = get_id(key.field_id()); + const auto new_value_id = get_id(value.field_id()); + RecordIdReassignment(key.field_id(), new_key_id, ids_to_reassigned, + ids_to_original); + RecordIdReassignment(value.field_id(), new_value_id, ids_to_reassigned, + ids_to_original); + return std::make_shared( + ReassignField(key, new_key_id, get_id, ids_to_reassigned, ids_to_original), + ReassignField(value, new_value_id, get_id, ids_to_reassigned, ids_to_original)); + } + default: + return type; + } +} + +SchemaField ReassignField(const SchemaField& field, int32_t new_id, + const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original) { + return {new_id, std::string(field.name()), + ReassignTypeIds(field.type(), get_id, ids_to_reassigned, ids_to_original), + field.optional(), std::string(field.doc())}; +} + +std::vector ReassignIds(std::vector fields, + const Schema::GetId& get_id, + SchemaReassignIdContext& reassign_id_context) { + auto reassigned_type = ReassignTypeIds(std::make_shared(std::move(fields)), + get_id, reassign_id_context.ids_to_reassigned, + reassign_id_context.ids_to_original); + const auto& reassigned_fields = + internal::checked_cast(*reassigned_type).fields(); + return {reassigned_fields.begin(), reassigned_fields.end()}; +} + Status ValidateFieldNullability(const Type& type) { auto validate_field = [&](const SchemaField& field) -> Status { ICEBERG_PRECHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown, @@ -73,17 +168,23 @@ Status ValidateFieldNullability(const Type& type) { } // namespace -Schema::Schema(std::vector fields, int32_t schema_id) +Schema::Schema(std::vector fields, int32_t schema_id, GetId get_id) : StructType(std::move(fields)), schema_id_(schema_id), - cache_(std::make_unique(this)) {} + cache_(std::make_unique(this)) { + if (get_id) { + reassign_id_context_ = std::make_unique(); + fields_ = ReassignIds(std::move(fields_), get_id, *reassign_id_context_); + } +} Schema::~Schema() = default; Result> Schema::Make(std::vector fields, int32_t schema_id, - std::vector identifier_field_ids) { - auto schema = std::make_unique(std::move(fields), schema_id); + std::vector identifier_field_ids, + GetId get_id) { + auto schema = std::make_unique(std::move(fields), schema_id, std::move(get_id)); if (!identifier_field_ids.empty()) { auto id_to_parent = IndexParents(*schema); @@ -99,8 +200,8 @@ Result> Schema::Make(std::vector fields, Result> Schema::Make( std::vector fields, int32_t schema_id, - const std::vector& identifier_field_names) { - auto schema = std::make_unique(std::move(fields), schema_id); + const std::vector& identifier_field_names, GetId get_id) { + auto schema = std::make_unique(std::move(fields), schema_id, std::move(get_id)); std::vector fresh_identifier_ids; for (const auto& name : identifier_field_names) { @@ -181,6 +282,14 @@ const std::shared_ptr& Schema::EmptySchema() { int32_t Schema::schema_id() const { return schema_id_; } +const Schema::IdMap& Schema::IdsToReassigned() const { + return reassign_id_context_ ? reassign_id_context_->ids_to_reassigned : EmptyIdMap(); +} + +const Schema::IdMap& Schema::IdsToOriginal() const { + return reassign_id_context_ ? reassign_id_context_->ids_to_original : EmptyIdMap(); +} + std::string Schema::ToString() const { std::string repr = "schema<"; for (const auto& field : fields_) { diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index 791ed5c8f..9245be02e 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -24,6 +24,7 @@ /// and any utility functions. See iceberg/type.h and iceberg/field.h as well. #include +#include #include #include #include @@ -40,6 +41,7 @@ namespace iceberg { class SchemaCache; +struct SchemaReassignIdContext; /// \brief A schema for a Table. /// @@ -55,7 +57,14 @@ class ICEBERG_EXPORT Schema : public StructType { /// \brief Special value to select all columns from manifest files. static constexpr std::string_view kAllColumns = "*"; - explicit Schema(std::vector fields, int32_t schema_id = kInitialSchemaId); + /// \brief Maps an original field ID to its reassigned ID. + /// + /// The mapping is total: return the original ID when no reassignment is needed. + using GetId = std::function; + using IdMap = std::unordered_map; + + explicit Schema(std::vector fields, int32_t schema_id = kInitialSchemaId, + GetId get_id = {}); ~Schema() override; @@ -64,10 +73,12 @@ class ICEBERG_EXPORT Schema : public StructType { /// \param fields The fields that make up the schema. /// \param schema_id The unique identifier for this schema (default:kInitialSchemaId). /// \param identifier_field_ids Field IDs that uniquely identify rows in the table. + /// \param get_id Function mapping each original field ID to its reassigned ID. /// \return A new Schema instance or Status if failed. static Result> Make(std::vector fields, int32_t schema_id, - std::vector identifier_field_ids); + std::vector identifier_field_ids, + GetId get_id = {}); /// \brief Create a schema. /// @@ -75,10 +86,11 @@ class ICEBERG_EXPORT Schema : public StructType { /// \param schema_id The unique identifier for this schema (default: kInitialSchemaId). /// \param identifier_field_names Canonical names of fields that uniquely identify rows /// in the table. + /// \param get_id Function mapping each original field ID to its reassigned ID. /// \return A new Schema instance or Status if failed. static Result> Make( std::vector fields, int32_t schema_id, - const std::vector& identifier_field_names); + const std::vector& identifier_field_names, GetId get_id = {}); /// \brief Validate that the identifier field with the given ID is valid for the schema /// @@ -166,6 +178,12 @@ class ICEBERG_EXPORT Schema : public StructType { /// \brief Return the field IDs of the identifier fields. const std::vector& IdentifierFieldIds() const; + /// \brief Return a map of original field IDs to reassigned field IDs. + const IdMap& IdsToReassigned() const; + + /// \brief Return a map of reassigned field IDs to original field IDs. + const IdMap& IdsToOriginal() const; + /// \brief Return the canonical field names of the identifier fields. Result> IdentifierFieldNames() const; @@ -196,6 +214,7 @@ class ICEBERG_EXPORT Schema : public StructType { const int32_t schema_id_; // Field IDs that uniquely identify rows in the table. std::vector identifier_field_ids_; + std::unique_ptr reassign_id_context_; // Cache for schema mappings to facilitate fast lookups. std::unique_ptr cache_; }; diff --git a/src/iceberg/test/assign_id_visitor_test.cc b/src/iceberg/test/assign_id_visitor_test.cc index 8bec0ad53..4830e4803 100644 --- a/src/iceberg/test/assign_id_visitor_test.cc +++ b/src/iceberg/test/assign_id_visitor_test.cc @@ -106,6 +106,8 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) { }, Schema::kInitialSchemaId), *fresh_schema); + EXPECT_TRUE(fresh_schema->IdsToReassigned().empty()); + EXPECT_TRUE(fresh_schema->IdsToOriginal().empty()); } TEST(AssignFreshIdVisitorTest, NestedSchema) { @@ -169,6 +171,55 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) { EXPECT_EQ(*expect_nested_struct_type, *nested_struct_type); } +TEST(AssignFreshIdVisitorTest, GetIdMaps) { + ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema()); + std::vector fields(schema->fields().begin(), schema->fields().end()); + auto reassign_id = [](int32_t old_id) { return old_id + 1000; }; + + Schema reassigned_schema(std::move(fields), Schema::kInitialSchemaId, reassign_id); + + EXPECT_EQ(reassigned_schema.fields()[0].field_id(), 1010); + EXPECT_EQ(reassigned_schema.fields()[1].field_id(), 1020); + auto list_type = + std::dynamic_pointer_cast(reassigned_schema.fields()[1].type()); + ASSERT_TRUE(list_type); + EXPECT_EQ(list_type->element().field_id(), 1101); + + EXPECT_EQ(reassigned_schema.IdsToReassigned().size(), 15U); + EXPECT_THAT( + reassigned_schema.IdsToReassigned(), + testing::UnorderedElementsAre( + testing::Pair(10, 1010), testing::Pair(20, 1020), testing::Pair(30, 1030), + testing::Pair(40, 1040), testing::Pair(101, 1101), testing::Pair(102, 1102), + testing::Pair(103, 1103), testing::Pair(201, 1201), testing::Pair(202, 1202), + testing::Pair(203, 1203), testing::Pair(204, 1204), testing::Pair(301, 1301), + testing::Pair(302, 1302), testing::Pair(303, 1303), testing::Pair(304, 1304))); + EXPECT_THAT( + reassigned_schema.IdsToOriginal(), + testing::UnorderedElementsAre( + testing::Pair(1010, 10), testing::Pair(1020, 20), testing::Pair(1030, 30), + testing::Pair(1040, 40), testing::Pair(1101, 101), testing::Pair(1102, 102), + testing::Pair(1103, 103), testing::Pair(1201, 201), testing::Pair(1202, 202), + testing::Pair(1203, 203), testing::Pair(1204, 204), testing::Pair(1301, 301), + testing::Pair(1302, 302), testing::Pair(1303, 303), testing::Pair(1304, 304))); +} + +TEST(AssignFreshIdVisitorTest, GetIdIdentifierNames) { + ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema()); + std::vector fields(schema->fields().begin(), schema->fields().end()); + auto reassign_id = [](int32_t old_id) { return old_id + 1000; }; + + ICEBERG_UNWRAP_OR_FAIL( + auto reassigned_schema, + Schema::Make(std::move(fields), Schema::kInitialSchemaId, + std::vector{"id", "struct.outer_id"}, reassign_id)); + + EXPECT_THAT(reassigned_schema->IdentifierFieldIds(), testing::ElementsAre(1010, 1301)); + ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names, + reassigned_schema->IdentifierFieldNames()); + EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id")); +} + TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) { int32_t id = 0; auto next_id = [&id]() { return ++id; }; diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index 3a99b0009..274a8cb44 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -752,7 +752,7 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup) EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, kCurrentSnapshotId); } -TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpiredSpec) { +TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupReadsExpiredSpecWithMissingSource) { const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; const auto expired_manifest_list_path = @@ -796,9 +796,9 @@ TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpire [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); EXPECT_THAT(update->Commit(), IsOk()); - EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path, + EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, + expired_data_manifest_path, expired_manifest_list_path)); - EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path))); } TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup) { diff --git a/src/iceberg/test/manifest_reader_test.cc b/src/iceberg/test/manifest_reader_test.cc index f2c4ef406..4a17eacaa 100644 --- a/src/iceberg/test/manifest_reader_test.cc +++ b/src/iceberg/test/manifest_reader_test.cc @@ -288,6 +288,26 @@ TEST_P(TestManifestReader, TestManifestReaderWithPartitionMetadata) { EXPECT_EQ(read_entry.data_file->partition.values()[0], Literal::Int(0)); } +TEST_P(TestManifestReader, ReadsEntriesWhenPartitionSourceFieldIsMissing) { + auto version = GetParam(); + auto file = MakeDataFile("/path/to/historical-data.parquet", + PartitionValues({Literal::Int(7)})); + auto manifest = + WriteManifest(version, /*snapshot_id=*/1000L, + {MakeEntry(ManifestStatus::kAdded, 1000L, std::move(file))}); + + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(/*field_id=*/3, "id", int32())}); + + ICEBERG_UNWRAP_OR_FAIL(auto reader, + ManifestReader::Make(manifest, file_io_, current_schema, spec_)); + ICEBERG_UNWRAP_OR_FAIL(auto read_entries, reader->Entries()); + + ASSERT_EQ(read_entries.size(), 1U); + EXPECT_EQ(read_entries[0].data_file->file_path, "/path/to/historical-data.parquet"); + EXPECT_EQ(read_entries[0].data_file->record_count, 1); +} + TEST_P(TestManifestReader, TestDeleteFilesWithReferences) { auto version = GetParam(); if (version < 2) { diff --git a/src/iceberg/test/partition_spec_test.cc b/src/iceberg/test/partition_spec_test.cc index 89c4cdc83..e3b86e274 100644 --- a/src/iceberg/test/partition_spec_test.cc +++ b/src/iceberg/test/partition_spec_test.cc @@ -149,6 +149,59 @@ TEST(PartitionSpecTest, PartitionTypeTest) { EXPECT_EQ(pt_field3, partition_type->fields()[2]); } +TEST(PartitionSpecTest, PartitionTypeMissingSource) { + Schema schema({SchemaField::MakeRequired(2, "ts", timestamp())}, + Schema::kInitialSchemaId); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, + PartitionSpec::Make( + 1, {PartitionField(1, 1000, "dropped_identity", Transform::Identity()), + PartitionField(3, 1001, "dropped_bucket", Transform::Bucket(16)), + PartitionField(2, 1002, "ts_day", Transform::Day())})); + + ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema)); + + ASSERT_EQ(partition_type->fields().size(), 3U); + EXPECT_EQ(partition_type->fields()[0], + SchemaField::MakeOptional(1000, "dropped_identity", unknown())); + EXPECT_EQ(partition_type->fields()[1], + SchemaField::MakeOptional(1001, "dropped_bucket", unknown())); + EXPECT_EQ(partition_type->fields()[2], + SchemaField::MakeOptional(1002, "ts_day", date())); +} + +TEST(PartitionSpecTest, RawPartitionTypeNoReassign) { + Schema schema({SchemaField::MakeRequired(2, "ts", timestamp())}, + Schema::kInitialSchemaId); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, PartitionSpec::Make( + 1, {PartitionField(1, 1000, "dropped", Transform::Identity())})); + + ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema)); + ICEBERG_UNWRAP_OR_FAIL(auto raw_partition_type, spec->RawPartitionType(schema)); + + EXPECT_EQ(*raw_partition_type, *partition_type); +} + +TEST(PartitionSpecTest, RawPartitionTypeReassignIds) { + auto reassign_id = [](int32_t old_id) { return old_id == 1000 ? 2000 : old_id; }; + Schema schema({SchemaField::MakeOptional(1000, "partition_col", int32())}, + Schema::kInitialSchemaId, reassign_id); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, PartitionSpec::Make(1, {PartitionField(2000, 2000, "partition_col", + Transform::Identity())})); + + ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema)); + ICEBERG_UNWRAP_OR_FAIL(auto raw_partition_type, spec->RawPartitionType(schema)); + + ASSERT_EQ(partition_type->fields().size(), 1U); + EXPECT_EQ(partition_type->fields()[0], + SchemaField::MakeOptional(2000, "partition_col", int32())); + ASSERT_EQ(raw_partition_type->fields().size(), 1U); + EXPECT_EQ(raw_partition_type->fields()[0], + SchemaField::MakeOptional(1000, "partition_col", int32())); +} + TEST(PartitionSpecTest, InvalidTransformForType) { // Test Day transform on string type (should fail) auto field_string = SchemaField::MakeRequired(6, "s", string()); diff --git a/src/iceberg/test/transform_test.cc b/src/iceberg/test/transform_test.cc index 47a1e87e6..943b2fb52 100644 --- a/src/iceberg/test/transform_test.cc +++ b/src/iceberg/test/transform_test.cc @@ -159,12 +159,16 @@ TEST(TransformResultTypeTest, PositiveCases) { ASSERT_TRUE(result.has_value()) << "Failed to parse: " << c.str; const auto& transform = result.value(); - const auto transformPtr = transform->Bind(c.source_type); - ASSERT_TRUE(transformPtr.has_value()) << "Failed to bind: " << c.str; - - auto result_type = transformPtr.value()->ResultType(); + auto result_type = transform->ResultType(c.source_type); + ASSERT_NE(result_type, nullptr) << "Missing result type for: " << c.str; EXPECT_EQ(result_type->type_id(), c.expected_result_type->type_id()) << "Unexpected result type for: " << c.str; + + const auto transform_func = transform->Bind(c.source_type); + ASSERT_TRUE(transform_func.has_value()) << "Failed to bind: " << c.str; + EXPECT_EQ(transform_func.value()->ResultType()->type_id(), + c.expected_result_type->type_id()) + << "Unexpected bound result type for: " << c.str; } } diff --git a/src/iceberg/transform.cc b/src/iceberg/transform.cc index c915ec067..453941c95 100644 --- a/src/iceberg/transform.cc +++ b/src/iceberg/transform.cc @@ -135,6 +135,26 @@ Result> Transform::Bind( } } +std::shared_ptr Transform::ResultType( + const std::shared_ptr& source_type) const { + switch (transform_type_) { + case TransformType::kIdentity: + case TransformType::kTruncate: + case TransformType::kVoid: + return source_type; + case TransformType::kBucket: + case TransformType::kYear: + case TransformType::kMonth: + case TransformType::kHour: + return int32(); + case TransformType::kDay: + return date(); + case TransformType::kUnknown: + return string(); + } + std::unreachable(); +} + bool Transform::CanTransform(const Type& source_type) const { switch (transform_type_) { case TransformType::kIdentity: diff --git a/src/iceberg/transform.h b/src/iceberg/transform.h index 873b3ca6e..6b855a74d 100644 --- a/src/iceberg/transform.h +++ b/src/iceberg/transform.h @@ -151,6 +151,9 @@ class ICEBERG_EXPORT Transform : public util::Formattable { Result> Bind( const std::shared_ptr& source_type) const; + /// \brief Return the type produced by this transform for a source type. + std::shared_ptr ResultType(const std::shared_ptr& source_type) const; + /// \brief Checks whether this function can be applied to the given Type. /// \param source_type The source type to check. /// \return true if this transform can be applied to the type, false otherwise diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index c9ac9e4cd..7c056fab2 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -51,10 +51,6 @@ namespace { Result> MakeManifestReader( const ManifestFile& manifest, const std::shared_ptr& file_io, const TableMetadata& metadata) { - // TODO(gangwu): Build manifest file schemas from PartitionSpec::RawPartitionType - // with UnknownType for dropped source fields instead of requiring the table schema - // to bind every partition source field. Until then, cleanup fails closed when - // historical specs cannot bind to the metadata schema. ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); TableMetadataCache metadata_cache(&metadata); ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());