Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,10 @@ Result<RenameTableRequest> RenameTableRequestFromJson(const nlohmann::json& json
}

// LoadTableResult (used by CreateTableResponse, LoadTableResponse)
nlohmann::json ToJson(const LoadTableResult& result) {
Result<nlohmann::json> ToJson(const LoadTableResult& result) {
nlohmann::json json;
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
json[kMetadata] = ToJson(*result.metadata);
ICEBERG_ASSIGN_OR_RAISE(json[kMetadata], ToJson(*result.metadata));
SetContainerField(json, kConfig, result.config);
return json;
}
Expand Down Expand Up @@ -820,12 +820,12 @@ Result<ListTablesResponse> ListTablesResponseFromJson(const nlohmann::json& json
return response;
}

nlohmann::json ToJson(const CreateTableRequest& request) {
Result<nlohmann::json> ToJson(const CreateTableRequest& request) {
nlohmann::json json;
json[kName] = request.name;
SetOptionalStringField(json, kLocation, request.location);
if (request.schema) {
json[kSchema] = ToJson(*request.schema);
ICEBERG_ASSIGN_OR_RAISE(json[kSchema], ToJson(*request.schema));
}
if (request.partition_spec) {
json[kPartitionSpec] = ToJson(*request.partition_spec);
Expand Down Expand Up @@ -872,7 +872,7 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
}

// CommitTableRequest serialization
nlohmann::json ToJson(const CommitTableRequest& request) {
Result<nlohmann::json> ToJson(const CommitTableRequest& request) {
nlohmann::json json;
if (!request.identifier.name.empty()) {
json[kIdentifier] = ToJson(request.identifier);
Expand All @@ -886,7 +886,8 @@ nlohmann::json ToJson(const CommitTableRequest& request) {

nlohmann::json updates_json = nlohmann::json::array();
for (const auto& update : request.updates) {
updates_json.push_back(ToJson(*update));
ICEBERG_ASSIGN_OR_RAISE(auto update_json, ToJson(*update));
updates_json.push_back(std::move(update_json));
}
json[kUpdates] = std::move(updates_json);

Expand Down Expand Up @@ -932,11 +933,11 @@ Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& json
}

// CommitTableResponse serialization
nlohmann::json ToJson(const CommitTableResponse& response) {
Result<nlohmann::json> ToJson(const CommitTableResponse& response) {
nlohmann::json json;
json[kMetadataLocation] = response.metadata_location;
if (response.metadata) {
json[kMetadata] = ToJson(*response.metadata);
ICEBERG_ASSIGN_OR_RAISE(json[kMetadata], ToJson(*response.metadata));
}
return json;
}
Expand Down
30 changes: 26 additions & 4 deletions src/iceberg/catalog/rest/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,38 @@ ICEBERG_DECLARE_JSON_SERDE(GetNamespaceResponse)
ICEBERG_DECLARE_JSON_SERDE(UpdateNamespacePropertiesRequest)
ICEBERG_DECLARE_JSON_SERDE(UpdateNamespacePropertiesResponse)
ICEBERG_DECLARE_JSON_SERDE(ListTablesResponse)
ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)

#undef ICEBERG_DECLARE_JSON_SERDE

// These models embed a Schema/TableMetadata whose default-value serialization can fail,
// so their ToJson returns Result. FromJson is declared like the macro-based models above.
ICEBERG_REST_EXPORT Result<LoadTableResult> LoadTableResultFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<LoadTableResult> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const LoadTableResult& model);

ICEBERG_REST_EXPORT Result<CreateTableRequest> CreateTableRequestFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<CreateTableRequest> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const CreateTableRequest& model);

ICEBERG_REST_EXPORT Result<CommitTableRequest> CommitTableRequestFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<CommitTableRequest> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const CommitTableRequest& model);

ICEBERG_REST_EXPORT Result<CommitTableResponse> CommitTableResponseFromJson(
const nlohmann::json& json);
template <>
ICEBERG_REST_EXPORT Result<CommitTableResponse> FromJson(const nlohmann::json& json);
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const CommitTableResponse& model);

ICEBERG_REST_EXPORT Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
.properties = properties,
};

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request));
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(path, json_request, /*headers=*/{},
*TableErrorHandler::Instance(), session));
Expand Down Expand Up @@ -710,7 +711,8 @@ Result<CommitTableResponse> RestCatalog::UpdateTableInternal(
request.updates.push_back(update->Clone());
}

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request));
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json));
ICEBERG_ASSIGN_OR_RAISE(auto is_create, TableRequirements::IsCreate(requirements));
const ErrorHandler* error_handler = TableCommitErrorHandler::Instance().get();
if (is_create) {
Expand Down
106 changes: 88 additions & 18 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <nlohmann/json.hpp>

#include "iceberg/constants.h"
#include "iceberg/expression/json_serde_internal.h"
#include "iceberg/expression/literal.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/name_mapping.h"
#include "iceberg/partition_field.h"
Expand All @@ -49,6 +51,7 @@
#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/temporal_util.h"
#include "iceberg/util/timepoint.h"

namespace iceberg {
Expand Down Expand Up @@ -315,28 +318,35 @@ Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const nlohmann::json& json)
return SortOrder::Make(parsed.order_id, std::move(parsed.fields));
}

