Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/iceberg/manifest/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::LiveEntries() {
}

Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_live) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_));
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->RawPartitionType(*schema_));
auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema();

std::shared_ptr<Schema> projected_data_file_schema;
Expand Down
43 changes: 29 additions & 14 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,16 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(

std::vector<SchemaField> partition_fields;
for (const auto& partition_field : fields_) {
// Get the source field from the original schema by source_id
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
schema.FindFieldById(partition_field.source_id()));
if (!source_field.has_value()) {
// TODO(xiao.dong) when source field is missing,
// should return an error or just use UNKNOWN type
return InvalidSchema("Cannot find source field for partition field:{}",
partition_field.field_id());
std::shared_ptr<Type> result_type;
if (source_field.has_value()) {
auto source_field_type = source_field.value().get().type();
result_type = partition_field.transform()->ResultType(std::move(source_field_type));
} else {
result_type = unknown();
}
auto source_field_type = source_field.value().get().type();
// Bind the transform to the source field type to get the result type
ICEBERG_ASSIGN_OR_RAISE(auto transform_function,
partition_field.transform()->Bind(source_field_type));

auto result_type = transform_function->ResultType();

// Create the partition field with the transform result type
// Partition fields are always optional (can be null)
partition_fields.emplace_back(partition_field.field_id(),
std::string(partition_field.name()),
std::move(result_type),
Expand All @@ -101,6 +93,29 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
return std::make_unique<StructType>(std::move(partition_fields));
}

Result<std::unique_ptr<StructType>> PartitionSpec::RawPartitionType(
const Schema& schema) const {
const auto& ids_to_original = schema.IdsToOriginal();
if (ids_to_original.empty()) {
return PartitionType(schema);
}

ICEBERG_ASSIGN_OR_RAISE(auto partition_type, PartitionType(schema));
std::vector<SchemaField> raw_partition_fields;
raw_partition_fields.reserve(partition_type->fields().size());
for (const auto& field : partition_type->fields()) {
auto original_id = ids_to_original.find(field.field_id());
if (original_id == ids_to_original.end()) {
return InvalidSchema("Cannot find original field ID for reassigned field ID: {}",
field.field_id());
}
raw_partition_fields.emplace_back(original_id->second, std::string(field.name()),
field.type(), field.optional(),
std::string(field.doc()));
}
return std::make_unique<StructType>(std::move(raw_partition_fields));
}

Result<std::string> PartitionSpec::PartitionPath(const PartitionValues& data) const {
ICEBERG_PRECHECK(fields_.size() == data.num_fields(),
"Partition spec and data mismatch, expected field num {}, got {}",
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \brief Get the partition type binding to the input schema.
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;

/// \brief Get the partition type as physically written in manifest files.
Result<std::unique_ptr<StructType>> RawPartitionType(const Schema& schema) const;

/// \brief Get the partition path for the given partition data.
Result<std::string> PartitionPath(const PartitionValues& data) const;

Expand Down
121 changes: 115 additions & 6 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,103 @@

namespace iceberg {

struct SchemaReassignIdContext {
Schema::IdMap ids_to_reassigned;
Schema::IdMap ids_to_original;
};

namespace {

const Schema::IdMap& EmptyIdMap() {
static const Schema::IdMap kEmpty;
return kEmpty;
}

void RecordIdReassignment(int32_t old_id, int32_t new_id,
Schema::IdMap& ids_to_reassigned,
Schema::IdMap& ids_to_original) {
if (new_id != old_id) {
ids_to_reassigned[old_id] = new_id;
ids_to_original[new_id] = old_id;
}
}

SchemaField ReassignField(const SchemaField& field, int32_t new_id,
const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned,
Schema::IdMap& ids_to_original);

std::shared_ptr<Type> ReassignTypeIds(const std::shared_ptr<Type>& type,
const Schema::GetId& get_id,
Schema::IdMap& ids_to_reassigned,
Schema::IdMap& ids_to_original) {
switch (type->type_id()) {
case TypeId::kStruct: {
const auto& struct_type = static_cast<const StructType&>(*type);
const auto& fields = struct_type.fields();
std::vector<int32_t> new_ids;
new_ids.reserve(fields.size());
for (const auto& field : fields) {
const auto new_id = get_id(field.field_id());
RecordIdReassignment(field.field_id(), new_id, ids_to_reassigned,
ids_to_original);
new_ids.push_back(new_id);
}

std::vector<SchemaField> reassigned_fields;
reassigned_fields.reserve(fields.size());
for (size_t i = 0; i < fields.size(); ++i) {
reassigned_fields.emplace_back(ReassignField(fields[i], new_ids[i], get_id,
ids_to_reassigned, ids_to_original));
}
return std::make_shared<StructType>(std::move(reassigned_fields));
}
case TypeId::kList: {
const auto& list_type = static_cast<const ListType&>(*type);
const auto& element = list_type.element();
const auto new_id = get_id(element.field_id());
RecordIdReassignment(element.field_id(), new_id, ids_to_reassigned,
ids_to_original);
return std::make_shared<ListType>(
ReassignField(element, new_id, get_id, ids_to_reassigned, ids_to_original));
}
case TypeId::kMap: {
const auto& map_type = static_cast<const MapType&>(*type);
const auto& key = map_type.key();
const auto& value = map_type.value();
const auto new_key_id = get_id(key.field_id());
const auto new_value_id = get_id(value.field_id());
RecordIdReassignment(key.field_id(), new_key_id, ids_to_reassigned,
ids_to_original);
RecordIdReassignment(value.field_id(), new_value_id, ids_to_reassigned,
ids_to_original);
return std::make_shared<MapType>(
ReassignField(key, new_key_id, get_id, ids_to_reassigned, ids_to_original),
ReassignField(value, new_value_id, get_id, ids_to_reassigned, ids_to_original));
}
default:
return type;
}
}

SchemaField ReassignField(const SchemaField& field, int32_t new_id,
const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned,
Schema::IdMap& ids_to_original) {
return {new_id, std::string(field.name()),
ReassignTypeIds(field.type(), get_id, ids_to_reassigned, ids_to_original),
field.optional(), std::string(field.doc())};
}

std::vector<SchemaField> ReassignIds(std::vector<SchemaField> fields,
const Schema::GetId& get_id,
SchemaReassignIdContext& reassign_id_context) {
auto reassigned_type = ReassignTypeIds(std::make_shared<StructType>(std::move(fields)),
get_id, reassign_id_context.ids_to_reassigned,
reassign_id_context.ids_to_original);
const auto& reassigned_fields =
internal::checked_cast<const StructType&>(*reassigned_type).fields();
return {reassigned_fields.begin(), reassigned_fields.end()};
}

Status ValidateFieldNullability(const Type& type) {
auto validate_field = [&](const SchemaField& field) -> Status {
ICEBERG_PRECHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown,
Expand Down Expand Up @@ -73,17 +168,23 @@ Status ValidateFieldNullability(const Type& type) {

} // namespace

Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id)
Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id, GetId get_id)
: StructType(std::move(fields)),
schema_id_(schema_id),
cache_(std::make_unique<SchemaCache>(this)) {}
cache_(std::make_unique<SchemaCache>(this)) {
if (get_id) {
reassign_id_context_ = std::make_unique<SchemaReassignIdContext>();
fields_ = ReassignIds(std::move(fields_), get_id, *reassign_id_context_);
}
}

Schema::~Schema() = default;

Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
int32_t schema_id,
std::vector<int32_t> identifier_field_ids) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
std::vector<int32_t> identifier_field_ids,
GetId get_id) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id, std::move(get_id));

if (!identifier_field_ids.empty()) {
auto id_to_parent = IndexParents(*schema);
Expand All @@ -99,8 +200,8 @@ Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,

Result<std::unique_ptr<Schema>> Schema::Make(
std::vector<SchemaField> fields, int32_t schema_id,
const std::vector<std::string>& identifier_field_names) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
const std::vector<std::string>& identifier_field_names, GetId get_id) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id, std::move(get_id));

std::vector<int32_t> fresh_identifier_ids;
for (const auto& name : identifier_field_names) {
Expand Down Expand Up @@ -181,6 +282,14 @@ const std::shared_ptr<Schema>& Schema::EmptySchema() {

int32_t Schema::schema_id() const { return schema_id_; }

const Schema::IdMap& Schema::IdsToReassigned() const {
return reassign_id_context_ ? reassign_id_context_->ids_to_reassigned : EmptyIdMap();
}

const Schema::IdMap& Schema::IdsToOriginal() const {
return reassign_id_context_ ? reassign_id_context_->ids_to_original : EmptyIdMap();
}

std::string Schema::ToString() const {
std::string repr = "schema<";
for (const auto& field : fields_) {
Expand Down
25 changes: 22 additions & 3 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
/// and any utility functions. See iceberg/type.h and iceberg/field.h as well.

#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>
Expand All @@ -40,6 +41,7 @@
namespace iceberg {

class SchemaCache;
struct SchemaReassignIdContext;

/// \brief A schema for a Table.
///
Expand All @@ -55,7 +57,14 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Special value to select all columns from manifest files.
static constexpr std::string_view kAllColumns = "*";

explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId);
/// \brief Maps an original field ID to its reassigned ID.
///
/// The mapping is total: return the original ID when no reassignment is needed.
using GetId = std::function<int32_t(int32_t)>;
using IdMap = std::unordered_map<int32_t, int32_t>;

explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
GetId get_id = {});

~Schema() override;

Expand All @@ -64,21 +73,24 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \param fields The fields that make up the schema.
/// \param schema_id The unique identifier for this schema (default:kInitialSchemaId).
/// \param identifier_field_ids Field IDs that uniquely identify rows in the table.
/// \param get_id Function mapping each original field ID to its reassigned ID.
/// \return A new Schema instance or Status if failed.
static Result<std::unique_ptr<Schema>> Make(std::vector<SchemaField> fields,
int32_t schema_id,
std::vector<int32_t> identifier_field_ids);
std::vector<int32_t> identifier_field_ids,
GetId get_id = {});

/// \brief Create a schema.
///
/// \param fields The fields that make up the schema.
/// \param schema_id The unique identifier for this schema (default: kInitialSchemaId).
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
/// in the table.
/// \param get_id Function mapping each original field ID to its reassigned ID.
/// \return A new Schema instance or Status if failed.
static Result<std::unique_ptr<Schema>> Make(
std::vector<SchemaField> fields, int32_t schema_id,
const std::vector<std::string>& identifier_field_names);
const std::vector<std::string>& identifier_field_names, GetId get_id = {});

/// \brief Validate that the identifier field with the given ID is valid for the schema
///
Expand Down Expand Up @@ -166,6 +178,12 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Return the field IDs of the identifier fields.
const std::vector<int32_t>& IdentifierFieldIds() const;

/// \brief Return a map of original field IDs to reassigned field IDs.
const IdMap& IdsToReassigned() const;

/// \brief Return a map of reassigned field IDs to original field IDs.
const IdMap& IdsToOriginal() const;

/// \brief Return the canonical field names of the identifier fields.
Result<std::vector<std::string>> IdentifierFieldNames() const;

Expand Down Expand Up @@ -196,6 +214,7 @@ class ICEBERG_EXPORT Schema : public StructType {
const int32_t schema_id_;
// Field IDs that uniquely identify rows in the table.
std::vector<int32_t> identifier_field_ids_;
std::unique_ptr<SchemaReassignIdContext> reassign_id_context_;
// Cache for schema mappings to facilitate fast lookups.
std::unique_ptr<SchemaCache> cache_;
};
Expand Down
51 changes: 51 additions & 0 deletions src/iceberg/test/assign_id_visitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
},
Schema::kInitialSchemaId),
*fresh_schema);
EXPECT_TRUE(fresh_schema->IdsToReassigned().empty());
EXPECT_TRUE(fresh_schema->IdsToOriginal().empty());
}

