Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,10 @@ Result<RenameTableRequest> RenameTableRequestFromJson(const nlohmann::json& json
}

// LoadTableResult (used by CreateTableResponse, LoadTableResponse)
nlohmann::json ToJson(const LoadTableResult& result) {
Result<nlohmann::json> ToJson(const LoadTableResult& result) {
nlohmann::json json;
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
json[kMetadata] = ToJson(*result.metadata);
ICEBERG_ASSIGN_OR_RAISE(json[kMetadata], ToJson(*result.metadata));
SetContainerField(json, kConfig, result.config);
return json;
}
Expand Down Expand Up @@ -820,12 +820,12 @@ Result<ListTablesResponse> ListTablesResponseFromJson(const nlohmann::json& json
return response;
}

nlohmann::json ToJson(const CreateTableRequest& request) {
Result<nlohmann::json> ToJson(const CreateTableRequest& request) {
nlohmann::json json;
json[kName] = request.name;
SetOptionalStringField(json, kLocation, request.location);
if (request.schema) {
json[kSchema] = ToJson(*request.schema);
ICEBERG_ASSIGN_OR_RAISE(json[kSchema], ToJson(*request.schema));
}
if (request.partition_spec) {
json[kPartitionSpec] = ToJson(*request.partition_spec);
Expand Down Expand Up @@ -872,7 +872,7 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
}

// CommitTableRequest serialization
nlohmann::json ToJson(const CommitTableRequest& request) {
Result<nlohmann::json> ToJson(const CommitTableRequest& request) {
nlohmann::json json;
if (!request.identifier.name.empty()) {
json[kIdentifier] = ToJson(request.identifier);
Expand All @@ -886,7 +886,8 @@ nlohmann::json ToJson(const CommitTableRequest& request) {

nlohmann::json updates_json = nlohmann::json::array();
for (const auto& update : request.updates) {
updates_json.push_back(ToJson(*update));
ICEBERG_ASSIGN_OR_RAISE(auto update_json, ToJson(*update));
updates_json.push_back(std::move(update_json));
}
json[kUpdates] = std::move(updates_json);

Expand Down Expand Up @@ -932,11 +933,11 @@ Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& json
}

// CommitTableResponse serialization
nlohmann::json ToJson(const CommitTableResponse& response) {
Result<nlohmann::json> ToJson(const CommitTableResponse& response) {
nlohmann::json json;
json[kMetadataLocation] = response.metadata_location;
if (response.metadata) {
json[kMetadata] = ToJson(*response.metadata);
ICEBERG_ASSIGN_OR_RAISE(json[kMetadata], ToJson(*response.metadata));
}
return json;
}
Expand Down
30 changes: 26 additions & 4 deletions src/iceberg/catalog/rest/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,38 @@ ICEBERG_DECLARE_JSON_SERDE(GetNamespaceResponse)
ICEBERG_DECLARE_JSON_SERDE(UpdateNamespacePropertiesRequest)
ICEBERG_DECLARE_JSON_SERDE(UpdateNamespacePropertiesResponse)
ICEBERG_DECLARE_JSON_SERDE(ListTablesResponse)
ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)

#undef ICEBERG_DECLARE_JSON_SERDE

// These models embed a Schema/TableMetadata whose ToJson returns Result, so their own
// ToJson returns Result too. FromJson is declared like the macro-based models above.
ICEBERG_REST_EXPORT Result<LoadTableResult> LoadTableResultFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<LoadTableResult> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const LoadTableResult& model);

ICEBERG_REST_EXPORT Result<CreateTableRequest> CreateTableRequestFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<CreateTableRequest> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const CreateTableRequest& model);

ICEBERG_REST_EXPORT Result<CommitTableRequest> CommitTableRequestFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<CommitTableRequest> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const CommitTableRequest& model);

ICEBERG_REST_EXPORT Result<CommitTableResponse> CommitTableResponseFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<CommitTableResponse> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const CommitTableResponse& model);

ICEBERG_REST_EXPORT Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
.properties = properties,
};

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request));
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(path, json_request, /*headers=*/{},
*TableErrorHandler::Instance(), session));
Expand Down Expand Up @@ -710,7 +711,8 @@ Result<CommitTableResponse> RestCatalog::UpdateTableInternal(
request.updates.push_back(update->Clone());
}

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request));
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json));
ICEBERG_ASSIGN_OR_RAISE(auto is_create, TableRequirements::IsCreate(requirements));
const ErrorHandler* error_handler = TableCommitErrorHandler::Instance().get();
if (is_create) {
Expand Down
43 changes: 27 additions & 16 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,27 +315,28 @@ Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const nlohmann::json& json)
return SortOrder::Make(parsed.order_id, std::move(parsed.fields));
}