nlohmann::json ToJson(const SchemaField& field) {
Result<nlohmann::json> ToJson(const SchemaField& field) {
nlohmann::json json;
json[kId] = field.field_id();
json[kName] = field.name();
json[kRequired] = !field.optional();
json[kType] = ToJson(*field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kType], ToJson(*field.type()));
if (!field.doc().empty()) {
json[kDoc] = field.doc();
}
if (field.initial_default().has_value()) {
ICEBERG_ASSIGN_OR_RAISE(json[kInitialDefault],
ToJson(field.initial_default()->get()));
}
if (field.write_default().has_value()) {
ICEBERG_ASSIGN_OR_RAISE(json[kWriteDefault], ToJson(field.write_default()->get()));
}
return json;
}

nlohmann::json ToJson(const Type& type) {
Result<nlohmann::json> ToJson(const Type& type) {
switch (type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = internal::checked_cast<const StructType&>(type);
nlohmann::json json;
json[kType] = kStruct;
nlohmann::json fields_json = nlohmann::json::array();
for (const auto& field : struct_type.fields()) {
fields_json.push_back(ToJson(field));
// TODO(gangwu): add default values
ICEBERG_ASSIGN_OR_RAISE(auto field_json, ToJson(field));
fields_json.push_back(std::move(field_json));
}
json[kFields] = fields_json;
return json;
Expand All @@ -349,7 +359,7 @@ nlohmann::json ToJson(const Type& type) {
const auto& element_field = list_type.fields().front();
json[kElementId] = element_field.field_id();
json[kElementRequired] = !element_field.optional();
json[kElement] = ToJson(*element_field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kElement], ToJson(*element_field.type()));
return json;
}
case TypeId::kMap: {
Expand All @@ -359,12 +369,12 @@ nlohmann::json ToJson(const Type& type) {

const auto& key_field = map_type.key();
json[kKeyId] = key_field.field_id();
json[kKey] = ToJson(*key_field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kKey], ToJson(*key_field.type()));

const auto& value_field = map_type.value();
json[kValueId] = value_field.field_id();
json[kValueRequired] = !value_field.optional();
json[kValue] = ToJson(*value_field.type());
ICEBERG_ASSIGN_OR_RAISE(json[kValue], ToJson(*value_field.type()));
return json;
}
case TypeId::kBoolean:
Expand Down Expand Up @@ -416,8 +426,9 @@ nlohmann::json ToJson(const Type& type) {
std::unreachable();
}

nlohmann::json ToJson(const Schema& schema) {
nlohmann::json json = ToJson(internal::checked_cast<const Type&>(schema));
Result<nlohmann::json> ToJson(const Schema& schema) {
ICEBERG_ASSIGN_OR_RAISE(nlohmann::json json,
ToJson(internal::checked_cast<const Type&>(schema)));
json[kSchemaId] = schema.schema_id();
if (!schema.IdentifierFieldIds().empty()) {
json[kIdentifierFieldIds] = schema.IdentifierFieldIds();
Expand All @@ -426,7 +437,8 @@ nlohmann::json ToJson(const Schema& schema) {
}

Result<std::string> ToJsonString(const Schema& schema) {
return ToJsonString(ToJson(schema));
ICEBERG_ASSIGN_OR_RAISE(auto json, ToJson(schema));
return ToJsonString(json);
}

nlohmann::json ToJson(const SnapshotRef& ref) {
Expand Down Expand Up @@ -625,16 +637,66 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
}
}

namespace {

// The spec's JSON single-value form for `timestamptz` / `timestamptz_ns` default
// values requires a UTC offset. The shared timestamp parser accepts any offset and
// silently normalizes to UTC, which would let C++ accept default metadata that Java
// rejects and then rewrite the offset on serialization. Enforce UTC for these
// defaults at parse time, where the original offset is still visible.
Status ValidateTimestamptzDefaultIsUtc(const Type& type, const nlohmann::json& value) {
const auto type_id = type.type_id();
if (type_id != TypeId::kTimestampTz && type_id != TypeId::kTimestampTzNs) {
return {};
}
if (!value.is_string()) {
return JsonParseError("Invalid timestamptz default {} for {}: expected a string",
SafeDumpJson(value), type.ToString());
}
const auto str = value.get<std::string>();
ICEBERG_ASSIGN_OR_RAISE(bool is_utc, TemporalUtils::IsUtcOffset(str));
if (!is_utc) {
return JsonParseError(
"Invalid timestamptz default '{}' for {}: default values must use a UTC offset",
str, type.ToString());
}
return {};
}

} // namespace

Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(
auto type, GetJsonValue<nlohmann::json>(json, kType).and_then(TypeFromJson));
ICEBERG_ASSIGN_OR_RAISE(auto field_id, GetJsonValue<int32_t>(json, kId));
ICEBERG_ASSIGN_OR_RAISE(auto name, GetJsonValue<std::string>(json, kName));
ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue<bool>(json, kRequired));
ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault<std::string>(json, kDoc));
ICEBERG_ASSIGN_OR_RAISE(auto initial_default_json,
GetJsonValueOptional<nlohmann::json>(json, kInitialDefault));
ICEBERG_ASSIGN_OR_RAISE(auto write_default_json,
GetJsonValueOptional<nlohmann::json>(json, kWriteDefault));

std::shared_ptr<const Literal> initial_default;
if (initial_default_json.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
ValidateTimestamptzDefaultIsUtc(*type, *initial_default_json));
ICEBERG_ASSIGN_OR_RAISE(Literal literal,
LiteralFromJson(*initial_default_json, type.get()));
Comment thread
wgtmac marked this conversation as resolved.
initial_default = std::make_shared<const Literal>(std::move(literal));
}
std::shared_ptr<const Literal> write_default;
if (write_default_json.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
ValidateTimestamptzDefaultIsUtc(*type, *write_default_json));
ICEBERG_ASSIGN_OR_RAISE(Literal literal,
LiteralFromJson(*write_default_json, type.get()));
write_default = std::make_shared<const Literal>(std::move(literal));
}
Comment thread
huan233usc marked this conversation as resolved.

