diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..908383e --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,60 @@ +# Sparrow-IPC AI Agent Instructions + +C++20 library for Arrow IPC serialization/deserialization using FlatBuffers. See [../examples/write_and_read_streams.cpp](../examples/write_and_read_streams.cpp) for usage patterns. + +## Architecture + +- **Serialization**: `record_batch` → `serializer` → FlatBuffer metadata + body → stream (continuation bytes + length + message + padding + data) +- **Deserialization**: Binary stream → `extract_encapsulated_message()` → parse FlatBuffer → reconstruct `record_batch` +- **Critical**: All record batches in a stream must have identical schemas (validated in `serialize_record_batches_to_ipc_stream`) +- **Memory model**: Deserialized arrays use `std::span` - source buffer must outlive arrays + +## Build System + +**Dependency fetching** (unique pattern in `cmake/external_dependencies.cmake`): +- `FETCH_DEPENDENCIES_WITH_CMAKE=OFF` - require via `find_package()` (CI default) +- `FETCH_DEPENDENCIES_WITH_CMAKE=MISSING` - auto-fetch missing (local dev) +- All binaries/libs → `${CMAKE_BINARY_DIR}/bin/${CMAKE_BUILD_TYPE}` (not standard locations) + +**FlatBuffer schemas**: Auto-downloaded from Apache Arrow during configure → `${CMAKE_BINARY_DIR}/generated/*_generated.h`. Never edit generated headers. + +**Build**: +```bash +mamba env create -f environment-dev.yml && mamba activate sparrow-ipc +cmake -B build -DCMAKE_INSTALL_PREFIX=$CONDA_PREFIX -DCMAKE_PREFIX_PATH=$CONDA_PREFIX -DSPARROW_IPC_BUILD_TESTS=ON +cmake --build build -j12 +cmake --build build --target run_tests +``` + +## Platform-Specific Patterns + +**Linux executables linking sparrow-ipc**: Must set RPATH (libs in same dir): +```cmake +set_target_properties(my_exe PROPERTIES + BUILD_RPATH_USE_ORIGIN ON + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN") +``` +See `integration_tests/CMakeLists.txt` for examples. Missing this causes "cannot open shared object file" errors. + +**Windows**: Explicit DLL copying in CMakeLists (see `tests/CMakeLists.txt:32-47`). + +## Testing + +- Arrow test data: Auto-fetched from `apache/arrow-testing`, `.json.gz` files extracted during configure +- Unit tests: `cmake --build build --target run_tests` +- Integration tests: `integration_tests/` tools integrate with Apache Arrow's Archery framework via Docker + +## Naming & Style + +- `snake_case` for everything (types, functions) +- `m_` prefix for members +- Namespace: `sparrow_ipc` +- Format: `cmake --build build --target clang-format` (requires `ACTIVATE_LINTER=ON`) + +## Common Issues + +1. Schema mismatches in stream → `std::invalid_argument` +2. Deallocating source buffer while arrays in use → undefined behavior +3. Missing RPATH on Linux → runtime linking errors +4. 
Only LZ4 compression supported (not ZSTD yet) diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml new file mode 100644 index 0000000..0ecd755 --- /dev/null +++ b/.github/workflows/integration_tests.yaml @@ -0,0 +1,65 @@ +name: Integration tests + +on: + workflow_dispatch: + pull_request: + push: + branches: [main] + +jobs: + build_integration_container_and_run_tests: + runs-on: ubuntu-22.04 + steps: + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y mold libpthread-stubs0-dev libboost-thread-dev doctest-dev + + - name: Install specific version of tzdata + run: sudo apt-get install tzdata + + - name: Checkout code + uses: actions/checkout@v5 + + - name: Configure using CMake + run: | + cmake -G Ninja \ + -Bbuild \ + -DCMAKE_BUILD_TYPE:STRING=RELEASE \ + -DSPARROW_IPC_BUILD_INTEGRATION_TESTS=ON \ + -DFETCH_DEPENDENCIES_WITH_CMAKE=MISSING \ + -DSPARROW_IPC_BUILD_SHARED=OFF + + - name: Build arrow_file_to_stream target + working-directory: build + run: cmake --build . --config Release --target arrow_file_to_stream + + - name: Build arrow_stream_to_file target + working-directory: build + run: cmake --build . --config Release --target arrow_stream_to_file + + - name: Build arrow_json_to_file target + working-directory: build + run: cmake --build . --config Release --target arrow_json_to_file + + - name: Build arrow_validate target + working-directory: build + run: cmake --build . --config Release --target arrow_validate + + - name: List all folders and subfolders + run: | + echo "Listing all folders and subfolders:" + find . -type d + + - name: Build Docker image + run: docker build -t sparrow/integration-tests -f ci/docker/integration.dockerfile . + + - name: Run Integration tests + run: | + docker run --rm \ + -e ARCHERY_INTEGRATION_WITH_EXTERNAL_LIBRARY=/workspace/build/bin/RELEASE/ \ + -e ARCHERY_INTEGRATION_EXTERNAL_LIBRARY_IPC_PRODUCER=true \ + -e ARCHERY_INTEGRATION_EXTERNAL_LIBRARY_IPC_CONSUMER=true \ + -v ${{ github.workspace }}:/workspace \ + -w /arrow-integration sparrow/integration-tests \ + "/arrow-integration/ci/scripts/integration_arrow.sh /arrow-integration /build" diff --git a/CMakeLists.txt b/CMakeLists.txt index 47fab25..05e1a76 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,8 +11,6 @@ include(CMakeDependentOption) list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake") message(DEBUG "CMake module path: ${CMAKE_MODULE_PATH}") -include(external_dependencies) - set(SPARROW_IPC_COMPILE_DEFINITIONS "" CACHE STRING "List of public compile definitions of the sparrow-ipc target") set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) @@ -85,11 +83,27 @@ MESSAGE(STATUS "🔧 Build docs: ${SPARROW_IPC_BUILD_DOCS}") OPTION(SPARROW_IPC_BUILD_EXAMPLES "Build sparrow-ipc examples" OFF) MESSAGE(STATUS "🔧 Build examples: ${SPARROW_IPC_BUILD_EXAMPLES}") +OPTION(SPARROW_IPC_BUILD_INTEGRATION_TESTS "Build sparrow-ipc integration tests" OFF) +MESSAGE(STATUS "🔧 Build integration tests: ${SPARROW_IPC_BUILD_INTEGRATION_TESTS}") + # Code coverage # ============= OPTION(SPARROW_IPC_ENABLE_COVERAGE "Enable sparrow-ipc test coverage" OFF) MESSAGE(STATUS "🔧 Enable coverage: ${SPARROW_IPC_ENABLE_COVERAGE}") +include(external_dependencies) + +# Build +# ===== +set(BINARY_BUILD_DIR "${CMAKE_BINARY_DIR}/bin/${CMAKE_BUILD_TYPE}") + +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG "${BINARY_BUILD_DIR}") +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE "${BINARY_BUILD_DIR}") 
+set(CMAKE_LIBRARY_OUTPUT_DIRECTORY_DEBUG "${BINARY_BUILD_DIR}") +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY_RELEASE "${BINARY_BUILD_DIR}") +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG "${BINARY_BUILD_DIR}") +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY_RELEASE "${BINARY_BUILD_DIR}") + if(SPARROW_IPC_ENABLE_COVERAGE) include(code_coverage) endif() @@ -284,6 +298,13 @@ if(SPARROW_IPC_BUILD_EXAMPLES) add_subdirectory(examples) endif() +# Integration tests +# ================= +if(SPARROW_IPC_BUILD_INTEGRATION_TESTS) + message(STATUS "🔨 Create integration tests targets") + add_subdirectory(integration_tests) +endif() + # Installation # ============ include(GNUInstallDirs) diff --git a/ci/docker/integration.dockerfile b/ci/docker/integration.dockerfile new file mode 100644 index 0000000..67e9061 --- /dev/null +++ b/ci/docker/integration.dockerfile @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM apache/arrow-dev:amd64-conda-integration + +ENV ARROW_USE_CCACHE=OFF \ + ARROW_CPP_EXE_PATH=/build/cpp/debug \ + BUILD_DOCS_CPP=OFF \ + ARROW_INTEGRATION_CPP=ON \ + ARROW_INTEGRATION_CSHARP=OFF \ + ARROW_INTEGRATION_GO=OFF \ + ARROW_INTEGRATION_JAVA=OFF \ + ARROW_INTEGRATION_JS=OFF \ + ARCHERY_INTEGRATION_WITH_NANOARROW="0" \ + ARCHERY_INTEGRATION_WITH_RUST="0" + +RUN apt update + +RUN apt install build-essential git -y + +# Clone the arrow monorepo // TODO: change to the official repo +RUN git clone --depth 1 --branch archery_supports_external_libraries https://github.com/Alex-PLACET/arrow.git /arrow-integration --recurse-submodules + +# Build all the integrations +RUN conda run --no-capture-output \ + /arrow-integration/ci/scripts/integration_arrow_build.sh \ + /arrow-integration \ + /build diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index 897b48f..56a4054 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -68,7 +68,7 @@ function(find_package_or_fetch) endfunction() set(SPARROW_BUILD_SHARED ${SPARROW_IPC_BUILD_SHARED}) -if(${SPARROW_IPC_BUILD_TESTS}) +if(${SPARROW_IPC_BUILD_TESTS} OR ${SPARROW_IPC_BUILD_INTEGRATION_TESTS}) set(CREATE_JSON_READER_TARGET ON) endif() find_package_or_fetch( @@ -81,7 +81,7 @@ unset(CREATE_JSON_READER_TARGET) if(NOT TARGET sparrow::sparrow) add_library(sparrow::sparrow ALIAS sparrow) endif() -if(${SPARROW_IPC_BUILD_TESTS}) +if(${SPARROW_IPC_BUILD_TESTS} OR ${SPARROW_IPC_BUILD_INTEGRATION_TESTS}) find_package_or_fetch( PACKAGE_NAME sparrow-json-reader ) @@ -123,7 +123,7 @@ if(NOT TARGET lz4::lz4) add_library(lz4::lz4 ALIAS lz4) endif() -if(SPARROW_IPC_BUILD_TESTS) +if(${SPARROW_IPC_BUILD_TESTS} OR ${SPARROW_IPC_BUILD_INTEGRATION_TESTS}) find_package_or_fetch( PACKAGE_NAME doctest GIT_REPOSITORY https://github.com/doctest/doctest.git diff --git 
a/include/sparrow_ipc/memory_output_stream.hpp b/include/sparrow_ipc/memory_output_stream.hpp index 27e2e06..a245bd3 100644 --- a/include/sparrow_ipc/memory_output_stream.hpp +++ b/include/sparrow_ipc/memory_output_stream.hpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace sparrow_ipc { diff --git a/integration_tests/CMakeLists.txt b/integration_tests/CMakeLists.txt new file mode 100644 index 0000000..46da543 --- /dev/null +++ b/integration_tests/CMakeLists.txt @@ -0,0 +1,232 @@ +cmake_minimum_required(VERSION 3.28) + +# Create executable for arrow_file_to_stream integration test +add_executable(arrow_file_to_stream arrow_file_to_stream.cpp) + +target_link_libraries(arrow_file_to_stream + PRIVATE + sparrow-ipc + sparrow::sparrow + sparrow::json_reader +) + +set_target_properties(arrow_file_to_stream + PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF + VERSION ${SPARROW_IPC_BINARY_VERSION} + SOVERSION ${SPARROW_IPC_BINARY_CURRENT} + FOLDER "integration_tests" + BUILD_RPATH_USE_ORIGIN ON + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN" +) + +target_include_directories(arrow_file_to_stream + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + +add_dependencies(arrow_file_to_stream generate_flatbuffers_headers) + +# Create executable for arrow_stream_to_file integration test +add_executable(arrow_stream_to_file arrow_stream_to_file.cpp) + +target_link_libraries(arrow_stream_to_file + PRIVATE + sparrow-ipc + sparrow::sparrow +) + +set_target_properties(arrow_stream_to_file + PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF + VERSION ${SPARROW_IPC_BINARY_VERSION} + SOVERSION ${SPARROW_IPC_BINARY_CURRENT} + FOLDER integration_tests + BUILD_RPATH_USE_ORIGIN ON + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN" +) + +target_include_directories(arrow_stream_to_file + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + +add_dependencies(arrow_stream_to_file generate_flatbuffers_headers) + +# Create executable for arrow_json_to_file integration test +add_executable(arrow_json_to_file arrow_json_to_file.cpp) + +target_link_libraries(arrow_json_to_file + PRIVATE + sparrow-ipc + sparrow::sparrow + sparrow::json_reader +) + +set_target_properties(arrow_json_to_file + PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF + VERSION ${SPARROW_IPC_BINARY_VERSION} + SOVERSION ${SPARROW_IPC_BINARY_CURRENT} + FOLDER integration_tests + BUILD_RPATH_USE_ORIGIN ON + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN" +) + +target_include_directories(arrow_json_to_file + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + +add_dependencies(arrow_json_to_file generate_flatbuffers_headers) + +# Create executable for arrow_validate integration test +add_executable(arrow_validate arrow_validate.cpp) + +target_link_libraries(arrow_validate + PRIVATE + sparrow-ipc + sparrow::sparrow + sparrow::json_reader +) + +set_target_properties(arrow_validate + PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF + VERSION ${SPARROW_IPC_BINARY_VERSION} + SOVERSION ${SPARROW_IPC_BINARY_CURRENT} + FOLDER integration_tests + BUILD_RPATH_USE_ORIGIN ON + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN" +) + +target_include_directories(arrow_validate + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + +add_dependencies(arrow_validate generate_flatbuffers_headers) + +# Create test executable for integration tools 
+add_executable(test_integration_tools main.cpp test_integration_tools.cpp)
+
+target_link_libraries(test_integration_tools
+    PRIVATE
+        sparrow-ipc
+        sparrow::sparrow
+        sparrow::json_reader
+        doctest::doctest
+        arrow-testing-data
+)
+
+target_compile_definitions(test_integration_tools
+    PRIVATE
+        INTEGRATION_TOOLS_DIR="${CMAKE_CURRENT_BINARY_DIR}"
+)
+
+set_target_properties(test_integration_tools
+    PROPERTIES
+        CXX_STANDARD 20
+        CXX_STANDARD_REQUIRED ON
+        CXX_EXTENSIONS OFF
+)
+
+target_include_directories(test_integration_tools
+    PRIVATE
+        ${CMAKE_SOURCE_DIR}/include
+        ${CMAKE_BINARY_DIR}/generated
+)
+
+add_dependencies(test_integration_tools generate_flatbuffers_headers arrow_file_to_stream arrow_stream_to_file arrow_json_to_file arrow_validate)
+
+# Register with CTest
+enable_testing()
+add_test(NAME integration_tools_test COMMAND test_integration_tools)
+
+# On Windows, copy required DLLs
+if(WIN32)
+    add_custom_command(
+        TARGET arrow_file_to_stream POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:arrow_file_to_stream>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:arrow_file_to_stream>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::json_reader>"
+            "$<TARGET_FILE_DIR:arrow_file_to_stream>"
+        COMMENT "Copying DLLs to arrow_file_to_stream executable directory"
+    )
+
+    add_custom_command(
+        TARGET arrow_stream_to_file POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:arrow_stream_to_file>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:arrow_stream_to_file>"
+        COMMENT "Copying DLLs to arrow_stream_to_file executable directory"
+    )
+
+    add_custom_command(
+        TARGET arrow_json_to_file POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:arrow_json_to_file>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:arrow_json_to_file>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::json_reader>"
+            "$<TARGET_FILE_DIR:arrow_json_to_file>"
+        COMMENT "Copying DLLs to arrow_json_to_file executable directory"
+    )
+
+    add_custom_command(
+        TARGET arrow_validate POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:arrow_validate>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:arrow_validate>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::json_reader>"
+            "$<TARGET_FILE_DIR:arrow_validate>"
+        COMMENT "Copying DLLs to arrow_validate executable directory"
+    )
+
+    add_custom_command(
+        TARGET test_integration_tools POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow-ipc>"
+            "$<TARGET_FILE_DIR:test_integration_tools>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::sparrow>"
+            "$<TARGET_FILE_DIR:test_integration_tools>"
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+            "$<TARGET_FILE:sparrow::json_reader>"
+            "$<TARGET_FILE_DIR:test_integration_tools>"
+        COMMENT "Copying DLLs to test_integration_tools executable directory"
+    )
+endif()
+
+set_target_properties(arrow_file_to_stream arrow_stream_to_file arrow_json_to_file arrow_validate test_integration_tools PROPERTIES FOLDER "Integration Tests")
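The tools below all emit or parse the stream framing summarized in `.github/copilot-instructions.md` (continuation bytes + length + FlatBuffer message + padding + body data). As a rough sketch of what `extract_encapsulated_message()` has to handle — the struct and function names here are illustrative, not sparrow-ipc's actual API:

```cpp
#include <cstdint>
#include <cstring>
#include <optional>
#include <span>

// Hypothetical sketch of Arrow IPC encapsulated-message framing;
// names are illustrative and not part of sparrow-ipc.
struct framed_message
{
    std::span<const uint8_t> metadata;   // FlatBuffer Message bytes (padded)
    std::span<const uint8_t> remainder;  // body bytes + subsequent messages
};

inline std::optional<framed_message> read_frame(std::span<const uint8_t> stream)
{
    if (stream.size() < 8)
    {
        return std::nullopt;
    }
    uint32_t continuation = 0;
    std::memcpy(&continuation, stream.data(), sizeof(continuation));
    if (continuation != 0xFFFFFFFFu)  // continuation marker opens every message
    {
        return std::nullopt;
    }
    int32_t metadata_length = 0;  // little-endian, padded to an 8-byte boundary
    std::memcpy(&metadata_length, stream.data() + 4, sizeof(metadata_length));
    if (metadata_length <= 0)
    {
        return std::nullopt;  // a zero length is the end-of-stream marker
    }
    const auto len = static_cast<size_t>(metadata_length);
    if (stream.size() < 8 + len)
    {
        return std::nullopt;  // truncated stream
    }
    return framed_message{stream.subspan(8, len), stream.subspan(8 + len)};
}
```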
diff --git a/integration_tests/arrow_file_to_stream.cpp b/integration_tests/arrow_file_to_stream.cpp
new file mode 100644
index 0000000..e321f59
--- /dev/null
+++ b/integration_tests/arrow_file_to_stream.cpp
@@ -0,0 +1,116 @@
+#include <cstdint>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <vector>
+
+#include <nlohmann/json.hpp>
+#include <sparrow/record_batch.hpp>
+
+#include "sparrow/json_reader/json_parser.hpp"
+
+#include <sparrow_ipc/memory_output_stream.hpp>
+#include <sparrow_ipc/serializer.hpp>
+
+/**
+ * @brief Reads a JSON file containing record batches and outputs the serialized Arrow IPC stream to stdout.
+ *
+ * This program takes a JSON file path as a command-line argument, parses the record batches
+ * from the JSON data, serializes them into Arrow IPC stream format, and writes the binary
+ * stream to stdout. The output can be redirected to a file or piped to another program.
+ *
+ * Usage: arrow_file_to_stream <json_file>
+ *
+ * @param argc Number of command-line arguments
+ * @param argv Array of command-line arguments
+ * @return EXIT_SUCCESS on success, EXIT_FAILURE on error
+ */
+int main(int argc, char* argv[])
+{
+    // Check command-line arguments
+    if (argc != 2)
+    {
+        std::cerr << "Usage: " << argv[0] << " <json_file>\n";
+        std::cerr << "Reads a JSON file and outputs the serialized Arrow IPC stream to stdout.\n";
+        return EXIT_FAILURE;
+    }
+
+    const std::filesystem::path json_path(argv[1]);
+
+    try
+    {
+        // Check if the JSON file exists
+        if (!std::filesystem::exists(json_path))
+        {
+            std::cerr << "Error: File not found: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Open and parse the JSON file
+        std::ifstream json_file(json_path);
+        if (!json_file.is_open())
+        {
+            std::cerr << "Error: Could not open file: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        nlohmann::json json_data;
+        try
+        {
+            json_data = nlohmann::json::parse(json_file);
+        }
+        catch (const nlohmann::json::parse_error& e)
+        {
+            std::cerr << "Error: Failed to parse JSON file: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+        json_file.close();
+
+        // Get the number of batches
+        if (!json_data.contains("batches") || !json_data["batches"].is_array())
+        {
+            std::cerr << "Error: JSON file does not contain a 'batches' array.\n";
+            return EXIT_FAILURE;
+        }
+
+        const size_t num_batches = json_data["batches"].size();
+
+        // Parse all record batches from JSON
+        std::vector<sparrow::record_batch> record_batches;
+        record_batches.reserve(num_batches);
+
+        for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx)
+        {
+            try
+            {
+                record_batches.emplace_back(
+                    sparrow::json_reader::build_record_batch_from_json(json_data, batch_idx)
+                );
+            }
+            catch (const std::exception& e)
+            {
+                std::cerr << "Error: Failed to build record batch " << batch_idx << ": " << e.what()
+                          << "\n";
+                return EXIT_FAILURE;
+            }
+        }
+
+        // Serialize record batches to Arrow IPC stream format
+        std::vector<uint8_t> stream_data;
+        sparrow_ipc::memory_output_stream stream(stream_data);
+        sparrow_ipc::serializer serializer(stream);
+
+        serializer << record_batches << sparrow_ipc::end_stream;
+
+        // Write the binary stream to stdout
+        std::cout.write(reinterpret_cast<const char*>(stream_data.data()), stream_data.size());
+        std::cout.flush();
+
+        return EXIT_SUCCESS;
+    }
+    catch (const std::exception& e)
+    {
+        std::cerr << "Error: " << e.what() << "\n";
+        return EXIT_FAILURE;
+    }
+}
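One portability caveat for the tool above: it writes a binary stream to `std::cout`, which is opened in text mode on Windows and would translate `\n` into `\r\n`, corrupting the IPC framing. The CI only exercises this tool inside a Linux container, but if it were ever run natively on Windows, a guard along these lines (standard MSVC CRT calls) would be needed:

```cpp
#include <cstdio>

#ifdef _WIN32
#    include <fcntl.h>
#    include <io.h>
#endif

// Switch stdout to binary mode on Windows so the IPC bytes pass through
// untranslated; a no-op on other platforms.
inline void make_stdout_binary()
{
#ifdef _WIN32
    _setmode(_fileno(stdout), _O_BINARY);
#endif
}
```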
diff --git a/integration_tests/arrow_json_to_file.cpp b/integration_tests/arrow_json_to_file.cpp
new file mode 100644
index 0000000..35f39a2
--- /dev/null
+++ b/integration_tests/arrow_json_to_file.cpp
@@ -0,0 +1,128 @@
+#include <cstdint>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <vector>
+
+#include <nlohmann/json.hpp>
+#include <sparrow/record_batch.hpp>
+#include <sparrow/json_reader/json_parser.hpp>
+
+#include <sparrow_ipc/memory_output_stream.hpp>
+#include <sparrow_ipc/serializer.hpp>
+
+/**
+ * @brief Reads a JSON file containing record batches and writes the serialized Arrow IPC stream to a file.
+ *
+ * This program takes a JSON file path and an output file path as command-line arguments,
+ * parses the record batches from the JSON data, serializes them into Arrow IPC stream format,
+ * and writes the binary stream to the specified output file.
+ *
+ * Usage: json_to_file <json_file> <output_file>
+ *
+ * @param argc Number of command-line arguments
+ * @param argv Array of command-line arguments
+ * @return EXIT_SUCCESS on success, EXIT_FAILURE on error
+ */
+int main(int argc, char* argv[])
+{
+    // Check command-line arguments
+    if (argc != 3)
+    {
+        std::cerr << "Usage: " << argv[0] << " <json_file> <output_file>\n";
+        std::cerr << "Reads a JSON file and writes the serialized Arrow IPC stream to a file.\n";
+        return EXIT_FAILURE;
+    }
+
+    const std::filesystem::path json_path(argv[1]);
+    const std::filesystem::path output_path(argv[2]);
+
+    try
+    {
+        // Check if the JSON file exists
+        if (!std::filesystem::exists(json_path))
+        {
+            std::cerr << "Error: Input file not found: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Open and parse the JSON file
+        std::ifstream json_file(json_path);
+        if (!json_file.is_open())
+        {
+            std::cerr << "Error: Could not open input file: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        nlohmann::json json_data;
+        try
+        {
+            json_data = nlohmann::json::parse(json_file);
+        }
+        catch (const nlohmann::json::parse_error& e)
+        {
+            std::cerr << "Error: Failed to parse JSON file: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+        json_file.close();
+
+        // Get the number of batches
+        if (!json_data.contains("batches") || !json_data["batches"].is_array())
+        {
+            std::cerr << "Error: JSON file does not contain a 'batches' array.\n";
+            return EXIT_FAILURE;
+        }
+
+        const size_t num_batches = json_data["batches"].size();
+
+        // Parse all record batches from JSON
+        std::vector<sparrow::record_batch> record_batches;
+        record_batches.reserve(num_batches);
+
+        for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx)
+        {
+            try
+            {
+                record_batches.emplace_back(
+                    sparrow::json_reader::build_record_batch_from_json(json_data, batch_idx)
+                );
+            }
+            catch (const std::exception& e)
+            {
+                std::cerr << "Error: Failed to build record batch " << batch_idx << ": " << e.what() << "\n";
+                return EXIT_FAILURE;
+            }
+        }
+
+        // Serialize record batches to Arrow IPC stream format
+        std::vector<uint8_t> stream_data;
+        sparrow_ipc::memory_output_stream stream(stream_data);
+        sparrow_ipc::serializer serializer(stream);
+
+        serializer << record_batches << sparrow_ipc::end_stream;
+
+        // Write the binary stream to the output file
+        std::ofstream output_file(output_path, std::ios::out | std::ios::binary);
+        if (!output_file.is_open())
+        {
+            std::cerr << "Error: Could not open output file: " << output_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        output_file.write(reinterpret_cast<const char*>(stream_data.data()), stream_data.size());
+        output_file.close();
+
+        if (!output_file.good())
+        {
+            std::cerr << "Error: Failed to write to output file: " << output_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        return EXIT_SUCCESS;
+    }
+    catch (const std::exception& e)
+    {
+        std::cerr << "Error: " << e.what() << "\n";
+        return EXIT_FAILURE;
+    }
+}
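The next two tools slurp whole files through the `(std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>()` constructor pair. The extra parentheses around the first argument are deliberate: without them the declaration parses as a function prototype (the "most vexing parse") rather than a vector. A minimal illustration:

```cpp
#include <fstream>
#include <iterator>
#include <vector>

std::vector<char> read_all(std::ifstream& file)
{
    // Without the inner parentheses, this line would declare a function
    // taking an istreambuf_iterator instead of constructing a vector.
    std::vector<char> data(
        (std::istreambuf_iterator<char>(file)),
        std::istreambuf_iterator<char>()
    );
    return data;
}
```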
diff --git a/integration_tests/arrow_stream_to_file.cpp b/integration_tests/arrow_stream_to_file.cpp
new file mode 100644
index 0000000..fd84e56
--- /dev/null
+++ b/integration_tests/arrow_stream_to_file.cpp
@@ -0,0 +1,110 @@
+#include <cstdint>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <span>
+#include <vector>
+
+#include <sparrow_ipc/deserialize.hpp>
+#include <sparrow_ipc/memory_output_stream.hpp>
+#include <sparrow_ipc/serializer.hpp>
+
+/**
+ * @brief Reads an Arrow IPC stream from a file and writes it to another file.
+ *
+ * This program reads a binary Arrow IPC stream from an input file, deserializes it
+ * to verify its validity, then re-serializes it and writes the result to the specified
+ * output file. This ensures the output file contains a valid Arrow IPC stream.
+ *
+ * Usage: stream_to_file <input_file> <output_file>
+ *
+ * @param argc Number of command-line arguments
+ * @param argv Array of command-line arguments
+ * @return EXIT_SUCCESS on success, EXIT_FAILURE on error
+ */
+int main(int argc, char* argv[])
+{
+    // Check command-line arguments
+    if (argc != 3)
+    {
+        std::cerr << "Usage: " << argv[0] << " <input_file> <output_file>\n";
+        std::cerr << "Reads an Arrow IPC stream from a file and writes it to another file.\n";
+        return EXIT_FAILURE;
+    }
+
+    const std::filesystem::path input_path(argv[1]);
+    const std::filesystem::path output_path(argv[2]);
+
+    try
+    {
+        // Check if the input file exists
+        if (!std::filesystem::exists(input_path))
+        {
+            std::cerr << "Error: Input file not found: " << input_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Read the entire stream from the input file
+        std::ifstream input_file(input_path, std::ios::in | std::ios::binary);
+        if (!input_file.is_open())
+        {
+            std::cerr << "Error: Could not open input file: " << input_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        std::vector<uint8_t> input_stream_data(
+            (std::istreambuf_iterator<char>(input_file)),
+            std::istreambuf_iterator<char>()
+        );
+        input_file.close();
+
+        if (input_stream_data.empty())
+        {
+            std::cerr << "Error: Input file is empty.\n";
+            return EXIT_FAILURE;
+        }
+
+        // Deserialize the stream to validate it and extract record batches
+        std::vector<sparrow::record_batch> record_batches;
+        try
+        {
+            record_batches = sparrow_ipc::deserialize_stream(std::span(input_stream_data));
+        }
+        catch (const std::exception& e)
+        {
+            std::cerr << "Error: Failed to deserialize stream: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Re-serialize the record batches to ensure a valid output stream
+        std::vector<uint8_t> output_stream_data;
+        sparrow_ipc::memory_output_stream stream(output_stream_data);
+        sparrow_ipc::serializer serializer(stream);
+
+        serializer << record_batches << sparrow_ipc::end_stream;
+
+        // Write the stream to the output file
+        std::ofstream output_file(output_path, std::ios::out | std::ios::binary);
+        if (!output_file.is_open())
+        {
+            std::cerr << "Error: Could not open output file: " << output_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        output_file.write(reinterpret_cast<const char*>(output_stream_data.data()), output_stream_data.size());
+        output_file.close();
+
+        if (!output_file.good())
+        {
+            std::cerr << "Error: Failed to write to output file: " << output_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        return EXIT_SUCCESS;
+    }
+    catch (const std::exception& e)
+    {
+        std::cerr << "Error: " << e.what() << "\n";
+        return EXIT_FAILURE;
+    }
+}
diff --git a/integration_tests/arrow_validate.cpp b/integration_tests/arrow_validate.cpp
new file mode 100644
index 0000000..34b7a0d
--- /dev/null
+++ b/integration_tests/arrow_validate.cpp
@@ -0,0 +1,287 @@
+#include <cstdint>
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <span>
+#include <vector>
+#if defined(__cpp_lib_format)
+#    include <format>
+#endif
+
+#include <nlohmann/json.hpp>
+#include <sparrow/record_batch.hpp>
+
+#include <sparrow_ipc/deserialize.hpp>
+
+#include "sparrow/json_reader/json_parser.hpp"
+
+/**
+ * @brief Helper function to compare two record batches for equality.
+ *
+ * Compares the structure and data of two record batches element-by-element.
+ * Reports detailed error messages for any mismatches found.
+ * + * @param rb1 The first record batch to compare + * @param rb2 The second record batch to compare + * @param batch_idx The index of the batch being compared (for error reporting) + * @return true if the batches are identical, false otherwise + */ +bool compare_record_batch(const sparrow::record_batch& rb1, const sparrow::record_batch& rb2, size_t batch_idx) +{ + bool all_match = true; + + // Check number of columns + if (rb1.nb_columns() != rb2.nb_columns()) + { + std::cerr << "Error: Batch " << batch_idx << " has different number of columns: " << rb1.nb_columns() + << " vs " << rb2.nb_columns() << "\n"; + return false; + } + + // Check number of rows + if (rb1.nb_rows() != rb2.nb_rows()) + { + std::cerr << "Error: Batch " << batch_idx << " has different number of rows: " << rb1.nb_rows() + << " vs " << rb2.nb_rows() << "\n"; + return false; + } + + // Check column names + const auto& names1 = rb1.names(); + const auto& names2 = rb2.names(); + if (names1.size() != names2.size()) + { + std::cerr << "Error: Batch " << batch_idx << " has different number of column names\n"; + all_match = false; + } + else + { + for (size_t i = 0; i < names1.size(); ++i) + { + if (names1[i] != names2[i]) + { + std::cerr << "Error: Batch " << batch_idx << " column " << i << " has different name: '" + << names1[i] << "' vs '" << names2[i] << "'\n"; + all_match = false; + } + } + } + + // Check each column + for (size_t col_idx = 0; col_idx < rb1.nb_columns(); ++col_idx) + { + const auto& col1 = rb1.get_column(col_idx); + const auto& col2 = rb2.get_column(col_idx); + + // Check column size + if (col1.size() != col2.size()) + { + std::cerr << "Error: Batch " << batch_idx << ", column " << col_idx + << " has different size: " << col1.size() << " vs " << col2.size() << "\n"; + all_match = false; + continue; + } + + // Check column data type + if (col1.data_type() != col2.data_type()) + { + std::cerr << "Error: Batch " << batch_idx << ", column " << col_idx << " has different data type\n"; + all_match = false; + continue; + } + + // Check column name + const auto col_name1 = col1.name(); + const auto col_name2 = col2.name(); + if (col_name1 != col_name2) + { + std::cerr << "Warning: Batch " << batch_idx << ", column " << col_idx + << " has different name in column metadata\n"; + } + + // Check each value in the column + for (size_t row_idx = 0; row_idx < col1.size(); ++row_idx) + { + if (col1[row_idx] != col2[row_idx]) + { + std::cerr << "Error: Batch " << batch_idx << ", column " << col_idx << " ('" + << col_name1.value_or("unnamed") << "'), row " << row_idx + << " has different value\n"; +#if defined(__cpp_lib_format) + std::cerr << " JSON value: " << std::format("{}", col1[row_idx]) << "\n"; + std::cerr << " Stream value: " << std::format("{}", col2[row_idx]) << "\n"; +#endif + all_match = false; + } + } + } + + return all_match; +} + +/** + * @brief Validates that a JSON file and an Arrow stream file contain identical data. + * + * This program reads a JSON file containing Arrow record batches and an Arrow IPC + * stream file, converts both to vectors of record batches, and compares them + * element-by-element to ensure they are identical. 
+ *
+ * Usage: validate <json_file> <stream_file>
+ *
+ * @param argc Number of command-line arguments
+ * @param argv Array of command-line arguments
+ * @return EXIT_SUCCESS if the files match, EXIT_FAILURE on error or mismatch
+ */
+int main(int argc, char* argv[])
+{
+    // Check command-line arguments
+    if (argc != 3)
+    {
+        std::cerr << "Usage: " << argv[0] << " <json_file> <stream_file>\n";
+        std::cerr << "Validates that a JSON file and an Arrow stream file contain identical data.\n";
+        return EXIT_FAILURE;
+    }
+
+    const std::filesystem::path json_path(argv[1]);
+    const std::filesystem::path stream_path(argv[2]);
+
+    try
+    {
+        // Check if the JSON file exists
+        if (!std::filesystem::exists(json_path))
+        {
+            std::cerr << "Error: JSON file not found: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Check if the stream file exists
+        if (!std::filesystem::exists(stream_path))
+        {
+            std::cerr << "Error: Stream file not found: " << stream_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        // Load and parse the JSON file
+        std::cout << "Loading JSON file: " << json_path << "\n";
+        std::ifstream json_file(json_path);
+        if (!json_file.is_open())
+        {
+            std::cerr << "Error: Could not open JSON file: " << json_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        nlohmann::json json_data;
+        try
+        {
+            json_data = nlohmann::json::parse(json_file);
+        }
+        catch (const nlohmann::json::parse_error& e)
+        {
+            std::cerr << "Error: Failed to parse JSON file: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+        json_file.close();
+
+        // Check for batches in JSON
+        if (!json_data.contains("batches") || !json_data["batches"].is_array())
+        {
+            std::cerr << "Error: JSON file does not contain a 'batches' array.\n";
+            return EXIT_FAILURE;
+        }
+
+        const size_t num_batches = json_data["batches"].size();
+        std::cout << "JSON file contains " << num_batches << " batch(es)\n";
+
+        // Parse all record batches from JSON
+        std::vector<sparrow::record_batch> json_batches;
+        json_batches.reserve(num_batches);
+
+        for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx)
+        {
+            try
+            {
+                json_batches.emplace_back(
+                    sparrow::json_reader::build_record_batch_from_json(json_data, batch_idx)
+                );
+            }
+            catch (const std::exception& e)
+            {
+                std::cerr << "Error: Failed to build record batch " << batch_idx << " from JSON: " << e.what()
+                          << "\n";
+                return EXIT_FAILURE;
+            }
+        }
+
+        // Load and deserialize the stream file
+        std::cout << "Loading stream file: " << stream_path << "\n";
+        std::ifstream stream_file(stream_path, std::ios::in | std::ios::binary);
+        if (!stream_file.is_open())
+        {
+            std::cerr << "Error: Could not open stream file: " << stream_path << "\n";
+            return EXIT_FAILURE;
+        }
+
+        std::vector<uint8_t> stream_data(
+            (std::istreambuf_iterator<char>(stream_file)),
+            std::istreambuf_iterator<char>()
+        );
+        stream_file.close();
+
+        if (stream_data.empty())
+        {
+            std::cerr << "Error: Stream file is empty.\n";
+            return EXIT_FAILURE;
+        }
+
+        // Deserialize the stream
+        std::vector<sparrow::record_batch> stream_batches;
+        try
+        {
+            stream_batches = sparrow_ipc::deserialize_stream(std::span(stream_data));
+        }
+        catch (const std::exception& e)
+        {
+            std::cerr << "Error: Failed to deserialize stream: " << e.what() << "\n";
+            return EXIT_FAILURE;
+        }
+
+        std::cout << "Stream file contains " << stream_batches.size() << " batch(es)\n";
+
+        // Compare the number of batches
+        if (json_batches.size() != stream_batches.size())
+        {
+            std::cerr << "Error: Number of batches mismatch!\n";
+            std::cerr << "  JSON file: " << json_batches.size() << " batch(es)\n";
+            std::cerr << "  Stream file: " << stream_batches.size() << " batch(es)\n";
+            return EXIT_FAILURE;
+ } + + // Compare each batch + std::cout << "Comparing " << json_batches.size() << " batch(es)...\n"; + bool all_match = true; + for (size_t batch_idx = 0; batch_idx < json_batches.size(); ++batch_idx) + { + std::cout << " Comparing batch " << batch_idx << "...\n"; + if (!compare_record_batch(json_batches[batch_idx], stream_batches[batch_idx], batch_idx)) + { + all_match = false; + } + } + + if (all_match) + { + std::cout << "\n✓ Validation successful: JSON and stream files contain identical data!\n"; + return EXIT_SUCCESS; + } + else + { + std::cerr << "\n✗ Validation failed: JSON and stream files contain different data.\n"; + return EXIT_FAILURE; + } + } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << "\n"; + return EXIT_FAILURE; + } +} diff --git a/integration_tests/main.cpp b/integration_tests/main.cpp new file mode 100644 index 0000000..9522fa7 --- /dev/null +++ b/integration_tests/main.cpp @@ -0,0 +1,2 @@ +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include "doctest/doctest.h" diff --git a/integration_tests/test_integration_tools.cpp b/integration_tests/test_integration_tools.cpp new file mode 100644 index 0000000..b0347fe --- /dev/null +++ b/integration_tests/test_integration_tools.cpp @@ -0,0 +1,718 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "sparrow/json_reader/json_parser.hpp" + +#include "doctest/doctest.h" +#include "sparrow_ipc/deserialize.hpp" + +// Helper function to execute a command and capture output +struct CommandResult +{ + int exit_code; + std::string stdout_data; + std::string stderr_data; +}; + +#ifdef _WIN32 +# include + +CommandResult execute_command(const std::string& command) +{ + CommandResult result; + + // Create temporary files for stdout and stderr + const std::string stdout_file = std::tmpnam(nullptr); + const std::string stderr_file = std::tmpnam(nullptr); + + const std::string full_command = command + " > " + stdout_file + " 2> " + stderr_file; + + result.exit_code = std::system(full_command.c_str()); + + // Read stdout + std::ifstream stdout_stream(stdout_file, std::ios::binary); + if (stdout_stream) + { + std::ostringstream ss; + ss << stdout_stream.rdbuf(); + result.stdout_data = ss.str(); + } + + // Read stderr + std::ifstream stderr_stream(stderr_file, std::ios::binary); + if (stderr_stream) + { + std::ostringstream ss; + ss << stderr_stream.rdbuf(); + result.stderr_data = ss.str(); + } + + // Clean up + std::filesystem::remove(stdout_file); + std::filesystem::remove(stderr_file); + + return result; +} + +#else +# include +# include + +CommandResult execute_command(const std::string& command) +{ + CommandResult result; + + // Check if command already contains output redirection + const bool has_redirection = (command.find('>') != std::string::npos); + + if (has_redirection) + { + // Command already has redirection, execute as-is + // But we still want to capture stderr for error checking + const std::filesystem::path stderr_file = std::filesystem::temp_directory_path() + / ("stderr_" + std::to_string(std::time(nullptr))); + const std::string full_command = command + " 2> " + stderr_file.string(); + result.exit_code = std::system(full_command.c_str()); + + // Read stderr + std::ifstream stderr_stream(stderr_file, std::ios::binary); + if (stderr_stream) + { + std::ostringstream ss; + ss << stderr_stream.rdbuf(); + result.stderr_data = ss.str(); + } + + // Clean up + std::filesystem::remove(stderr_file); + } + else + { + // Create temporary files for stdout and 
stderr
+        const std::filesystem::path stdout_file = std::filesystem::temp_directory_path()
+                                                  / ("stdout_" + std::to_string(std::time(nullptr)));
+        const std::filesystem::path stderr_file = std::filesystem::temp_directory_path()
+                                                  / ("stderr_" + std::to_string(std::time(nullptr)));
+
+        // The command string is already properly formed (executable path + args)
+        // We need to redirect stdout and stderr to files
+        const std::string full_command = command + " > " + stdout_file.string() + " 2> " + stderr_file.string();
+
+        result.exit_code = std::system(full_command.c_str());
+
+        // Read stdout
+        std::ifstream stdout_stream(stdout_file, std::ios::binary);
+        if (stdout_stream)
+        {
+            std::ostringstream ss;
+            ss << stdout_stream.rdbuf();
+            result.stdout_data = ss.str();
+        }
+
+        // Read stderr
+        std::ifstream stderr_stream(stderr_file, std::ios::binary);
+        if (stderr_stream)
+        {
+            std::ostringstream ss;
+            ss << stderr_stream.rdbuf();
+            result.stderr_data = ss.str();
+        }
+
+        // Clean up
+        std::filesystem::remove(stdout_file);
+        std::filesystem::remove(stderr_file);
+    }
+
+    return result;
+}
+#endif
+
+// Helper to compare record batches
+void compare_record_batches(
+    const std::vector<sparrow::record_batch>& record_batches_1,
+    const std::vector<sparrow::record_batch>& record_batches_2
+)
+{
+    REQUIRE_EQ(record_batches_1.size(), record_batches_2.size());
+    for (size_t i = 0; i < record_batches_1.size(); ++i)
+    {
+        REQUIRE_EQ(record_batches_1[i].nb_columns(), record_batches_2[i].nb_columns());
+        for (size_t y = 0; y < record_batches_1[i].nb_columns(); y++)
+        {
+            const auto& column_1 = record_batches_1[i].get_column(y);
+            const auto& column_2 = record_batches_2[i].get_column(y);
+            REQUIRE_EQ(column_1.size(), column_2.size());
+            CHECK_EQ(record_batches_1[i].names()[y], record_batches_2[i].names()[y]);
+            for (size_t z = 0; z < column_1.size(); z++)
+            {
+                const auto col_name = column_1.name().value_or("NA");
+                INFO("Comparing batch " << i << ", column " << y << " named: " << col_name << ", row " << z);
+                REQUIRE_EQ(column_1.data_type(), column_2.data_type());
+                CHECK_EQ(column_1[z], column_2[z]);
+            }
+        }
+    }
+}
+
+TEST_SUITE("Integration Tools Tests")
+{
+    // Get paths to test data
+    const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR;
+    const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir / "data" / "arrow-ipc-stream"
+                                                             / "integration" / "cpp-21.0.0";
+
+    // Paths to the executables - defined at compile time
+    // (note: the executable names must match the add_executable() targets)
+    const std::filesystem::path exe_dir = INTEGRATION_TOOLS_DIR;
+    const std::filesystem::path file_to_stream_exe = exe_dir / "arrow_file_to_stream";
+    const std::filesystem::path stream_to_file_exe = exe_dir / "arrow_stream_to_file";
+    const std::filesystem::path json_to_file_exe = exe_dir / "arrow_json_to_file";
+    const std::filesystem::path validate_exe = exe_dir / "arrow_validate";
+
+    // Helper to build command with properly quoted executable
+    auto make_command = [](const std::filesystem::path& exe, const std::string& args = "")
+    {
+        std::string cmd = "\"" + exe.string() + "\"";
+        if (!args.empty())
+        {
+            cmd += " " + args;
+        }
+        return cmd;
+    };
+
+    TEST_CASE("file_to_stream - No arguments")
+    {
+        auto result = execute_command(make_command(file_to_stream_exe));
+        CHECK_NE(result.exit_code, 0);
+        CHECK(result.stderr_data.find("Usage:") != std::string::npos);
+    }
+
+    TEST_CASE("file_to_stream - Non-existent file")
+    {
+        const std::string non_existent = "non_existent_file_12345.json";
+        auto result = execute_command(make_command(file_to_stream_exe, non_existent));
+        CHECK_NE(result.exit_code, 0);
CHECK(result.stderr_data.find("not found") != std::string::npos); + } + + TEST_CASE("stream_to_file - No arguments") + { + auto result = execute_command(make_command(stream_to_file_exe)); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("Usage:") != std::string::npos); + } + + TEST_CASE("stream_to_file - Only one argument") + { + auto result = execute_command(make_command(stream_to_file_exe, "output.stream")); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("Usage:") != std::string::npos); + } + + TEST_CASE("stream_to_file - Non-existent input file") + { + const std::string non_existent = "non_existent_file_12345.stream"; + const std::string output_file = "output.stream"; + auto result = execute_command(make_command(stream_to_file_exe, non_existent + " " + output_file)); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("not found") != std::string::npos); + } + + TEST_CASE("json_to_file - No arguments") + { + auto result = execute_command(make_command(json_to_file_exe)); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("Usage:") != std::string::npos); + } + + TEST_CASE("json_to_file - Only one argument") + { + const std::string json_file = "input.json"; + auto result = execute_command(make_command(json_to_file_exe, json_file)); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("Usage:") != std::string::npos); + } + + TEST_CASE("json_to_file - Non-existent input file") + { + const std::string non_existent = "non_existent_file_12345.json"; + const std::string output_file = "output.stream"; + auto result = execute_command( + make_command(json_to_file_exe, "\"" + non_existent + "\" \"" + output_file + "\"") + ); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("not found") != std::string::npos); + } + + TEST_CASE("validate - No arguments") + { + auto result = execute_command(make_command(validate_exe)); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("Usage:") != std::string::npos); + } + + TEST_CASE("validate - Only one argument") + { + const std::string json_file = "input.json"; + auto result = execute_command(make_command(validate_exe, json_file)); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("Usage:") != std::string::npos); + } + + TEST_CASE("validate - Non-existent JSON file") + { + const std::string non_existent_json = "non_existent_file_12345.json"; + const std::string stream_file = "existing.stream"; + auto result = execute_command( + make_command(validate_exe, "\"" + non_existent_json + "\" \"" + stream_file + "\"") + ); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("not found") != std::string::npos); + } + + TEST_CASE("validate - Non-existent stream file") + { + const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json"; + + if (!std::filesystem::exists(json_file)) + { + MESSAGE("Skipping test: test file not found at " << json_file); + return; + } + + const std::string non_existent_stream = "non_existent_file_12345.stream"; + auto result = execute_command( + make_command(validate_exe, "\"" + json_file.string() + "\" \"" + non_existent_stream + "\"") + ); + CHECK_NE(result.exit_code, 0); + CHECK(result.stderr_data.find("not found") != std::string::npos); + } + + TEST_CASE("file_to_stream - Convert JSON to stream") + { + // Test with a known good JSON file + const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json"; + + if (!std::filesystem::exists(json_file)) + { 
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        const std::filesystem::path output_stream = std::filesystem::temp_directory_path()
+                                                    / "test_output.stream";
+
+        // Execute file_to_stream
+        const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string()
+                                    + "\" > \"" + output_stream.string() + "\"";
+        auto result = execute_command(command);
+
+        CHECK_EQ(result.exit_code, 0);
+        CHECK(std::filesystem::exists(output_stream));
+        CHECK_GT(std::filesystem::file_size(output_stream), 0);
+
+        // Verify the output is a valid stream by deserializing it
+        std::ifstream stream_file(output_stream, std::ios::binary);
+        REQUIRE(stream_file.is_open());
+
+        std::vector<uint8_t> stream_data(
+            (std::istreambuf_iterator<char>(stream_file)),
+            std::istreambuf_iterator<char>()
+        );
+        stream_file.close();
+
+        // Should be able to deserialize without errors
+        CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(stream_data)));
+
+        // Clean up
+        std::filesystem::remove(output_stream);
+    }
+
+    TEST_CASE("stream_to_file - Process stream file")
+    {
+        const std::filesystem::path input_stream = tests_resources_files_path / "generated_primitive.stream";
+
+        if (!std::filesystem::exists(input_stream))
+        {
+            MESSAGE("Skipping test: test file not found at " << input_stream);
+            return;
+        }
+
+        const std::filesystem::path output_stream = std::filesystem::temp_directory_path()
+                                                    / "test_stream_output.stream";
+
+        // Execute stream_to_file
+        const std::string command = "\"" + stream_to_file_exe.string() + "\" \"" + input_stream.string()
+                                    + "\" \"" + output_stream.string() + "\"";
+        auto result = execute_command(command);
+
+        CHECK_EQ(result.exit_code, 0);
+        CHECK(std::filesystem::exists(output_stream));
+        CHECK_GT(std::filesystem::file_size(output_stream), 0);
+
+        // Verify the output is a valid stream
+        std::ifstream output_file(output_stream, std::ios::binary);
+        REQUIRE(output_file.is_open());
+
+        std::vector<uint8_t> output_data(
+            (std::istreambuf_iterator<char>(output_file)),
+            std::istreambuf_iterator<char>()
+        );
+        output_file.close();
+
+        CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(output_data)));
+
+        // Clean up
+        std::filesystem::remove(output_stream);
+    }
+
+    TEST_CASE("Round-trip: JSON -> stream -> file -> deserialize")
+    {
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        const std::filesystem::path intermediate_stream = std::filesystem::temp_directory_path()
+                                                          / "intermediate.stream";
+        const std::filesystem::path final_stream = std::filesystem::temp_directory_path() / "final.stream";
+
+        // Step 1: JSON -> stream
+        {
+            const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string()
+                                        + "\" > \"" + intermediate_stream.string() + "\"";
+            auto result = execute_command(command);
+            REQUIRE_EQ(result.exit_code, 0);
+            REQUIRE(std::filesystem::exists(intermediate_stream));
+        }
+
+        // Step 2: stream -> file
+        {
+            const std::string command = "\"" + stream_to_file_exe.string() + "\" \""
+                                        + intermediate_stream.string() + "\" \"" + final_stream.string() + "\"";
+            auto result = execute_command(command);
+            REQUIRE_EQ(result.exit_code, 0);
+            REQUIRE(std::filesystem::exists(final_stream));
+        }
+
+        // Step 3: Compare the results
+        // Load original JSON data
+        std::ifstream json_input(json_file);
+        REQUIRE(json_input.is_open());
+        nlohmann::json json_data = nlohmann::json::parse(json_input);
+        json_input.close();
+
+        const size_t num_batches = json_data["batches"].size();
+        std::vector<sparrow::record_batch> original_batches;
+        for (size_t i = 0; i < num_batches; ++i)
+        {
+            original_batches.emplace_back(sparrow::json_reader::build_record_batch_from_json(json_data, i));
+        }
+
+        // Load final stream
+        std::ifstream final_file(final_stream, std::ios::binary);
+        REQUIRE(final_file.is_open());
+        std::vector<uint8_t> final_data(
+            (std::istreambuf_iterator<char>(final_file)),
+            std::istreambuf_iterator<char>()
+        );
+        final_file.close();
+
+        auto final_batches = sparrow_ipc::deserialize_stream(std::span(final_data));
+
+        // Compare
+        compare_record_batches(original_batches, final_batches);
+
+        // Clean up
+        std::filesystem::remove(intermediate_stream);
+        std::filesystem::remove(final_stream);
+    }
+
+    TEST_CASE("json_to_file - Convert JSON to stream file")
+    {
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        const std::filesystem::path output_stream = std::filesystem::temp_directory_path()
+                                                    / "json_to_file_output.stream";
+
+        // Execute json_to_file
+        const std::string command = "\"" + json_to_file_exe.string() + "\" \"" + json_file.string() + "\" \""
+                                    + output_stream.string() + "\"";
+        auto result = execute_command(command);
+
+        CHECK_EQ(result.exit_code, 0);
+        CHECK(std::filesystem::exists(output_stream));
+        CHECK_GT(std::filesystem::file_size(output_stream), 0);
+
+        // Verify the output is a valid stream by deserializing it
+        std::ifstream stream_file(output_stream, std::ios::binary);
+        REQUIRE(stream_file.is_open());
+
+        std::vector<uint8_t> stream_data(
+            (std::istreambuf_iterator<char>(stream_file)),
+            std::istreambuf_iterator<char>()
+        );
+        stream_file.close();
+
+        // Should be able to deserialize without errors
+        auto deserialized_batches = sparrow_ipc::deserialize_stream(std::span(stream_data));
+        CHECK_GT(deserialized_batches.size(), 0);
+
+        // Clean up
+        std::filesystem::remove(output_stream);
+    }
+
+    TEST_CASE("validate - Successful validation of matching files")
+    {
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        // First, create a stream file from the JSON
+        const std::filesystem::path stream_file = std::filesystem::temp_directory_path()
+                                                  / "validate_test.stream";
+        {
+            const std::string command = "\"" + json_to_file_exe.string() + "\" \"" + json_file.string()
+                                        + "\" \"" + stream_file.string() + "\"";
+            auto result = execute_command(command);
+            REQUIRE_EQ(result.exit_code, 0);
+            REQUIRE(std::filesystem::exists(stream_file));
+        }
+
+        // Now validate that the JSON and stream match
+        {
+            const std::string command = "\"" + validate_exe.string() + "\" \"" + json_file.string() + "\" \""
+                                        + stream_file.string() + "\"";
+            auto result = execute_command(command);
+
+            CHECK_EQ(result.exit_code, 0);
+            const bool validation_success = result.stdout_data.find("Validation successful") != std::string::npos
+                                            || result.stdout_data.find("identical data") != std::string::npos;
+            CHECK(validation_success);
+        }
+
+        // Clean up
+        std::filesystem::remove(stream_file);
+    }
+
+    TEST_CASE("validate - Validation with reference stream file")
+    {
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
+        const std::filesystem::path stream_file = tests_resources_files_path / "generated_primitive.stream";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: JSON file not found at " << json_file);
+            return;
+        }
+
+        if (!std::filesystem::exists(stream_file))
+        {
+            MESSAGE("Skipping test: Stream file not found at " << stream_file);
+            return;
+        }
+
+        // Validate that the JSON and reference stream match
+        const std::string command = "\"" + validate_exe.string() + "\" \"" + json_file.string() + "\" \""
+                                    + stream_file.string() + "\"";
+        auto result = execute_command(command);
+
+        CHECK_EQ(result.exit_code, 0);
+        const bool validation_success = result.stdout_data.find("Validation successful") != std::string::npos
+                                        || result.stdout_data.find("identical data") != std::string::npos;
+        CHECK(validation_success);
+    }
+
+    TEST_CASE("json_to_file and validate - Round-trip with validation")
+    {
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_binary.json";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        const std::filesystem::path stream_file = std::filesystem::temp_directory_path()
+                                                  / "roundtrip_validate.stream";
+
+        // Step 1: Convert JSON to stream
+        {
+            const std::string command = "\"" + json_to_file_exe.string() + "\" \"" + json_file.string()
+                                        + "\" \"" + stream_file.string() + "\"";
+            auto result = execute_command(command);
+            REQUIRE_EQ(result.exit_code, 0);
+            REQUIRE(std::filesystem::exists(stream_file));
+        }
+
+        // Step 2: Validate the stream against the JSON
+        {
+            const std::string command = "\"" + validate_exe.string() + "\" \"" + json_file.string() + "\" \""
+                                        + stream_file.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+        }
+
+        // Clean up
+        std::filesystem::remove(stream_file);
+    }
+
+    TEST_CASE("Paths with spaces")
+    {
+        const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
+
+        if (!std::filesystem::exists(json_file))
+        {
+            MESSAGE("Skipping test: test file not found at " << json_file);
+            return;
+        }
+
+        // Create temporary directory with spaces in the name
+        const std::filesystem::path temp_dir = std::filesystem::temp_directory_path() / "test dir with spaces";
+        std::filesystem::create_directories(temp_dir);
+
+        const std::filesystem::path output_stream = temp_dir / "output file.stream";
+        const std::filesystem::path final_stream = temp_dir / "final output.stream";
+
+        // Step 1: JSON -> stream with spaces in output path
+        {
+            const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string()
+                                        + "\" > \"" + output_stream.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+            CHECK(std::filesystem::exists(output_stream));
+        }
+
+        // Step 2: stream -> file with spaces in both paths
+        {
+            const std::string command = "\"" + stream_to_file_exe.string() + "\" \"" + output_stream.string()
+                                        + "\" \"" + final_stream.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+            CHECK(std::filesystem::exists(final_stream));
+        }
+
+        // Verify the final output is valid
+        std::ifstream final_file(final_stream, std::ios::binary);
+        REQUIRE(final_file.is_open());
+        std::vector<uint8_t> final_data(
+            (std::istreambuf_iterator<char>(final_file)),
+            std::istreambuf_iterator<char>()
+        );
+        final_file.close();
+
+        CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(final_data)));
+
+        // Step 3: Test json_to_file with spaces in paths
+        const std::filesystem::path json_to_file_output = temp_dir / "json to file output.stream";
+        {
+            const std::string command = "\"" + json_to_file_exe.string() + "\" \"" + json_file.string()
+                                        + "\" \"" + json_to_file_output.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+            CHECK(std::filesystem::exists(json_to_file_output));
+        }
+
+        // Step 4: Test validate with spaces in paths
+        {
+            const std::string command = "\"" + validate_exe.string() + "\" \"" + json_file.string() + "\" \""
+                                        + json_to_file_output.string() + "\"";
+            auto result = execute_command(command);
+            CHECK_EQ(result.exit_code, 0);
+        }
+
+        // Clean up
+        std::filesystem::remove_all(temp_dir);
+    }
+
+    TEST_CASE("Multiple test files")
+    {
+        const std::vector<std::string> test_files = {
+            "generated_primitive",
+            "generated_binary",
+            "generated_primitive_zerolength",
+            "generated_binary_zerolength"
+        };
+
+        for (const auto& test_file : test_files)
+        {
+            const std::filesystem::path json_file = tests_resources_files_path / (test_file + ".json");
+
+            if (!std::filesystem::exists(json_file))
+            {
+                MESSAGE("Skipping test file: " << json_file);
+                continue;
+            }
+
+            SUBCASE(test_file.c_str())
+            {
+                const std::filesystem::path output_stream = std::filesystem::temp_directory_path()
+                                                            / (test_file + "_output.stream");
+
+                // Convert JSON to stream
+                const std::string command = "\"" + file_to_stream_exe.string() + "\" \"" + json_file.string()
+                                            + "\" > \"" + output_stream.string() + "\"";
+                auto result = execute_command(command);
+
+                CHECK_EQ(result.exit_code, 0);
+                CHECK(std::filesystem::exists(output_stream));
+
+                // Deserialize and verify
+                std::ifstream stream_file(output_stream, std::ios::binary);
+                if (stream_file.is_open())
+                {
+                    std::vector<uint8_t> stream_data(
+                        (std::istreambuf_iterator<char>(stream_file)),
+                        std::istreambuf_iterator<char>()
+                    );
+                    stream_file.close();
+
+                    CHECK_NOTHROW(sparrow_ipc::deserialize_stream(std::span(stream_data)));
+                }
+
+                // Test json_to_file with the same file
+                const std::filesystem::path json_to_file_output = std::filesystem::temp_directory_path()
+                                                                  / (test_file + "_json_to_file.stream");
+                {
+                    const std::string cmd = "\"" + json_to_file_exe.string() + "\" \"" + json_file.string()
+                                            + "\" \"" + json_to_file_output.string() + "\"";
+                    auto res = execute_command(cmd);
+                    CHECK_EQ(res.exit_code, 0);
+                    CHECK(std::filesystem::exists(json_to_file_output));
+                }
+
+                // Test validate with the json_to_file output
+                {
+                    const std::string cmd = "\"" + validate_exe.string() + "\" \"" + json_file.string()
+                                            + "\" \"" + json_to_file_output.string() + "\"";
+                    auto res = execute_command(cmd);
+                    CHECK_EQ(res.exit_code, 0);
+                }
+
+                // Clean up
+                std::filesystem::remove(output_stream);
+                std::filesystem::remove(json_to_file_output);
+            }
+        }
+    }
+}
diff --git a/src/utils.cpp b/src/utils.cpp
index 2fc2490..73db136 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -1,5 +1,7 @@
 #include "sparrow_ipc/utils.hpp"
 
+#include
+
 namespace sparrow_ipc::utils
 {
     std::optional parse_format(std::string_view format_str, std::string_view sep)
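Finally, a note tied to the memory model called out in `.github/copilot-instructions.md`: `deserialize_stream` returns batches whose arrays view the source bytes through `std::span`, so the source buffer must outlive the batches. The tools above keep the buffer and the batches in the same scope; when that is not possible, tying the two lifetimes together in one object is the usual pattern. A minimal sketch — the `owned_batches`/`load` names are invented for illustration:

```cpp
#include <cstdint>
#include <span>
#include <utility>
#include <vector>

#include <sparrow/record_batch.hpp>
#include <sparrow_ipc/deserialize.hpp>

// Keep the raw IPC bytes and the deserialized batches together so the spans
// inside the batches never dangle. Member order matters: the buffer is
// declared first, hence destroyed last.
struct owned_batches
{
    std::vector<uint8_t> buffer;
    std::vector<sparrow::record_batch> batches;
};

inline owned_batches load(std::vector<uint8_t> bytes)
{
    owned_batches result{std::move(bytes), {}};
    // Moving a std::vector does not reallocate its storage, so spans taken
    // here remain valid when `result` itself is moved out of the function.
    result.batches = sparrow_ipc::deserialize_stream(std::span(result.buffer));
    return result;
}
```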