TEST(AssignFreshIdVisitorTest, NestedSchema) {
Expand Down Expand Up @@ -169,6 +171,55 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) {
EXPECT_EQ(*expect_nested_struct_type, *nested_struct_type);
}

TEST(AssignFreshIdVisitorTest, GetIdMaps) {
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
std::vector<SchemaField> fields(schema->fields().begin(), schema->fields().end());
auto reassign_id = [](int32_t old_id) { return old_id + 1000; };

Schema reassigned_schema(std::move(fields), Schema::kInitialSchemaId, reassign_id);

EXPECT_EQ(reassigned_schema.fields()[0].field_id(), 1010);
EXPECT_EQ(reassigned_schema.fields()[1].field_id(), 1020);
auto list_type =
std::dynamic_pointer_cast<ListType>(reassigned_schema.fields()[1].type());
ASSERT_TRUE(list_type);
EXPECT_EQ(list_type->element().field_id(), 1101);

EXPECT_EQ(reassigned_schema.IdsToReassigned().size(), 15U);
EXPECT_THAT(
reassigned_schema.IdsToReassigned(),
testing::UnorderedElementsAre(
testing::Pair(10, 1010), testing::Pair(20, 1020), testing::Pair(30, 1030),
testing::Pair(40, 1040), testing::Pair(101, 1101), testing::Pair(102, 1102),
testing::Pair(103, 1103), testing::Pair(201, 1201), testing::Pair(202, 1202),
testing::Pair(203, 1203), testing::Pair(204, 1204), testing::Pair(301, 1301),
testing::Pair(302, 1302), testing::Pair(303, 1303), testing::Pair(304, 1304)));
EXPECT_THAT(
reassigned_schema.IdsToOriginal(),
testing::UnorderedElementsAre(
testing::Pair(1010, 10), testing::Pair(1020, 20), testing::Pair(1030, 30),
testing::Pair(1040, 40), testing::Pair(1101, 101), testing::Pair(1102, 102),
testing::Pair(1103, 103), testing::Pair(1201, 201), testing::Pair(1202, 202),
testing::Pair(1203, 203), testing::Pair(1204, 204), testing::Pair(1301, 301),
testing::Pair(1302, 302), testing::Pair(1303, 303), testing::Pair(1304, 304)));
}

TEST(AssignFreshIdVisitorTest, GetIdIdentifierNames) {
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
std::vector<SchemaField> fields(schema->fields().begin(), schema->fields().end());
auto reassign_id = [](int32_t old_id) { return old_id + 1000; };

ICEBERG_UNWRAP_OR_FAIL(
auto reassigned_schema,
Schema::Make(std::move(fields), Schema::kInitialSchemaId,
std::vector<std::string>{"id", "struct.outer_id"}, reassign_id));

EXPECT_THAT(reassigned_schema->IdentifierFieldIds(), testing::ElementsAre(1010, 1301));
ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names,
reassigned_schema->IdentifierFieldNames());
EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id"));
}

TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {
int32_t id = 0;
auto next_id = [&id]() { return ++id; };
Expand Down
Loading
Loading