return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
!required, doc);
!required, doc, std::move(initial_default),
std::move(write_default));
}

Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -966,7 +1028,7 @@ Result<EncryptedKey> EncryptedKeyFromJson(const nlohmann::json& json) {
};
}

nlohmann::json ToJson(const TableMetadata& table_metadata) {
Result<nlohmann::json> ToJson(const TableMetadata& table_metadata) {
nlohmann::json json;

json[kFormatVersion] = table_metadata.format_version;
Expand All @@ -984,15 +1046,22 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
if (table_metadata.format_version == 1) {
for (const auto& schema : table_metadata.schemas) {
if (schema->schema_id() == table_metadata.current_schema_id) {
json[kSchema] = ToJson(*schema);
ICEBERG_ASSIGN_OR_RAISE(json[kSchema], ToJson(*schema));
break;
}
}
}

// write the current schema ID and schema list
json[kCurrentSchemaId] = table_metadata.current_schema_id;
json[kSchemas] = ToJsonList(table_metadata.schemas);
// Schemas can carry fallible default-value serialization, so the shared ToJsonList
// helper (which assumes infallible ToJson) is not used here.
nlohmann::json schemas_json = nlohmann::json::array();
for (const auto& schema : table_metadata.schemas) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_json, ToJson(*schema));
schemas_json.push_back(std::move(schema_json));
}
json[kSchemas] = std::move(schemas_json);

// for older readers, continue writing the default spec as "partition-spec"
if (table_metadata.format_version == 1) {
Expand Down Expand Up @@ -1042,7 +1111,8 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
}

Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
return ToJsonString(ToJson(table_metadata));
ICEBERG_ASSIGN_OR_RAISE(auto json, ToJson(table_metadata));
return ToJsonString(json);
}

namespace {
Expand Down Expand Up @@ -1446,7 +1516,7 @@ Result<Namespace> NamespaceFromJson(const nlohmann::json& json) {
return ns;
}

nlohmann::json ToJson(const TableUpdate& update) {
Result<nlohmann::json> ToJson(const TableUpdate& update) {
nlohmann::json json;
switch (update.kind()) {
case TableUpdate::Kind::kAssignUUID: {
Expand All @@ -1465,7 +1535,7 @@ nlohmann::json ToJson(const TableUpdate& update) {
const auto& u = internal::checked_cast<const table::AddSchema&>(update);
json[kAction] = kActionAddSchema;
if (u.schema()) {
json[kSchema] = ToJson(*u.schema());
ICEBERG_ASSIGN_OR_RAISE(json[kSchema], ToJson(*u.schema()));
} else {
json[kSchema] = nlohmann::json::value_t::null;
}
Expand Down
Loading
Loading