diff --git a/src/iceberg/catalog/rest/endpoint.h b/src/iceberg/catalog/rest/endpoint.h index 7382955ce..fdcd2108e 100644 --- a/src/iceberg/catalog/rest/endpoint.h +++ b/src/iceberg/catalog/rest/endpoint.h @@ -128,6 +128,26 @@ class ICEBERG_REST_EXPORT Endpoint { return {HttpMethod::kPost, "/v1/{prefix}/transactions/commit"}; } + // Scan planning endpoints + static Endpoint PlanTableScan() { + return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"}; + } + + static Endpoint FetchPlanningResult() { + return {HttpMethod::kGet, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"}; + } + + static Endpoint CancelPlanning() { + return {HttpMethod::kDelete, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"}; + } + + static Endpoint FetchScanTasks() { + return {HttpMethod::kPost, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"}; + } + private: Endpoint(HttpMethod method, std::string_view path) : method_(method), path_(path) {} diff --git a/src/iceberg/catalog/rest/error_handlers.cc b/src/iceberg/catalog/rest/error_handlers.cc index f3e5b8fb3..67146e745 100644 --- a/src/iceberg/catalog/rest/error_handlers.cc +++ b/src/iceberg/catalog/rest/error_handlers.cc @@ -30,6 +30,9 @@ namespace { constexpr std::string_view kIllegalArgumentException = "IllegalArgumentException"; constexpr std::string_view kNoSuchNamespaceException = "NoSuchNamespaceException"; constexpr std::string_view kNamespaceNotEmptyException = "NamespaceNotEmptyException"; +constexpr std::string_view kNoSuchTableException = "NoSuchTableException"; +constexpr std::string_view kNoSuchPlanIdException = "NoSuchPlanIdException"; +constexpr std::string_view kNoSuchPlanTaskException = "NoSuchPlanTaskException"; } // namespace @@ -183,4 +186,52 @@ Status ViewCommitErrorHandler::Accept(const ErrorResponse& error) const { return DefaultErrorHandler::Accept(error); } +const std::shared_ptr& PlanErrorHandler::Instance() { + static const std::shared_ptr instance{new PlanErrorHandler()}; + return instance; +} + +Status PlanErrorHandler::Accept(const ErrorResponse& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + if (error.type == kNoSuchTableException) { + return NoSuchTable(error.message); + } + if (error.type == kNoSuchPlanIdException) { + return NoSuchPlanId(error.message); + } + return NotFound(error.message); + case 406: + return NotSupported(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +const std::shared_ptr& PlanTaskErrorHandler::Instance() { + static const std::shared_ptr instance{new PlanTaskErrorHandler()}; + return instance; +} + +Status PlanTaskErrorHandler::Accept(const ErrorResponse& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + if (error.type == kNoSuchTableException) { + return NoSuchTable(error.message); + } + if (error.type == kNoSuchPlanTaskException) { + return NoSuchPlanTask(error.message); + } + return NotFound(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/error_handlers.h b/src/iceberg/catalog/rest/error_handlers.h index eae2c9b7f..ee338fb3e 100644 --- a/src/iceberg/catalog/rest/error_handlers.h +++ b/src/iceberg/catalog/rest/error_handlers.h @@ -127,4 +127,26 @@ class ICEBERG_REST_EXPORT ViewCommitErrorHandler final : public DefaultErrorHand constexpr ViewCommitErrorHandler() = default; }; +/// \brief Plan operation error handler. +class ICEBERG_REST_EXPORT PlanErrorHandler final : public DefaultErrorHandler { + public: + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorResponse& error) const override; + + private: + constexpr PlanErrorHandler() = default; +}; + +/// \brief Fetch scan tasks operation error handler. +class ICEBERG_REST_EXPORT PlanTaskErrorHandler final : public DefaultErrorHandler { + public: + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorResponse& error) const override; + + private: + constexpr PlanTaskErrorHandler() = default; +}; + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index eebdc1969..552435191 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -17,8 +17,10 @@ * under the License. */ +#include #include #include +#include #include #include @@ -26,8 +28,11 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/expression/json_serde_internal.h" +#include "iceberg/file_format.h" #include "iceberg/json_serde_internal.h" #include "iceberg/partition_spec.h" +#include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_identifier.h" #include "iceberg/table_requirement.h" @@ -78,6 +83,392 @@ constexpr std::string_view kExpiresIn = "expires_in"; constexpr std::string_view kIssuedTokenType = "issued_token_type"; constexpr std::string_view kRefreshToken = "refresh_token"; constexpr std::string_view kOAuthScope = "scope"; +constexpr std::string_view kPlanStatus = "status"; +constexpr std::string_view kPlanId = "plan-id"; +constexpr std::string_view kPlanTasks = "plan-tasks"; +constexpr std::string_view kFileScanTasks = "file-scan-tasks"; +constexpr std::string_view kDeleteFiles = "delete-files"; +constexpr std::string_view kSnapshotId = "snapshot-id"; +constexpr std::string_view kSelect = "select"; +constexpr std::string_view kFilter = "filter"; +constexpr std::string_view kCaseSensitive = "case-sensitive"; +constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema"; +constexpr std::string_view kStartSnapshotId = "start-snapshot-id"; +constexpr std::string_view kEndSnapshotId = "end-snapshot-id"; +constexpr std::string_view kStatsFields = "stats-fields"; +constexpr std::string_view kMinRowsRequested = "min-rows-requested"; +constexpr std::string_view kPlanTask = "plan-task"; +constexpr std::string_view kContent = "content"; +constexpr std::string_view kFilePath = "file-path"; +constexpr std::string_view kFileFormat = "file-format"; +constexpr std::string_view kSpecId = "spec-id"; +constexpr std::string_view kPartition = "partition"; +constexpr std::string_view kRecordCount = "record-count"; +constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes"; +constexpr std::string_view kColumnSizes = "column-sizes"; +constexpr std::string_view kValueCounts = "value-counts"; +constexpr std::string_view kNullValueCounts = "null-value-counts"; +constexpr std::string_view kNanValueCounts = "nan-value-counts"; +constexpr std::string_view kLowerBounds = "lower-bounds"; +constexpr std::string_view kUpperBounds = "upper-bounds"; +constexpr std::string_view kKeyMetadata = "key-metadata"; +constexpr std::string_view kSplitOffsets = "split-offsets"; +constexpr std::string_view kEqualityIds = "equality-ids"; +constexpr std::string_view kSortOrderId = "sort-order-id"; +constexpr std::string_view kFirstRowId = "first-row-id"; +constexpr std::string_view kReferencedDataFile = "referenced-data-file"; +constexpr std::string_view kContentOffset = "content-offset"; +constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes"; +constexpr std::string_view kDataFile = "data-file"; +constexpr std::string_view kDeleteFileReferences = "delete-file-references"; +constexpr std::string_view kResidualFilter = "residual-filter"; + +} // namespace + +Result DataFileFromJson( + const nlohmann::json& json, + const std::unordered_map>& partition_spec_by_id, + const Schema& schema) { + if (!json.is_object()) { + return JsonParseError("DataFile must be a JSON object: {}", SafeDumpJson(json)); + } + DataFile df; + + ICEBERG_ASSIGN_OR_RAISE(auto content_str, GetJsonValue(json, kContent)); + if (content_str == ToString(DataFile::Content::kData)) { + df.content = DataFile::Content::kData; + } else if (content_str == ToString(DataFile::Content::kPositionDeletes)) { + df.content = DataFile::Content::kPositionDeletes; + } else if (content_str == ToString(DataFile::Content::kEqualityDeletes)) { + df.content = DataFile::Content::kEqualityDeletes; + } else { + return JsonParseError("Unknown data file content: {}", content_str); + } + + ICEBERG_ASSIGN_OR_RAISE(df.file_path, GetJsonValue(json, kFilePath)); + ICEBERG_ASSIGN_OR_RAISE(auto format_str, GetJsonValue(json, kFileFormat)); + ICEBERG_ASSIGN_OR_RAISE(df.file_format, FileFormatTypeFromString(format_str)); + + if (json.contains(kSpecId) && !json.at(kSpecId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue(json, kSpecId)); + df.partition_spec_id = spec_id; + } + + if (json.contains(kPartition)) { + ICEBERG_ASSIGN_OR_RAISE(auto partition_vals, + GetJsonValue(json, kPartition)); + if (!partition_vals.is_array()) { + return JsonParseError("PartitionValues must be a JSON array: {}", + SafeDumpJson(partition_vals)); + } + std::vector literals; + auto it = partition_spec_by_id.find(df.partition_spec_id.value_or(-1)); + if (it == partition_spec_by_id.end()) { + return JsonParseError("Invalid partition spec id: {}", + df.partition_spec_id.value_or(-1)); + } + ICEBERG_ASSIGN_OR_RAISE(auto struct_type, it->second->PartitionType(schema)); + auto fields = struct_type->fields(); + if (partition_vals.size() != fields.size()) { + return JsonParseError("Invalid partition data size: expected = {}, actual = {}", + fields.size(), partition_vals.size()); + } + for (size_t pos = 0; pos < fields.size(); ++pos) { + ICEBERG_ASSIGN_OR_RAISE( + auto literal, LiteralFromJson(partition_vals[pos], fields[pos].type().get())); + literals.push_back(std::move(literal)); + } + df.partition = PartitionValues(std::move(literals)); + } + + ICEBERG_ASSIGN_OR_RAISE(df.record_count, GetJsonValue(json, kRecordCount)); + ICEBERG_ASSIGN_OR_RAISE(df.file_size_in_bytes, + GetJsonValue(json, kFileSizeInBytes)); + + auto parse_int_map = [&](std::string_view key, + std::map& target) -> Status { + if (!json.contains(key) || json.at(key).is_null()) { + return {}; + } + ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue(json, key)); + ICEBERG_ASSIGN_OR_RAISE(auto keys, + GetTypedJsonValue>(map_json.at("keys"))); + ICEBERG_ASSIGN_OR_RAISE( + auto values, GetTypedJsonValue>(map_json.at("values"))); + if (keys.size() != values.size()) { + return JsonParseError("'{}' map keys and values have different lengths", key); + } + for (size_t i = 0; i < keys.size(); ++i) { + target[keys[i]] = values[i]; + } + return {}; + }; + + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kColumnSizes, df.column_sizes)); + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kValueCounts, df.value_counts)); + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNullValueCounts, df.null_value_counts)); + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNanValueCounts, df.nan_value_counts)); + + auto parse_binary_map = [&](std::string_view key, + std::map>& target) -> Status { + if (!json.contains(key) || json.at(key).is_null()) { + return {}; + } + ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue(json, key)); + ICEBERG_ASSIGN_OR_RAISE(auto keys, + GetJsonValue>(map_json, "keys")); + ICEBERG_ASSIGN_OR_RAISE( + auto values, GetJsonValue>>(map_json, "values")); + if (keys.size() != values.size()) { + return JsonParseError("'{}' binary map keys and values have different lengths", key); + } + for (size_t i = 0; i < keys.size(); ++i) { + target[keys[i]] = values[i]; + } + return {}; + }; + + ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kLowerBounds, df.lower_bounds)); + ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kUpperBounds, df.upper_bounds)); + + if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.key_metadata, + GetJsonValue>(json, kKeyMetadata)); + } + if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.split_offsets, + GetJsonValue>(json, kSplitOffsets)); + } + if (json.contains(kEqualityIds) && !json.at(kEqualityIds).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.equality_ids, + GetJsonValue>(json, kEqualityIds)); + } + if (json.contains(kSortOrderId) && !json.at(kSortOrderId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.sort_order_id, GetJsonValue(json, kSortOrderId)); + } + if (json.contains(kFirstRowId) && !json.at(kFirstRowId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.first_row_id, GetJsonValue(json, kFirstRowId)); + } + if (json.contains(kReferencedDataFile) && !json.at(kReferencedDataFile).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.referenced_data_file, + GetJsonValue(json, kReferencedDataFile)); + } + if (json.contains(kContentOffset) && !json.at(kContentOffset).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.content_offset, + GetJsonValue(json, kContentOffset)); + } + if (json.contains(kContentSizeInBytes) && !json.at(kContentSizeInBytes).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.content_size_in_bytes, + GetJsonValue(json, kContentSizeInBytes)); + } + + return df; +} + +Result>> FileScanTasksFromJson( + const nlohmann::json& json, + const std::vector>& delete_files, + const std::unordered_map>& partition_spec_by_id, + const Schema& schema) { + if (!json.is_array()) { + return JsonParseError("Cannot parse file scan tasks from non-array: {}", + SafeDumpJson(json)); + } + std::vector> file_scan_tasks; + for (const auto& task_json : json) { + if (!task_json.is_object()) { + return JsonParseError("Cannot parse file scan task from a non-object: {}", + SafeDumpJson(task_json)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto data_file_json, + GetJsonValue(task_json, kDataFile)); + ICEBERG_ASSIGN_OR_RAISE(auto data_file, + DataFileFromJson(data_file_json, partition_spec_by_id, schema)); + + std::vector> task_delete_files; + if (task_json.contains(kDeleteFileReferences) && + !task_json.at(kDeleteFileReferences).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto refs, GetJsonValue>( + task_json, kDeleteFileReferences)); + for (int32_t ref : refs) { + if (ref < 0 || static_cast(ref) >= delete_files.size()) { + return JsonParseError( + "delete-file-references index {} is out of range (delete_files size: {})", + ref, delete_files.size()); + } + task_delete_files.push_back(delete_files[ref]); + } + } + + std::shared_ptr residual_filter; + if (task_json.contains(kResidualFilter) && !task_json.at(kResidualFilter).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, + GetJsonValue(task_json, kResidualFilter)); + ICEBERG_ASSIGN_OR_RAISE(residual_filter, ExpressionFromJson(filter_json)); + } + + file_scan_tasks.push_back( + std::make_shared(std::make_shared(std::move(data_file)), + std::move(task_delete_files), + std::move(residual_filter))); + } + return file_scan_tasks; +} + +nlohmann::json ToJson(const DataFile& df) { + nlohmann::json json; + json[kContent] = ToString(df.content); + json[kFilePath] = df.file_path; + json[kFileFormat] = ToString(df.file_format); + + if (df.partition_spec_id.has_value()) { + json[kSpecId] = df.partition_spec_id.value(); + } + + json[kRecordCount] = df.record_count; + json[kFileSizeInBytes] = df.file_size_in_bytes; + + auto write_int_map = [&](std::string_view key, + const std::map& m) { + if (!m.empty()) { + std::vector keys; + std::vector values; + for (const auto& [k, v] : m) { + keys.push_back(k); + values.push_back(v); + } + json[key] = {{"keys", std::move(keys)}, {"values", std::move(values)}}; + } + }; + + write_int_map(kColumnSizes, df.column_sizes); + write_int_map(kValueCounts, df.value_counts); + write_int_map(kNullValueCounts, df.null_value_counts); + write_int_map(kNanValueCounts, df.nan_value_counts); + + auto write_binary_map = [&](std::string_view key, + const std::map>& m) { + if (!m.empty()) { + std::vector keys; + std::vector> values; + for (const auto& [k, v] : m) { + keys.push_back(k); + values.push_back(v); + } + json[key] = {{"keys", std::move(keys)}, {"values", std::move(values)}}; + } + }; + + write_binary_map(kLowerBounds, df.lower_bounds); + write_binary_map(kUpperBounds, df.upper_bounds); + + if (!df.key_metadata.empty()) { + json[kKeyMetadata] = df.key_metadata; + } + if (!df.split_offsets.empty()) { + json[kSplitOffsets] = df.split_offsets; + } + if (!df.equality_ids.empty()) { + json[kEqualityIds] = df.equality_ids; + } + if (df.sort_order_id.has_value()) { + json[kSortOrderId] = df.sort_order_id.value(); + } + if (df.first_row_id.has_value()) { + json[kFirstRowId] = df.first_row_id.value(); + } + if (df.referenced_data_file.has_value()) { + json[kReferencedDataFile] = df.referenced_data_file.value(); + } + if (df.content_offset.has_value()) { + json[kContentOffset] = df.content_offset.value(); + } + if (df.content_size_in_bytes.has_value()) { + json[kContentSizeInBytes] = df.content_size_in_bytes.value(); + } + + return json; +} + +namespace { + +nlohmann::json BaseScanTaskResponseToJson(const BaseScanTaskResponse& response) { + nlohmann::json json; + + SetContainerField(json, kPlanTasks, response.plan_tasks); + + // Build delete_files array and a pointer-to-index map for reference lookup. + std::unordered_map delete_file_index; + nlohmann::json delete_files_json = nlohmann::json::array(); + for (size_t i = 0; i < response.delete_files.size(); ++i) { + if (response.delete_files[i]) { + delete_files_json.push_back(ToJson(*response.delete_files[i])); + delete_file_index[response.delete_files[i].get()] = static_cast(i); + } + } + if (!delete_files_json.empty()) { + json[kDeleteFiles] = std::move(delete_files_json); + } + + nlohmann::json tasks_json = nlohmann::json::array(); + for (const auto& task : response.file_scan_tasks) { + if (!task) continue; + nlohmann::json task_json; + if (task->data_file()) { + task_json[kDataFile] = ToJson(*task->data_file()); + } + if (!task->delete_files().empty()) { + std::vector refs; + for (const auto& df : task->delete_files()) { + auto it = delete_file_index.find(df.get()); + if (it != delete_file_index.end()) { + refs.push_back(it->second); + } + } + if (!refs.empty()) { + task_json[kDeleteFileReferences] = std::move(refs); + } + } + tasks_json.push_back(std::move(task_json)); + } + if (!tasks_json.empty()) { + json[kFileScanTasks] = std::move(tasks_json); + } + + return json; +} + +Status BaseScanTaskResponseFromJson( + const nlohmann::json& json, BaseScanTaskResponse* response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + // 1. plan_tasks + ICEBERG_ASSIGN_OR_RAISE( + response->plan_tasks, + GetJsonValueOrDefault>(json, kPlanTasks)); + + // 2. delete_files + ICEBERG_ASSIGN_OR_RAISE( + auto delete_files_json, + GetJsonValueOrDefault(json, kDeleteFiles, nlohmann::json::array())); + for (const auto& entry_json : delete_files_json) { + ICEBERG_ASSIGN_OR_RAISE(auto delete_file, + DataFileFromJson(entry_json, partition_specs_by_id, schema)); + response->delete_files.push_back(std::make_shared(std::move(delete_file))); + } + + // 3. file_scan_tasks + ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json, + GetJsonValueOrDefault(json, kFileScanTasks, + nlohmann::json::array())); + ICEBERG_ASSIGN_OR_RAISE( + response->file_scan_tasks, + FileScanTasksFromJson(file_scan_tasks_json, response->delete_files, + partition_specs_by_id, schema)); + return {}; +} } // namespace @@ -506,6 +897,104 @@ Result OAuthTokenResponseFromJson(const nlohmann::json& json return response; } +Result ToJson(const PlanTableScanRequest& request) { + nlohmann::json json; + if (request.snapshot_id.has_value()) { + json[kSnapshotId] = request.snapshot_id.value(); + } + if (!request.select.empty()) { + json[kSelect] = request.select; + } + if (request.filter) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, iceberg::ToJson(*request.filter)); + json[kFilter] = std::move(filter_json); + } + json[kCaseSensitive] = request.case_sensitive; + json[kUseSnapshotSchema] = request.use_snapshot_schema; + if (request.start_snapshot_id.has_value()) { + json[kStartSnapshotId] = request.start_snapshot_id.value(); + } + if (request.end_snapshot_id.has_value()) { + json[kEndSnapshotId] = request.end_snapshot_id.value(); + } + if (!request.stats_fields.empty()) { + json[kStatsFields] = request.stats_fields; + } + if (request.min_rows_requested.has_value()) { + json[kMinRowsRequested] = request.min_rows_requested.value(); + } + return json; +} + +nlohmann::json ToJson(const FetchScanTasksRequest& request) { + nlohmann::json json; + json[kPlanTask] = request.planTask; + return json; +} + +Result PlanTableScanResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + PlanTableScanResponse response; + ICEBERG_ASSIGN_OR_RAISE(auto plan_status_str, + GetJsonValue(json, kPlanStatus)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_status, PlanStatusFromString(plan_status_str)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_id, + GetJsonValueOrDefault(json, kPlanId)); + ICEBERG_RETURN_UNEXPECTED( + BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result FetchPlanningResultResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + FetchPlanningResultResponse response; + ICEBERG_ASSIGN_OR_RAISE(auto plan_status_str, + GetJsonValue(json, kPlanStatus)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_status, PlanStatusFromString(plan_status_str)); + ICEBERG_RETURN_UNEXPECTED( + BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result FetchScanTasksResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + FetchScanTasksResponse response; + ICEBERG_RETURN_UNEXPECTED( + BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +nlohmann::json ToJson(const PlanTableScanResponse& response) { + nlohmann::json json = BaseScanTaskResponseToJson(response); + json[kPlanStatus] = ToString(response.plan_status); + if (!response.plan_id.empty()) { + json[kPlanId] = response.plan_id; + } + return json; +} + +nlohmann::json ToJson(const FetchPlanningResultResponse& response) { + nlohmann::json json = BaseScanTaskResponseToJson(response); + json[kPlanStatus] = ToString(response.plan_status); + return json; +} + +nlohmann::json ToJson(const FetchScanTasksResponse& response) { + return BaseScanTaskResponseToJson(response); +} + #define ICEBERG_DEFINE_FROM_JSON(Model) \ template <> \ Result FromJson(const nlohmann::json& json) { \ diff --git a/src/iceberg/catalog/rest/json_serde_internal.h b/src/iceberg/catalog/rest/json_serde_internal.h index 820e077d7..0726d0afc 100644 --- a/src/iceberg/catalog/rest/json_serde_internal.h +++ b/src/iceberg/catalog/rest/json_serde_internal.h @@ -19,11 +19,19 @@ #pragma once +#include +#include +#include + #include #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" /// \file iceberg/catalog/rest/json_serde_internal.h /// JSON serialization and deserialization for Iceberg REST Catalog API types. @@ -62,4 +70,43 @@ ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse) #undef ICEBERG_DECLARE_JSON_SERDE +ICEBERG_REST_EXPORT Result PlanTableScanResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result +FetchPlanningResultResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result FetchScanTasksResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result ToJson(const PlanTableScanRequest& request); +ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request); + +ICEBERG_REST_EXPORT nlohmann::json ToJson(const DataFile& df); + +ICEBERG_REST_EXPORT Result DataFileFromJson( + const nlohmann::json& json, + const std::unordered_map>& partition_spec_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result>> FileScanTasksFromJson( + const nlohmann::json& json, + const std::vector>& delete_files, + const std::unordered_map>& partition_spec_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT nlohmann::json ToJson(const PlanTableScanResponse& response); +ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchPlanningResultResponse& response); +ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksResponse& response); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.cc b/src/iceberg/catalog/rest/resource_paths.cc index f86a75ec0..ac8c52255 100644 --- a/src/iceberg/catalog/rest/resource_paths.cc +++ b/src/iceberg/catalog/rest/resource_paths.cc @@ -20,6 +20,7 @@ #include "iceberg/catalog/rest/resource_paths.h" #include +#include #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/table_identifier.h" @@ -102,4 +103,24 @@ Result ResourcePaths::CommitTransaction() const { return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_); } +Result ResourcePaths::Plan(const TableIdentifier& ident, + std::optional plan_id) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + if (plan_id.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_plan_id, EncodeString(plan_id.value())); + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_, + encoded_namespace, encoded_table_name, encoded_plan_id); + } + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + +Result ResourcePaths::FetchScanTasks(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/tasks", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.h b/src/iceberg/catalog/rest/resource_paths.h index 1b502aaa7..86cd7d5d3 100644 --- a/src/iceberg/catalog/rest/resource_paths.h +++ b/src/iceberg/catalog/rest/resource_paths.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include "iceberg/catalog/rest/iceberg_rest_export.h" @@ -81,6 +82,15 @@ class ICEBERG_REST_EXPORT ResourcePaths { /// \brief Get the /v1/{prefix}/transactions/commit endpoint path. Result CommitTransaction() const; + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan endpoint + /// path, or /plan/{plan_id} if plan_id is provided. + Result Plan(const TableIdentifier& ident, + std::optional plan_id = std::nullopt) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks endpoint + /// path. + Result FetchScanTasks(const TableIdentifier& ident) const; + private: ResourcePaths(std::string base_uri, const std::string& prefix); diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc index 3abfb1406..53d057d9f 100644 --- a/src/iceberg/catalog/rest/types.cc +++ b/src/iceberg/catalog/rest/types.cc @@ -118,6 +118,82 @@ bool CommitTableResponse::operator==(const CommitTableResponse& other) const { return true; } +bool PlanTableScanRequest::operator==(const PlanTableScanRequest& other) const { + return snapshot_id == other.snapshot_id && select == other.select && + filter == other.filter && case_sensitive == other.case_sensitive && + use_snapshot_schema == other.use_snapshot_schema && + start_snapshot_id == other.start_snapshot_id && + end_snapshot_id == other.end_snapshot_id && stats_fields == other.stats_fields && + min_rows_requested == other.min_rows_requested; +} + +bool BaseScanTaskResponse::operator==(const BaseScanTaskResponse& other) const { + if (plan_tasks != other.plan_tasks) { + return false; + } + if (delete_files.size() != other.delete_files.size()) { + return false; + } + for (size_t i = 0; i < delete_files.size(); ++i) { + if (!delete_files[i] != !other.delete_files[i]) { + return false; + } + if (delete_files[i] && *delete_files[i] != *other.delete_files[i]) { + return false; + } + } + if (file_scan_tasks.size() != other.file_scan_tasks.size()) { + return false; + } + for (size_t i = 0; i < file_scan_tasks.size(); ++i) { + const auto& a = file_scan_tasks[i]; + const auto& b = other.file_scan_tasks[i]; + if (!a != !b) { + return false; + } + if (!a) continue; + if (!a->data_file() != !b->data_file()) { + return false; + } + if (a->data_file() && *a->data_file() != *b->data_file()) { + return false; + } + if (a->delete_files().size() != b->delete_files().size()) { + return false; + } + for (size_t j = 0; j < a->delete_files().size(); ++j) { + if (!a->delete_files()[j] != !b->delete_files()[j]) { + return false; + } + if (a->delete_files()[j] && *a->delete_files()[j] != *b->delete_files()[j]) { + return false; + } + } + if (a->residual_filter() != b->residual_filter()) { + return false; + } + } + return true; +} + +bool PlanTableScanResponse::operator==(const PlanTableScanResponse& other) const { + return BaseScanTaskResponse::operator==(other) && plan_status == other.plan_status && + plan_id == other.plan_id; +} + +bool FetchPlanningResultResponse::operator==( + const FetchPlanningResultResponse& other) const { + return BaseScanTaskResponse::operator==(other) && plan_status == other.plan_status; +} + +bool FetchScanTasksRequest::operator==(const FetchScanTasksRequest& other) const { + return planTask == other.planTask; +} + +bool FetchScanTasksResponse::operator==(const FetchScanTasksResponse& other) const { + return BaseScanTaskResponse::operator==(other); +} + Status OAuthTokenResponse::Validate() const { if (access_token.empty()) { return ValidationFailed("OAuth2 token response missing required 'access_token'"); @@ -135,4 +211,84 @@ Status OAuthTokenResponse::Validate() const { return {}; } +Status PlanTableScanRequest::Validate() const { + if (snapshot_id.has_value()) { + if (start_snapshot_id.has_value() || end_snapshot_id.has_value()) { + return ValidationFailed( + "Invalid scan: cannot provide both snapshotId and " + "startSnapshotId/endSnapshotId"); + } + } + if (start_snapshot_id.has_value() || end_snapshot_id.has_value()) { + if (!start_snapshot_id.has_value() || !end_snapshot_id.has_value()) { + return ValidationFailed( + "Invalid incremental scan: startSnapshotId and endSnapshotId is required"); + } + } + if (min_rows_requested.has_value() && min_rows_requested.value() < 0) { + return ValidationFailed("Invalid scan: minRowsRequested is negative"); + } + return {}; +} + +Status PlanTableScanResponse::Validate() const { + if (plan_status == PlanStatus::kSubmitted && plan_id.empty()) { + return ValidationFailed( + "Invalid response: plan id should be defined when status is 'submitted'"); + } + if (plan_status == PlanStatus::kCancelled) { + return ValidationFailed( + "Invalid response: 'cancelled' is not a valid status for planTableScan"); + } + if (plan_status != PlanStatus::kCompleted && (!plan_tasks.empty() || !file_scan_tasks.empty())) { + return ValidationFailed( + "Invalid response: tasks can only be defined when status is 'completed'"); + } + if (!plan_id.empty() && plan_status != PlanStatus::kSubmitted && + plan_status != PlanStatus::kCompleted) { + return ValidationFailed( + "Invalid response: plan id can only be defined when status is 'submitted' or " + "'completed'"); + } + if (file_scan_tasks.empty() && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + return {}; +} + +Status FetchPlanningResultResponse::Validate() const { + if (plan_status != PlanStatus::kCompleted && (!plan_tasks.empty() || !file_scan_tasks.empty())) { + return ValidationFailed( + "Invalid response: tasks can only be returned in a 'completed' status"); + } + if (file_scan_tasks.empty() && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + return {}; +} + +Status FetchScanTasksRequest::Validate() const { + if (planTask.empty()) { + return ValidationFailed("Invalid planTask: null"); + } + return {}; +} + +Status FetchScanTasksResponse::Validate() const { + if (file_scan_tasks.empty() && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + if (plan_tasks.empty() && file_scan_tasks.empty()) { + return ValidationFailed( + "Invalid response: planTasks and fileScanTask cannot both be null"); + } + return {}; +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 6495a6517..3e735cdcb 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -23,14 +23,18 @@ #include #include #include +#include #include #include #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/expression/expression.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/table_identifier.h" +#include "iceberg/table_scan.h" #include "iceberg/type_fwd.h" #include "iceberg/util/macros.h" @@ -295,4 +299,73 @@ struct ICEBERG_REST_EXPORT OAuthTokenResponse { bool operator==(const OAuthTokenResponse&) const = default; }; +/// \brief Request to initiate a server-side scan planning operation. +struct ICEBERG_REST_EXPORT PlanTableScanRequest { + std::optional snapshot_id; + std::vector select; + std::shared_ptr filter; + bool case_sensitive = true; + bool use_snapshot_schema = false; + std::optional start_snapshot_id; + std::optional end_snapshot_id; + std::vector stats_fields; + std::optional min_rows_requested; + + Status Validate() const; + + bool operator==(const PlanTableScanRequest&) const; +}; + +/// \brief Base response containing scan tasks and delete files returned by scan plan +/// endpoints. +struct ICEBERG_REST_EXPORT BaseScanTaskResponse { + std::vector plan_tasks; + std::vector> file_scan_tasks; + std::vector> delete_files; + // std::unordered_map specsById; + + Status Validate() const { return {}; }; + + bool operator==(const BaseScanTaskResponse&) const; +}; + +/// \brief Response from initiating a scan planning operation, including plan status and +/// initial scan tasks. +struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse { + PlanStatus plan_status = PlanStatus::kCompleted; + std::string plan_id; + // TODO(sandeepg): Add credentials. + + Status Validate() const; + + bool operator==(const PlanTableScanResponse&) const; +}; + +/// \brief Response from polling an asynchronous scan plan, including current status and +/// available scan tasks. +struct ICEBERG_REST_EXPORT FetchPlanningResultResponse : BaseScanTaskResponse { + PlanStatus plan_status = PlanStatus::kCompleted; + // TODO(sandeepg): Add credentials. + + Status Validate() const; + + bool operator==(const FetchPlanningResultResponse&) const; +}; + +/// \brief Request to fetch the scan tasks for a given plan task token. +struct ICEBERG_REST_EXPORT FetchScanTasksRequest { + std::string planTask; + + Status Validate() const; + + bool operator==(const FetchScanTasksRequest&) const; +}; + +/// \brief Response containing the file scan tasks for a given plan task token. +struct ICEBERG_REST_EXPORT FetchScanTasksResponse : BaseScanTaskResponse { + Status Validate() const; + + bool operator==(const FetchScanTasksResponse&) const; +}; + } // namespace iceberg::rest diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 765508705..98246303d 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -48,6 +48,8 @@ enum class ErrorKind { kJsonParseError, kNamespaceNotEmpty, kNoSuchNamespace, + kNoSuchPlanId, + kNoSuchPlanTask, kNoSuchTable, kNoSuchView, kNotAllowed, @@ -111,6 +113,8 @@ DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NamespaceNotEmpty) DEFINE_ERROR_FUNCTION(NoSuchNamespace) +DEFINE_ERROR_FUNCTION(NoSuchPlanId) +DEFINE_ERROR_FUNCTION(NoSuchPlanTask) DEFINE_ERROR_FUNCTION(NoSuchTable) DEFINE_ERROR_FUNCTION(NoSuchView) DEFINE_ERROR_FUNCTION(NotAllowed) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index ed2ede707..96c2ad4f2 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -765,4 +765,22 @@ IncrementalChangelogScan::PlanFiles(std::optional from_snapshot_id_excl return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented"); } +std::string_view ToString(PlanStatus status) { + switch (status) { + case PlanStatus::kSubmitted: return "submitted"; + case PlanStatus::kCompleted: return "completed"; + case PlanStatus::kCancelled: return "cancelled"; + case PlanStatus::kFailed: return "failed"; + } + return "unknown"; +} + +Result PlanStatusFromString(std::string_view status_str) { + if (status_str == "submitted") return PlanStatus::kSubmitted; + if (status_str == "completed") return PlanStatus::kCompleted; + if (status_str == "cancelled") return PlanStatus::kCancelled; + if (status_str == "failed") return PlanStatus::kFailed; + return JsonParseError("Unknown plan status: {}", status_str); +} + } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index b21adcca6..506baba4d 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,17 @@ namespace iceberg { +/// \brief Status of a server-side scan planning operation. +enum class PlanStatus { + kSubmitted, + kCompleted, + kCancelled, + kFailed, +}; + +ICEBERG_EXPORT std::string_view ToString(PlanStatus status); +ICEBERG_EXPORT Result PlanStatusFromString(std::string_view status_str); + /// \brief An abstract scan task. class ICEBERG_EXPORT ScanTask { public: diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 9da052e6a..4bc650e6f 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -25,12 +25,16 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" @@ -1380,4 +1384,459 @@ INSTANTIATE_TEST_SUITE_P( return info.param.test_name; }); +// Helper: empty schema and specs for scan response tests that don't need partition +// parsing. +static Schema EmptySchema() { return Schema({}, 0); } +static std::unordered_map> EmptySpecs() { + return {}; +} + +// --- PlanTableScanResponse --- + +TEST(PlanTableScanResponseFromJsonTest, SubmittedStatusMissingOptionalFields) { + // "submitted" response: only status and plan-id, no tasks + auto json = nlohmann::json::parse(R"({"status":"submitted","plan-id":"abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kSubmitted); + EXPECT_EQ(result->plan_id, "abc-123"); + EXPECT_TRUE(result->plan_tasks.empty()); + EXPECT_TRUE(result->file_scan_tasks.empty()); + EXPECT_TRUE(result->delete_files.empty()); +} + +TEST(PlanTableScanResponseFromJsonTest, CompletedStatusWithPlanTasks) { + // "completed" response with plan-tasks but no file-scan-tasks + auto json = nlohmann::json::parse( + R"({"status":"completed","plan-id":"abc-123","plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kCompleted); + EXPECT_EQ(result->plan_id, "abc-123"); + ASSERT_EQ(result->plan_tasks.size(), 2); + EXPECT_EQ(result->plan_tasks[0], "task-1"); + EXPECT_EQ(result->plan_tasks[1], "task-2"); +} + +TEST(PlanTableScanResponseFromJsonTest, MissingRequiredStatus) { + auto json = nlohmann::json::parse(R"({"plan-id":"abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'status'")); +} + +TEST(PlanTableScanResponseFromJsonTest, MissingPlanIdDefaultsToEmptyForFailedStatus) { + // plan-id is optional for non-submitted/completed statuses + auto json = nlohmann::json::parse(R"({"status":"failed"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result->plan_id.empty()); +} + +// --- FetchPlanningResultResponse --- + +TEST(FetchPlanningResultResponseFromJsonTest, SubmittedStatusNoTasks) { + auto json = nlohmann::json::parse(R"({"status":"submitted"})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kSubmitted); + EXPECT_TRUE(result->plan_tasks.empty()); + EXPECT_TRUE(result->file_scan_tasks.empty()); + EXPECT_TRUE(result->delete_files.empty()); +} + +TEST(FetchPlanningResultResponseFromJsonTest, CompletedStatusWithPlanTasks) { + auto json = nlohmann::json::parse( + R"({"status":"completed","plan-tasks":["task-1"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kCompleted); + ASSERT_EQ(result->plan_tasks.size(), 1); + EXPECT_EQ(result->plan_tasks[0], "task-1"); +} + +TEST(FetchPlanningResultResponseFromJsonTest, MissingRequiredStatus) { + auto json = nlohmann::json::parse(R"({})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'status'")); +} + +// --- FetchScanTasksResponse --- + +TEST(FetchScanTasksResponseFromJsonTest, WithFileScanTasks) { + // One file scan task with a data file and one delete file referenced by index. + auto json = nlohmann::json::parse(R"({ + "plan-tasks": [], + "delete-files": [ + { + "content": "position_deletes", + "file-path": "s3://bucket/deletes/delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 512, + "record-count": 5 + } + ], + "file-scan-tasks": [ + { + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + } + ] + })"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result->plan_tasks.empty()); + ASSERT_EQ(result->delete_files.size(), 1); + ASSERT_EQ(result->file_scan_tasks.size(), 1); + EXPECT_EQ(result->file_scan_tasks[0]->data_file()->file_path, + "s3://bucket/data/file.parquet"); + ASSERT_EQ(result->file_scan_tasks[0]->delete_files().size(), 1); + EXPECT_EQ(result->file_scan_tasks[0]->delete_files()[0]->file_path, + "s3://bucket/deletes/delete.parquet"); +} + +TEST(FetchScanTasksResponseFromJsonTest, WithPlanTasksOnly) { + auto json = nlohmann::json::parse( + R"({"plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result->plan_tasks.size(), 2); + EXPECT_EQ(result->plan_tasks[0], "task-1"); + EXPECT_TRUE(result->file_scan_tasks.empty()); +} + +TEST(FetchScanTasksResponseFromJsonTest, AllFieldsMissing) { + // Both plan-tasks and file-scan-tasks absent → Validate() should fail + auto json = nlohmann::json::parse(R"({})"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(ErrorKind::kValidationFailed)); +} + +// --- DataFileFromJson --- + +TEST(DataFileFromJsonTest, RequiredFieldsOnly) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kData); + EXPECT_EQ(df.file_path, "s3://bucket/data/file.parquet"); + EXPECT_EQ(df.file_format, FileFormatType::kParquet); + EXPECT_EQ(df.file_size_in_bytes, 12345); + EXPECT_EQ(df.record_count, 100); + EXPECT_TRUE(df.column_sizes.empty()); + EXPECT_FALSE(df.sort_order_id.has_value()); + EXPECT_FALSE(df.partition_spec_id.has_value()); +} + +TEST(DataFileFromJsonTest, LowercaseFormat) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.avro", + "file-format": "avro", + "file-size-in-bytes": 500, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value().content, DataFile::Content::kData); + EXPECT_EQ(result.value().file_format, FileFormatType::kAvro); +} + +TEST(DataFileFromJsonTest, WithOptionalFields) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 1, + "file-size-in-bytes": 12345, + "record-count": 100, + "column-sizes": {"keys": [1, 2], "values": [1000, 2000]}, + "value-counts": {"keys": [1, 2], "values": [100, 100]}, + "null-value-counts": {"keys": [1], "values": [0]}, + "nan-value-counts": {"keys": [2], "values": [5]}, + "split-offsets": [0, 4096], + "sort-order-id": 0 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.partition_spec_id, 1); + ASSERT_EQ(df.column_sizes.size(), 2U); + EXPECT_EQ(df.column_sizes.at(1), 1000); + EXPECT_EQ(df.column_sizes.at(2), 2000); + ASSERT_EQ(df.value_counts.size(), 2U); + EXPECT_EQ(df.value_counts.at(1), 100); + ASSERT_EQ(df.null_value_counts.size(), 1U); + EXPECT_EQ(df.null_value_counts.at(1), 0); + ASSERT_EQ(df.nan_value_counts.size(), 1U); + EXPECT_EQ(df.nan_value_counts.at(2), 5); + ASSERT_EQ(df.split_offsets.size(), 2U); + EXPECT_EQ(df.split_offsets[0], 0); + EXPECT_EQ(df.split_offsets[1], 4096); + EXPECT_EQ(df.sort_order_id, 0); +} + +TEST(DataFileFromJsonTest, EqualityDeleteFile) { + auto json = R"({ + "content": "equality_deletes", + "file-path": "s3://bucket/deletes/eq_delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 5000, + "record-count": 50, + "equality-ids": [1, 2] + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kEqualityDeletes); + ASSERT_EQ(df.equality_ids.size(), 2U); + EXPECT_EQ(df.equality_ids[0], 1); + EXPECT_EQ(df.equality_ids[1], 2); +} + +TEST(DataFileFromJsonTest, PositionDeleteFileWithReferencedDataFile) { + auto json = R"({ + "content": "position_deletes", + "file-path": "s3://bucket/deletes/pos_delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 3000, + "record-count": 20, + "referenced-data-file": "s3://bucket/data/file.parquet" + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kPositionDeletes); + ASSERT_TRUE(df.referenced_data_file.has_value()); + EXPECT_EQ(df.referenced_data_file.value(), "s3://bucket/data/file.parquet"); +} + +TEST(DataFileFromJsonTest, InvalidContentType) { + auto json = R"({ + "content": "UNKNOWN", + "file-path": "s3://bucket/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Unknown data file content")); +} + +TEST(DataFileFromJsonTest, MissingRequiredField) { + auto json = R"({ + "content": "data", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); +} + +TEST(DataFileFromJsonTest, NotAnObject) { + auto result = DataFileFromJson(nlohmann::json::array(), {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("DataFile must be a JSON object")); +} + +// --- FileScanTasksFromJson --- + +TEST(FileScanTasksFromJsonTest, EmptyArray) { + auto result = FileScanTasksFromJson(nlohmann::json::array(), {}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result.value().empty()); +} + +TEST(FileScanTasksFromJsonTest, SingleTaskNoDeleteFiles) { + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + } + }])"_json; + + auto result = FileScanTasksFromJson(json, {}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result.value().size(), 1U); + const auto& task = result.value()[0]; + ASSERT_NE(task->data_file(), nullptr); + EXPECT_EQ(task->data_file()->file_path, "s3://bucket/data/file.parquet"); + EXPECT_TRUE(task->delete_files().empty()); + EXPECT_EQ(task->residual_filter(), nullptr); +} + +TEST(FileScanTasksFromJsonTest, TaskWithDeleteFileReferences) { + DataFile delete_file; + delete_file.content = DataFile::Content::kPositionDeletes; + delete_file.file_path = "s3://bucket/deletes/pos_delete.parquet"; + delete_file.file_format = FileFormatType::kParquet; + delete_file.file_size_in_bytes = 1000; + delete_file.record_count = 5; + + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + }])"_json; + + auto result = + FileScanTasksFromJson(json, {std::make_shared(delete_file)}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result.value().size(), 1U); + const auto& task = result.value()[0]; + ASSERT_EQ(task->delete_files().size(), 1U); + EXPECT_EQ(task->delete_files()[0]->file_path, "s3://bucket/deletes/pos_delete.parquet"); +} + +TEST(FileScanTasksFromJsonTest, DeleteFileReferenceOutOfRange) { + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + }, + "delete-file-references": [5] + }])"_json; + + auto result = FileScanTasksFromJson(json, {}, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("out of range")); +} + +TEST(FileScanTasksFromJsonTest, NotAnArray) { + auto result = FileScanTasksFromJson(nlohmann::json::object(), {}, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("non-array")); +} + +// --- Roundtrip tests --- + +TEST(DataFileRoundtripTest, RequiredFieldsOnly) { + DataFile df; + df.content = DataFile::Content::kData; + df.file_path = "s3://bucket/data/file.parquet"; + df.file_format = FileFormatType::kParquet; + df.file_size_in_bytes = 12345; + df.record_count = 100; + + auto json = ToJson(df); + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), df); +} + +TEST(DataFileRoundtripTest, WithOptionalFields) { + DataFile df; + df.content = DataFile::Content::kPositionDeletes; + df.file_path = "s3://bucket/deletes/pos.parquet"; + df.file_format = FileFormatType::kParquet; + df.file_size_in_bytes = 5000; + df.record_count = 50; + df.partition_spec_id = 1; + df.column_sizes = {{1, 1000}, {2, 2000}}; + df.value_counts = {{1, 100}, {2, 100}}; + df.null_value_counts = {{1, 0}}; + df.nan_value_counts = {{2, 5}}; + df.split_offsets = {0, 4096}; + df.sort_order_id = 0; + df.referenced_data_file = "s3://bucket/data/file.parquet"; + + auto json = ToJson(df); + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), df); +} + +TEST(FetchScanTasksResponseRoundtripTest, WithFileScanTasksAndDeleteFiles) { + auto json = nlohmann::json::parse(R"({ + "plan-tasks": [], + "delete-files": [ + { + "content": "position_deletes", + "file-path": "s3://bucket/deletes/delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 512, + "record-count": 5 + } + ], + "file-scan-tasks": [ + { + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + } + ] + })"); + + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + auto roundtrip_json = ToJson(*result); + auto result2 = FetchScanTasksResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(PlanTableScanResponseRoundtripTest, SubmittedStatus) { + auto json = nlohmann::json::parse(R"({"status": "submitted", "plan-id": "abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + auto roundtrip_json = ToJson(*result); + auto result2 = PlanTableScanResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(FetchPlanningResultResponseRoundtripTest, CompletedWithPlanTasks) { + auto json = + nlohmann::json::parse(R"({"status": "completed", "plan-tasks": ["task-1", "task-2"]})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + auto roundtrip_json = ToJson(*result); + auto result2 = + FetchPlanningResultResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + } // namespace iceberg::rest