nlohmann::json ToJson(const SchemaField& field) {
Result<nlohmann::json> ToJson(const SchemaField& field) {
nlohmann::json json;
json[kId] = field.field_id();
json[kName] = field.name();
json[kRequired] = !field.optional();
json[kType] = ToJson(*field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kType], ToJson(*field.type()));
if (!field.doc().empty()) {
json[kDoc] = field.doc();
}
return json;
}

nlohmann::json ToJson(const Type& type) {
Result<nlohmann::json> ToJson(const Type& type) {
switch (type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = internal::checked_cast<const StructType&>(type);
nlohmann::json json;
json[kType] = kStruct;
nlohmann::json fields_json = nlohmann::json::array();
for (const auto& field : struct_type.fields()) {
fields_json.push_back(ToJson(field));
ICEBERG_ASSIGN_OR_RAISE(auto field_json, ToJson(field));
fields_json.push_back(std::move(field_json));
// TODO(gangwu): add default values
}
json[kFields] = fields_json;
Expand All @@ -349,7 +350,7 @@ nlohmann::json ToJson(const Type& type) {
const auto& element_field = list_type.fields().front();
json[kElementId] = element_field.field_id();
json[kElementRequired] = !element_field.optional();
json[kElement] = ToJson(*element_field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kElement], ToJson(*element_field.type()));
return json;
}
case TypeId::kMap: {
Expand All @@ -359,12 +360,12 @@ nlohmann::json ToJson(const Type& type) {

const auto& key_field = map_type.key();
json[kKeyId] = key_field.field_id();
json[kKey] = ToJson(*key_field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kKey], ToJson(*key_field.type()));

const auto& value_field = map_type.value();
json[kValueId] = value_field.field_id();
json[kValueRequired] = !value_field.optional();
json[kValue] = ToJson(*value_field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kValue], ToJson(*value_field.type()));
return json;
}
case TypeId::kBoolean:
Expand Down Expand Up @@ -416,8 +417,9 @@ nlohmann::json ToJson(const Type& type) {
std::unreachable();
}

nlohmann::json ToJson(const Schema& schema) {
nlohmann::json json = ToJson(internal::checked_cast<const Type&>(schema));
Result<nlohmann::json> ToJson(const Schema& schema) {
ICEBERG_ASSIGN_OR_RAISE(nlohmann::json json,
ToJson(internal::checked_cast<const Type&>(schema)));
json[kSchemaId] = schema.schema_id();
if (!schema.IdentifierFieldIds().empty()) {
json[kIdentifierFieldIds] = schema.IdentifierFieldIds();
Expand All @@ -426,7 +428,8 @@ nlohmann::json ToJson(const Schema& schema) {
}

Result<std::string> ToJsonString(const Schema& schema) {
return ToJsonString(ToJson(schema));
ICEBERG_ASSIGN_OR_RAISE(auto json, ToJson(schema));
return ToJsonString(json);
}

nlohmann::json ToJson(const SnapshotRef& ref) {
Expand Down Expand Up @@ -966,7 +969,7 @@ Result<EncryptedKey> EncryptedKeyFromJson(const nlohmann::json& json) {
};
}

nlohmann::json ToJson(const TableMetadata& table_metadata) {
Result<nlohmann::json> ToJson(const TableMetadata& table_metadata) {
nlohmann::json json;

json[kFormatVersion] = table_metadata.format_version;
Expand All @@ -984,15 +987,22 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
if (table_metadata.format_version == 1) {
for (const auto& schema : table_metadata.schemas) {
if (schema->schema_id() == table_metadata.current_schema_id) {
json[kSchema] = ToJson(*schema);
ICEBERG_ASSIGN_OR_RAISE(json[kSchema], ToJson(*schema));
break;
}
}
}

// write the current schema ID and schema list
json[kCurrentSchemaId] = table_metadata.current_schema_id;
json[kSchemas] = ToJsonList(table_metadata.schemas);
// ToJson(Schema) is fallible, so the shared ToJsonList helper (which assumes an
// infallible ToJson) cannot be used here; build the array with an explicit loop.
nlohmann::json schemas_json = nlohmann::json::array();
for (const auto& schema : table_metadata.schemas) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_json, ToJson(*schema));
schemas_json.push_back(std::move(schema_json));
}
json[kSchemas] = std::move(schemas_json);

// for older readers, continue writing the default spec as "partition-spec"
if (table_metadata.format_version == 1) {
Expand Down Expand Up @@ -1042,7 +1052,8 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
}

Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
return ToJsonString(ToJson(table_metadata));
ICEBERG_ASSIGN_OR_RAISE(auto json, ToJson(table_metadata));
return ToJsonString(json);
}

namespace {
Expand Down Expand Up @@ -1446,7 +1457,7 @@ Result<Namespace> NamespaceFromJson(const nlohmann::json& json) {
return ns;
}

nlohmann::json ToJson(const TableUpdate& update) {
Result<nlohmann::json> ToJson(const TableUpdate& update) {
nlohmann::json json;
switch (update.kind()) {
case TableUpdate::Kind::kAssignUUID: {
Expand All @@ -1465,7 +1476,7 @@ nlohmann::json ToJson(const TableUpdate& update) {
const auto& u = internal::checked_cast<const table::AddSchema&>(update);
json[kAction] = kActionAddSchema;
if (u.schema()) {
json[kSchema] = ToJson(*u.schema());
ICEBERG_ASSIGN_OR_RAISE(json[kSchema], ToJson(*u.schema()));
} else {
json[kSchema] = nlohmann::json::value_t::null;
}
Expand Down
10 changes: 5 additions & 5 deletions src/iceberg/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ ICEBERG_EXPORT Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
///
/// \param schema The Iceberg schema to convert.
/// \return The JSON representation of the schema.
ICEBERG_EXPORT nlohmann::json ToJson(const Schema& schema);
ICEBERG_EXPORT Result<nlohmann::json> ToJson(const Schema& schema);

/// \brief Convert an Iceberg Schema to JSON.
///
Expand All @@ -111,7 +111,7 @@ ICEBERG_EXPORT Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::js
///
/// \param type The Iceberg type to convert.
/// \return The JSON representation of the type.
ICEBERG_EXPORT nlohmann::json ToJson(const Type& type);
ICEBERG_EXPORT Result<nlohmann::json> ToJson(const Type& type);

/// \brief Convert JSON to an Iceberg Type.
///
Expand All @@ -123,7 +123,7 @@ ICEBERG_EXPORT Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json&
///
/// \param field The Iceberg field to convert.
/// \return The JSON representation of the field.
ICEBERG_EXPORT nlohmann::json ToJson(const SchemaField& field);
ICEBERG_EXPORT Result<nlohmann::json> ToJson(const SchemaField& field);

/// \brief Convert JSON to an Iceberg SchemaField.
///
Expand Down Expand Up @@ -300,7 +300,7 @@ ICEBERG_EXPORT Result<EncryptedKey> EncryptedKeyFromJson(const nlohmann::json& j
///
/// \param table_metadata The `TableMetadata` object to be serialized.
/// \return A JSON object representing the `TableMetadata`.
ICEBERG_EXPORT nlohmann::json ToJson(const TableMetadata& table_metadata);
ICEBERG_EXPORT Result<nlohmann::json> ToJson(const TableMetadata& table_metadata);

/// \brief Serializes a `TableMetadata` object to JSON.
///
Expand Down Expand Up @@ -404,7 +404,7 @@ ICEBERG_EXPORT Result<Namespace> NamespaceFromJson(const nlohmann::json& json);
///
/// \param update The `TableUpdate` object to be serialized.
/// \return A JSON object representing the `TableUpdate`.
ICEBERG_EXPORT nlohmann::json ToJson(const TableUpdate& update);
ICEBERG_EXPORT Result<nlohmann::json> ToJson(const TableUpdate& update);

/// \brief Deserializes a JSON object into a `TableUpdate` object.
///
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ Result<std::string> TableMetadataUtil::Write(FileIO& io, const TableMetadata* ba

Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
const TableMetadata& metadata) {
auto json = ToJson(metadata);
ICEBERG_ASSIGN_OR_RAISE(auto json, ToJson(metadata));
ICEBERG_ASSIGN_OR_RAISE(auto json_string, ToJsonString(json));
return io.WriteFile(location, json_string);
}
Expand Down
Loading
Loading