From 3c267c26c4e263fc516edc0f54f0bfaf686db4cd Mon Sep 17 00:00:00 2001 From: "xuan.zhao" Date: Thu, 23 Apr 2026 18:16:33 +0800 Subject: [PATCH] feat(puffin): add puffin file reader and writer - PuffinWriter: builds complete Puffin files, either in memory or streamed to an OutputFile - Add() writes blobs with optional compression - Finish() serializes footer with JSON metadata - Tracks BlobMetadata for all written blobs - PuffinReader: parses Puffin files from an in-memory buffer or an InputFile - ReadFileMetadata() parses footer and validates magic bytes - ReadBlob() reads and decompresses individual blobs - ReadAll() reads all blobs from metadata - Expose Compress/Decompress as public API in puffin_format.h - Register new sources in CMake and Meson build systems - Add comprehensive tests including Java binary compatibility --- src/iceberg/CMakeLists.txt | 4 +- src/iceberg/meson.build | 2 + src/iceberg/puffin/meson.build | 8 +- src/iceberg/puffin/puffin_format.cc | 24 +- src/iceberg/puffin/puffin_format.h | 10 + src/iceberg/puffin/puffin_reader.cc | 204 +++++++++ src/iceberg/puffin/puffin_reader.h | 86 ++++ src/iceberg/puffin/puffin_writer.cc | 172 +++++++ src/iceberg/puffin/puffin_writer.h | 114 +++++ src/iceberg/test/CMakeLists.txt | 3 +- src/iceberg/test/meson.build | 6 +- src/iceberg/test/puffin_reader_writer_test.cc | 418 ++++++++++++++++++ 12 files changed, 1035 insertions(+), 16 deletions(-) create mode 100644 src/iceberg/puffin/puffin_reader.cc create mode 100644 src/iceberg/puffin/puffin_reader.h create mode 100644 src/iceberg/puffin/puffin_writer.cc create mode 100644 src/iceberg/puffin/puffin_writer.h create mode 100644 src/iceberg/test/puffin_reader_writer_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 79298b1a1..7a0bc567b 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -170,7 +170,9 @@ set(ICEBERG_DATA_SOURCES deletes/roaring_position_bitmap.cc puffin/file_metadata.cc puffin/json_serde.cc - puffin/puffin_format.cc) + 
puffin/puffin_format.cc + puffin/puffin_reader.cc + puffin/puffin_writer.cc) set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 0b5f269d5..2a7e73810 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -152,6 +152,8 @@ iceberg_data_sources = files( 'puffin/file_metadata.cc', 'puffin/json_serde.cc', 'puffin/puffin_format.cc', + 'puffin/puffin_reader.cc', + 'puffin/puffin_writer.cc', ) # CRoaring does not export symbols, so on Windows it must diff --git a/src/iceberg/puffin/meson.build b/src/iceberg/puffin/meson.build index 7869d7b2c..7f30468db 100644 --- a/src/iceberg/puffin/meson.build +++ b/src/iceberg/puffin/meson.build @@ -16,6 +16,12 @@ # under the License. install_headers( - ['file_metadata.h', 'puffin_format.h', 'type_fwd.h'], + [ + 'file_metadata.h', + 'puffin_format.h', + 'puffin_reader.h', + 'puffin_writer.h', + 'type_fwd.h', + ], subdir: 'iceberg/puffin', ) diff --git a/src/iceberg/puffin/puffin_format.cc b/src/iceberg/puffin/puffin_format.cc index 88807d0ca..88d378f04 100644 --- a/src/iceberg/puffin/puffin_format.cc +++ b/src/iceberg/puffin/puffin_format.cc @@ -36,6 +36,18 @@ constexpr std::pair GetFlagPosition(PuffinFlag flag) { std::unreachable(); } +} // namespace + +bool IsFlagSet(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + return (flags[byte_num] & (1 << bit_num)) != 0; +} + +void SetFlag(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + flags[byte_num] |= (1 << bit_num); +} + // TODO(zhaoxuan1994): Move compression logic to a unified codec interface. 
Result> Compress(PuffinCompressionCodec codec, std::span input) { @@ -63,16 +75,4 @@ Result> Decompress(PuffinCompressionCodec codec, std::unreachable(); } -} // namespace - -bool IsFlagSet(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - return (flags[byte_num] & (1 << bit_num)) != 0; -} - -void SetFlag(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - flags[byte_num] |= (1 << bit_num); -} - } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_format.h b/src/iceberg/puffin/puffin_format.h index e5ecf9003..b3b5f10de 100644 --- a/src/iceberg/puffin/puffin_format.h +++ b/src/iceberg/puffin/puffin_format.h @@ -23,8 +23,10 @@ /// Puffin file format constants and utilities. #include +#include #include #include +#include #include "iceberg/iceberg_data_export.h" #include "iceberg/puffin/file_metadata.h" @@ -66,4 +68,12 @@ ICEBERG_DATA_EXPORT bool IsFlagSet(std::span flags, PuffinFlag /// \brief Set a flag in the flags bytes. ICEBERG_DATA_EXPORT void SetFlag(std::span flags, PuffinFlag flag); +/// \brief Compress data using the specified codec. +ICEBERG_DATA_EXPORT Result> Compress( + PuffinCompressionCodec codec, std::span input); + +/// \brief Decompress data using the specified codec. +ICEBERG_DATA_EXPORT Result> Decompress( + PuffinCompressionCodec codec, std::span input); + } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.cc b/src/iceberg/puffin/puffin_reader.cc new file mode 100644 index 000000000..24a51e158 --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.cc @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/puffin/puffin_reader.h"
+
+#include <algorithm>
+#include <array>
+#include <cstring>
+#include <string_view>
+#include <utility>
+
+#include "iceberg/file_io.h"
+#include "iceberg/puffin/json_serde_internal.h"
+#include "iceberg/puffin/puffin_format.h"
+#include "iceberg/util/endian.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::puffin {
+
+namespace {
+
+// Validate magic bytes in a buffer at the given offset.
+//
+// `offset` is relative to the start of `data` (not to the file), and the
+// range [offset, offset + kMagicLength) must lie inside `data`.
+Status CheckMagic(std::span<const std::byte> data, int64_t offset) {
+  if (offset < 0 ||
+      offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) {
+    return Invalid("Invalid file: cannot read magic at offset {}", offset);
+  }
+  auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset);
+  if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) {
+    return Invalid(
+        "Invalid file: expected magic at offset {}, got [{:#04x}, {:#04x}, "
+        "{:#04x}, {:#04x}]",
+        offset, begin[0], begin[1], begin[2], begin[3]);
+  }
+  return {};
+}
+
+// Validate that no unknown flag bits are set.
+//
+// Only bit 0 of the first flag byte is accepted; any other bit in any of the
+// four bytes is rejected.
+Status CheckUnknownFlags(std::span<const uint8_t, 4> flags) {
+  constexpr uint8_t kKnownBitsMask = 0x01;
+  if ((flags[0] & ~kKnownBitsMask) != 0 || flags[1] != 0 || flags[2] != 0 ||
+      flags[3] != 0) {
+    return Invalid(
+        "Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]",
+        flags[0], flags[1], flags[2], flags[3]);
+  }
+  return {};
+}
+
+}  // namespace
+
+PuffinReader::PuffinReader(std::span<const std::byte> data)
+    : data_(data), file_size_(static_cast<int64_t>(data.size())) {}
+
+PuffinReader::PuffinReader(std::unique_ptr<InputFile> input_file)
+    : input_file_(std::move(input_file)) {}
+
+PuffinReader::~PuffinReader() = default;
+
+// Read `length` bytes starting at `offset`, from the stream (file mode) or
+// from the in-memory view (memory mode).
+Result<std::vector<std::byte>> PuffinReader::ReadBytes(int64_t offset, int64_t length) {
+  if (IsFileMode()) {
+    // Reject negative values before sizing the buffer: a negative length
+    // would otherwise be converted to a huge unsigned allocation size.
+    if (offset < 0 || length < 0) {
+      return Invalid("Invalid read range: offset {}, length {}", offset, length);
+    }
+    if (!stream_) {
+      ICEBERG_ASSIGN_OR_RAISE(stream_, input_file_->Open());
+    }
+    std::vector<std::byte> buf(static_cast<size_t>(length));
+    ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, buf));
+    return buf;
+  }
+  // Memory mode
+  if (offset < 0 || length < 0 || offset > file_size_ || length > file_size_ - offset) {
+    return Invalid("Read out of bounds: offset {} + length {} exceeds file size {}",
+                   offset, length, file_size_);
+  }
+  return std::vector<std::byte>(data_.data() + offset, data_.data() + offset + length);
+}
+
+// Parse the footer: header magic, trailing footer struct (payload size, flags,
+// end magic), footer start magic, then the JSON payload.
+Result<FileMetadata> PuffinReader::ReadFileMetadata() {
+  // Get file size
+  if (IsFileMode()) {
+    ICEBERG_ASSIGN_OR_RAISE(file_size_, input_file_->Size());
+  }
+
+  if (file_size_ < PuffinFormat::kFooterStructLength) {
+    return Invalid("Invalid file: file length {} is less than minimal footer size {}",
+                   file_size_, PuffinFormat::kFooterStructLength);
+  }
+
+  // Validate header magic
+  ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength));
+  ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes, 0));
+
+  // Read footer struct from end of file
+  auto footer_struct_offset = file_size_ - PuffinFormat::kFooterStructLength;
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto footer_struct,
+      ReadBytes(footer_struct_offset, PuffinFormat::kFooterStructLength));
+
+  // Validate footer end magic
+  ICEBERG_RETURN_UNEXPECTED(
+      CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset));
+
+  // Read payload size
+  auto payload_size = ReadLittleEndian<int32_t>(
+      footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset);
+
+  if (payload_size < 0) {
+    return Invalid("Invalid file: negative payload size {}", payload_size);
+  }
+
+  // Calculate total footer size and validate
+  int64_t footer_size = PuffinFormat::kFooterStartMagicLength +
+                        static_cast<int64_t>(payload_size) +
+                        PuffinFormat::kFooterStructLength;
+  auto footer_offset = file_size_ - footer_size;
+  if (footer_offset < 0) {
+    return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size,
+                   file_size_);
+  }
+
+  // Validate footer start magic
+  ICEBERG_ASSIGN_OR_RAISE(auto footer_start_magic,
+                          ReadBytes(footer_offset, PuffinFormat::kMagicLength));
+  ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_start_magic, 0));
+
+  // Check flags
+  std::array<uint8_t, 4> flags{};
+  std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset,
+              4);
+  ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags));
+
+  PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone;
+  if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) {
+    footer_compression = PuffinFormat::kDefaultFooterCompressionCodec;
+  }
+
+  // Read and decompress footer payload
+  auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength;
+  ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, ReadBytes(payload_offset, payload_size));
+  ICEBERG_ASSIGN_OR_RAISE(auto decompressed,
+                          Decompress(footer_compression, payload_bytes));
+
+  // Parse JSON
+  std::string_view json_str(reinterpret_cast<const char*>(decompressed.data()),
+                            decompressed.size());
+  return FileMetadataFromJsonString(json_str);
+}
+
+Result<std::pair<BlobMetadata, std::vector<std::byte>>> PuffinReader::ReadBlob(
+    const BlobMetadata& blob_metadata) {
+  // In file mode the size is only cached by ReadFileMetadata(). Load it here
+  // when it is still unknown, otherwise the bounds check below would compare
+  // against 0 and reject every non-empty blob.
+  if (IsFileMode() && file_size_ == 0) {
+    ICEBERG_ASSIGN_OR_RAISE(file_size_, input_file_->Size());
+  }
+
+  if (blob_metadata.offset < 0 || blob_metadata.length < 0 ||
+      blob_metadata.offset > file_size_ ||
+      blob_metadata.length > file_size_ - blob_metadata.offset) {
+    return Invalid("Invalid blob: offset {} + length {} exceeds file size {}",
+                   blob_metadata.offset, blob_metadata.length, file_size_);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto raw_data,
+                          ReadBytes(blob_metadata.offset, blob_metadata.length));
+
+  // Determine compression codec
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec));
+  ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data));
+
+  return std::pair{blob_metadata, std::move(decompressed)};
+}
+
+Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>>
+PuffinReader::ReadAll(const std::vector<BlobMetadata>& blobs) {
+  // Sort by offset for sequential I/O access pattern. The sort is stable so
+  // that blobs sharing an offset (e.g. zero-length blobs) keep their input
+  // order and the result is deterministic.
+  std::vector<const BlobMetadata*> sorted;
+  sorted.reserve(blobs.size());
+  for (const auto& blob : blobs) {
+    sorted.push_back(&blob);
+  }
+  std::stable_sort(sorted.begin(), sorted.end(),
+                   [](const auto* a, const auto* b) { return a->offset < b->offset; });
+
+  std::vector<std::pair<BlobMetadata, std::vector<std::byte>>> results;
+  results.reserve(blobs.size());
+  for (const auto* blob : sorted) {
+    ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(*blob));
+    results.push_back(std::move(blob_pair));
+  }
+  return results;
+}
+
+}  // namespace iceberg::puffin
diff --git a/src/iceberg/puffin/puffin_reader.h b/src/iceberg/puffin/puffin_reader.h
new file mode 100644
index 000000000..d95feb798
--- /dev/null
+++ b/src/iceberg/puffin/puffin_reader.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/puffin/puffin_reader.h
+/// Puffin file reader.
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include "iceberg/iceberg_data_export.h"
+#include "iceberg/puffin/file_metadata.h"
+#include "iceberg/result.h"
+
+namespace iceberg {
+class InputFile;
+class SeekableInputStream;
+}  // namespace iceberg
+
+namespace iceberg::puffin {
+
+/// \brief Reader for Puffin files.
+///
+/// Supports two modes:
+/// - Memory mode: parses from an in-memory buffer.
+/// - File mode: reads from an InputFile with seek support (for DV use case).
+class ICEBERG_DATA_EXPORT PuffinReader {
+ public:
+  /// \brief Construct a memory-mode reader from file data.
+  ///
+  /// The reader stores only a view of `data`; the underlying buffer must
+  /// outlive the reader.
+  explicit PuffinReader(std::span<const std::byte> data);
+
+  /// \brief Construct a file-mode reader from an InputFile.
+  ///
+  /// The input stream is opened lazily on the first read.
+  explicit PuffinReader(std::unique_ptr<InputFile> input_file);
+
+  ~PuffinReader();
+
+  /// \brief Read and return the file metadata from the footer.
+  Result<FileMetadata> ReadFileMetadata();
+
+  /// \brief Read a specific blob's data by its metadata.
+  /// \param blob_metadata The metadata describing the blob to read.
+  /// \return A pair of (BlobMetadata, decompressed data), or an error.
+  Result<std::pair<BlobMetadata, std::vector<std::byte>>> ReadBlob(
+      const BlobMetadata& blob_metadata);
+
+  /// \brief Read all blobs described in the file metadata.
+  /// \param blobs The blob metadata entries to read.
+  /// \return A vector of (BlobMetadata, decompressed data) pairs ordered by
+  /// ascending blob offset (ties keep the order of `blobs`), or an error.
+  Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>> ReadAll(
+      const std::vector<BlobMetadata>& blobs);
+
+ private:
+  /// In-memory data for memory mode.
+  std::span<const std::byte> data_;
+  /// Input file for file mode.
+  std::unique_ptr<InputFile> input_file_;
+  /// Opened stream (lazily opened in file mode).
+  std::unique_ptr<SeekableInputStream> stream_;
+  /// Cached file size (0 in file mode until it has been queried).
+  int64_t file_size_ = 0;
+
+  bool IsFileMode() const { return input_file_ != nullptr; }
+  Result<std::vector<std::byte>> ReadBytes(int64_t offset, int64_t length);
+};
+
+}  // namespace iceberg::puffin
diff --git a/src/iceberg/puffin/puffin_writer.cc b/src/iceberg/puffin/puffin_writer.cc
new file mode 100644
index 000000000..86014c003
--- /dev/null
+++ b/src/iceberg/puffin/puffin_writer.cc
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/puffin/puffin_writer.h"
+
+#include <array>
+#include <limits>
+#include <utility>
+
+#include "iceberg/file_io.h"
+#include "iceberg/puffin/json_serde_internal.h"
+#include "iceberg/puffin/puffin_format.h"
+#include "iceberg/util/endian.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::puffin {
+
+PuffinWriter::PuffinWriter(std::unordered_map<std::string, std::string> properties,
+                           PuffinCompressionCodec default_codec)
+    : default_codec_(default_codec), properties_(std::move(properties)) {}
+
+PuffinWriter::PuffinWriter(std::unique_ptr<OutputFile> output_file,
+                           std::unordered_map<std::string, std::string> properties,
+                           PuffinCompressionCodec default_codec)
+    : default_codec_(default_codec),
+      properties_(std::move(properties)),
+      stream_mode_(true) {
+  // The mode is recorded separately from stream_. If the stream cannot be
+  // created (or output_file is null), stream_ stays null and the writer must
+  // NOT silently degrade to memory mode: Add()/Finish() report the failure
+  // through CheckStreamOpen() on the first write attempt.
+  if (output_file) {
+    auto stream_result = output_file->CreateOrOverwrite();
+    if (stream_result.has_value()) {
+      stream_ = std::move(stream_result.value());
+    }
+  }
+}
+
+PuffinWriter::~PuffinWriter() = default;
+
+Status PuffinWriter::CheckStreamOpen() const {
+  if (IsStreamMode() && !stream_) {
+    return Invalid("Failed to open output stream");
+  }
+  return {};
+}
+
+// Append bytes to the output stream (stream mode) or to the internal buffer
+// (memory mode).
+Status PuffinWriter::WriteBytes(std::span<const std::byte> data) {
+  if (IsStreamMode()) {
+    ICEBERG_RETURN_UNEXPECTED(CheckStreamOpen());
+    return stream_->Write(data);
+  }
+  buffer_.insert(buffer_.end(), data.begin(), data.end());
+  return {};
+}
+
+Status PuffinWriter::WriteMagic() {
+  const auto& magic = PuffinFormat::kMagicV1;
+  return WriteBytes(std::span<const std::byte>(
+      reinterpret_cast<const std::byte*>(magic.data()), magic.size()));
+}
+
+// Write the header magic once; later calls are no-ops.
+Status PuffinWriter::WriteHeader() {
+  if (header_written_) return {};
+  ICEBERG_RETURN_UNEXPECTED(WriteMagic());
+  header_written_ = true;
+  return {};
+}
+
+Result<int64_t> PuffinWriter::CurrentPosition() const {
+  if (IsStreamMode()) {
+    ICEBERG_RETURN_UNEXPECTED(CheckStreamOpen());
+    return stream_->Position();
+  }
+  return static_cast<int64_t>(buffer_.size());
+}
+
+Status PuffinWriter::Add(const Blob& blob) {
+  if (finished_) {
+    return Invalid("Writer already finished");
+  }
+  ICEBERG_RETURN_UNEXPECTED(CheckStreamOpen());
+
+  ICEBERG_RETURN_UNEXPECTED(WriteHeader());
+
+  auto codec = blob.requested_compression.value_or(default_codec_);
+  std::span<const std::byte> input_span(
+      reinterpret_cast<const std::byte*>(blob.data.data()), blob.data.size());
+  ICEBERG_ASSIGN_OR_RAISE(auto compressed, Compress(codec, input_span));
+
+  // The recorded offset/length describe the bytes as stored (after
+  // compression), which is what a reader needs to locate the blob.
+  ICEBERG_ASSIGN_OR_RAISE(auto offset, CurrentPosition());
+  ICEBERG_RETURN_UNEXPECTED(WriteBytes(std::span<const std::byte>(compressed)));
+  auto length = static_cast<int64_t>(compressed.size());
+
+  auto codec_name = CodecName(codec);
+  written_blobs_metadata_.push_back(BlobMetadata{
+      .type = blob.type,
+      .input_fields = blob.input_fields,
+      .snapshot_id = blob.snapshot_id,
+      .sequence_number = blob.sequence_number,
+      .offset = offset,
+      .length = length,
+      .compression_codec = std::string(codec_name),
+      .properties = blob.properties,
+  });
+  return {};
+}
+
+Result<std::vector<std::byte>> PuffinWriter::Finish() {
+  if (finished_) {
+    return Invalid("Writer already finished");
+  }
+  ICEBERG_RETURN_UNEXPECTED(CheckStreamOpen());
+
+  ICEBERG_RETURN_UNEXPECTED(WriteHeader());
+
+  FileMetadata file_metadata{
+      .blobs = written_blobs_metadata_,
+      .properties = properties_,
+  };
+
+  auto footer_json = ToJsonString(file_metadata);
+  const auto footer_payload = std::span<const std::byte>(
+      reinterpret_cast<const std::byte*>(footer_json.data()), footer_json.size());
+
+  // The footer struct stores the payload size as a signed 32-bit integer.
+  // Reject larger payloads before any footer byte is written instead of
+  // silently truncating the size.
+  if (footer_payload.size() >
+      static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
+    return Invalid("Footer payload size {} exceeds the maximum of {} bytes",
+                   footer_payload.size(), std::numeric_limits<int32_t>::max());
+  }
+
+  // Footer start magic
+  ICEBERG_ASSIGN_OR_RAISE(auto footer_start, CurrentPosition());
+  ICEBERG_RETURN_UNEXPECTED(WriteMagic());
+
+  // Footer payload
+  ICEBERG_RETURN_UNEXPECTED(WriteBytes(footer_payload));
+
+  // Footer struct: payload_size (4) + flags (4) + magic (4)
+  auto payload_size = static_cast<int32_t>(footer_payload.size());
+  std::array<std::byte, 4> size_buf{};
+  WriteLittleEndian(payload_size, size_buf.data());
+  ICEBERG_RETURN_UNEXPECTED(WriteBytes(size_buf));
+
+  // Flags (no compression for now)
+  std::array<std::byte, 4> flags{};
+  ICEBERG_RETURN_UNEXPECTED(WriteBytes(flags));
+
+  // Footer end magic
+  ICEBERG_RETURN_UNEXPECTED(WriteMagic());
+
+  ICEBERG_ASSIGN_OR_RAISE(auto end_pos, CurrentPosition());
+  footer_size_ = end_pos - footer_start;
+  file_size_ = end_pos;
+  finished_ = true;
+
+  if (IsStreamMode()) {
+    ICEBERG_RETURN_UNEXPECTED(stream_->Flush());
+    ICEBERG_RETURN_UNEXPECTED(stream_->Close());
+    return std::vector<std::byte>{};
+  }
+  return std::move(buffer_);
+}
+
+const std::vector<BlobMetadata>& PuffinWriter::written_blobs_metadata() const {
+  return written_blobs_metadata_;
+}
+
+std::optional<int64_t> PuffinWriter::footer_size() const { return footer_size_; }
+
+std::optional<int64_t> PuffinWriter::file_size() const { return file_size_; }
+
+}  // namespace iceberg::puffin
diff --git a/src/iceberg/puffin/puffin_writer.h b/src/iceberg/puffin/puffin_writer.h
new file mode 100644
index 000000000..7642606a2
--- /dev/null
+++ b/src/iceberg/puffin/puffin_writer.h
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/puffin/puffin_writer.h
+/// Puffin file writer.
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <span>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/iceberg_data_export.h"
+#include "iceberg/puffin/file_metadata.h"
+#include "iceberg/result.h"
+
+namespace iceberg {
+class OutputFile;
+class PositionOutputStream;
+}  // namespace iceberg
+
+namespace iceberg::puffin {
+
+/// \brief Writer for Puffin files.
+///
+/// Supports two modes:
+/// - Stream mode: writes directly to an OutputFile/PositionOutputStream.
+/// - Memory mode: builds the file in an internal buffer and returns it via Finish().
+class ICEBERG_DATA_EXPORT PuffinWriter {
+ public:
+  /// \brief Construct a memory-mode writer.
+  /// \param properties File-level properties to include in the footer.
+  /// \param default_codec Default compression codec for blobs.
+  explicit PuffinWriter(
+      std::unordered_map<std::string, std::string> properties = {},
+      PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone);
+
+  /// \brief Construct a stream-mode writer from an OutputFile.
+  ///
+  /// The output stream is created immediately. If that fails (or `output_file`
+  /// is null) the writer stays in stream mode and every later Add()/Finish()
+  /// returns an error; it never falls back to the in-memory buffer.
+  /// \param output_file The output file to write to.
+  /// \param properties File-level properties to include in the footer.
+  /// \param default_codec Default compression codec for blobs.
+  PuffinWriter(std::unique_ptr<OutputFile> output_file,
+               std::unordered_map<std::string, std::string> properties = {},
+               PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone);
+
+  ~PuffinWriter();
+
+  /// \brief Add a blob to be written.
+  Status Add(const Blob& blob);
+
+  /// \brief Finalize the file.
+  ///
+  /// In memory mode, returns the complete serialized file bytes.
+  /// In stream mode, flushes and closes the stream, returns empty vector.
+  Result<std::vector<std::byte>> Finish();
+
+  /// \brief Get metadata for all blobs written so far.
+  const std::vector<BlobMetadata>& written_blobs_metadata() const;
+
+  /// \brief Get the footer size after Finish() has been called.
+  std::optional<int64_t> footer_size() const;
+
+  /// \brief Get the total file size after Finish() has been called.
+  std::optional<int64_t> file_size() const;
+
+ private:
+  /// Default compression codec for blobs without explicit compression.
+  PuffinCompressionCodec default_codec_;
+  /// File-level properties to include in the footer.
+  std::unordered_map<std::string, std::string> properties_;
+  /// Buffer for memory mode.
+  std::vector<std::byte> buffer_;
+  /// Output stream for stream mode (null if it could not be opened).
+  std::unique_ptr<PositionOutputStream> stream_;
+  /// Whether the writer was constructed in stream mode.
+  bool stream_mode_ = false;
+  /// Metadata for all blobs written so far.
+  std::vector<BlobMetadata> written_blobs_metadata_;
+  /// Whether the header magic has been written.
+  bool header_written_ = false;
+  /// Whether Finish() has been called.
+  bool finished_ = false;
+  /// Footer size, set after Finish().
+  std::optional<int64_t> footer_size_;
+  /// Total file size, set after Finish().
+  std::optional<int64_t> file_size_;
+
+  bool IsStreamMode() const { return stream_mode_; }
+  /// Returns an error if stream mode was requested but no stream is open.
+  Status CheckStreamOpen() const;
+  Status WriteBytes(std::span<const std::byte> data);
+  Status WriteHeader();
+  Status WriteMagic();
+  Result<int64_t> CurrentPosition() const;
+};
+
+}  // namespace iceberg::puffin
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index afafc4c14..f58759160 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -142,7 +142,8 @@ add_iceberg_test(puffin_test
                  USE_DATA
                  SOURCES
                  puffin_format_test.cc
-                 puffin_json_test.cc)
+                 puffin_json_test.cc
+                 puffin_reader_writer_test.cc)
 
 if(ICEBERG_BUILD_BUNDLE)
   add_iceberg_test(avro_test
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index e168d08bf..5077472d2 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -105,7 +105,11 @@ iceberg_tests = {
         'use_data': true,
     },
     'puffin_test': {
-        'sources': files('puffin_format_test.cc', 'puffin_json_test.cc'),
+        'sources': files(
+            'puffin_format_test.cc',
+            'puffin_json_test.cc',
+            'puffin_reader_writer_test.cc',
+        ),
         'use_data': true,
     },
 }
diff --git a/src/iceberg/test/puffin_reader_writer_test.cc b/src/iceberg/test/puffin_reader_writer_test.cc
new file mode 100644
index 000000000..df2964474
--- /dev/null
+++ b/src/iceberg/test/puffin_reader_writer_test.cc
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include + +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::puffin { + +namespace { + +std::vector ToBytes(std::initializer_list values) { + std::vector result; + result.reserve(values.size()); + for (auto v : values) { + result.push_back(static_cast(v)); + } + return result; +} + +std::vector ToBytes(std::string_view str) { + return {reinterpret_cast(str.data()), + reinterpret_cast(str.data() + str.size())}; +} + +} // namespace + +// ============================================================================ +// PuffinWriter Tests +// ============================================================================ + +TEST(PuffinWriterTest, WriteEmptyFile) { + PuffinWriter writer; + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + auto& data = result.value(); + + // Header magic (4) + footer start magic (4) + JSON payload + footer struct (12) + EXPECT_GE(data.size(), 20u); + // Header magic + EXPECT_EQ(data[0], std::byte{0x50}); + EXPECT_EQ(data[1], std::byte{0x46}); + EXPECT_EQ(data[2], std::byte{0x41}); + EXPECT_EQ(data[3], std::byte{0x31}); + // Footer end magic + auto sz = data.size(); + EXPECT_EQ(data[sz - 4], std::byte{0x50}); + EXPECT_EQ(data[sz - 3], std::byte{0x46}); + 
EXPECT_EQ(data[sz - 2], std::byte{0x41}); + EXPECT_EQ(data[sz - 1], std::byte{0x31}); + + EXPECT_TRUE(writer.written_blobs_metadata().empty()); + ASSERT_TRUE(writer.footer_size().has_value()); +} + +TEST(PuffinWriterTest, WriterRejectsAfterFinish) { + PuffinWriter writer; + ASSERT_THAT(writer.Finish(), IsOk()); + + // Double finish + EXPECT_THAT(writer.Finish(), IsError(ErrorKind::kInvalid)); + + // Add after finish + Blob blob{.type = "a", .snapshot_id = 1, .sequence_number = 0}; + EXPECT_THAT(writer.Add(blob), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinWriterTest, WriteEmptyBlobData) { + PuffinWriter writer; + Blob blob{ + .type = "empty-blob", + .input_fields = {1}, + .snapshot_id = 1, + .sequence_number = 0, + .data = {}, + }; + ASSERT_THAT(writer.Add(blob), IsOk()); + ASSERT_EQ(writer.written_blobs_metadata().size(), 1); + EXPECT_EQ(writer.written_blobs_metadata()[0].offset, 4); + EXPECT_EQ(writer.written_blobs_metadata()[0].length, 0); + + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + + PuffinReader reader(result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 1); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob_result, IsOk()); + EXPECT_TRUE(blob_result.value().second.empty()); +} + +TEST(PuffinWriterTest, WriteLargeBlob) { + PuffinWriter writer; + std::vector large_data(4096); + for (size_t i = 0; i < large_data.size(); ++i) { + large_data[i] = static_cast(i & 0xFF); + } + ASSERT_THAT(writer.Add(Blob{.type = "large-blob", + .input_fields = {1, 2, 3}, + .snapshot_id = 999, + .sequence_number = 42, + .data = large_data}), + IsOk()); + EXPECT_EQ(writer.written_blobs_metadata()[0].length, 4096); + + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + + PuffinReader reader(result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + 
ASSERT_THAT(blob_result, IsOk()); + auto& read_data = blob_result.value().second; + ASSERT_EQ(read_data.size(), 4096); + for (size_t i = 0; i < read_data.size(); ++i) { + EXPECT_EQ(read_data[i], static_cast(i & 0xFF)) + << "mismatch at index " << i; + } +} + +// ============================================================================ +// Round-Trip Tests +// ============================================================================ + +TEST(PuffinRoundTripTest, SingleBlob) { + PuffinWriter writer({{"created-by", "test"}}); + EXPECT_FALSE(writer.footer_size().has_value()); + + std::vector blob_data = {0x01, 0x02, 0x03, 0x04, 0x05}; + ASSERT_THAT(writer.Add(Blob{.type = "test-blob", + .input_fields = {1, 2}, + .snapshot_id = 42, + .sequence_number = 7, + .data = blob_data}), + IsOk()); + EXPECT_EQ(writer.written_blobs_metadata().size(), 1); + EXPECT_EQ(writer.written_blobs_metadata()[0].type, "test-blob"); + EXPECT_EQ(writer.written_blobs_metadata()[0].offset, 4); + EXPECT_EQ(writer.written_blobs_metadata()[0].length, 5); + + auto file_result = writer.Finish(); + ASSERT_THAT(file_result, IsOk()); + ASSERT_TRUE(writer.footer_size().has_value()); + EXPECT_GT(writer.footer_size().value(), 0); + + PuffinReader reader(file_result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 1); + EXPECT_EQ(fm.value().blobs[0].type, "test-blob"); + EXPECT_EQ(fm.value().properties.at("created-by"), "test"); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob_result, IsOk()); + EXPECT_EQ(blob_result.value().second, ToBytes({0x01, 0x02, 0x03, 0x04, 0x05})); +} + +TEST(PuffinRoundTripTest, MultipleBlobs) { + PuffinWriter writer; + EXPECT_TRUE(writer.written_blobs_metadata().empty()); + + // Add first blob (no properties) + ASSERT_THAT(writer.Add(Blob{.type = "first", + .input_fields = {1}, + .snapshot_id = 1, + .sequence_number = 0, + .data = {'a', 'b', 'c'}}), + IsOk()); + 
EXPECT_EQ(writer.written_blobs_metadata().size(), 1); + + // Add second blob (with properties) + ASSERT_THAT(writer.Add(Blob{.type = "second", + .input_fields = {2}, + .snapshot_id = 2, + .sequence_number = 1, + .data = {'d', 'e', 'f', 'g'}, + .properties = {{"key", "val"}}}), + IsOk()); + // Second blob starts after header (4) + first blob (3) + EXPECT_EQ(writer.written_blobs_metadata()[1].offset, 7); + EXPECT_EQ(writer.written_blobs_metadata()[1].length, 4); + EXPECT_EQ(writer.written_blobs_metadata().size(), 2); + + EXPECT_FALSE(writer.footer_size().has_value()); + auto file_result = writer.Finish(); + ASSERT_THAT(file_result, IsOk()); + ASSERT_TRUE(writer.footer_size().has_value()); + + // Read back + PuffinReader reader(file_result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 2); + EXPECT_TRUE(fm.value().blobs[0].properties.empty()); + EXPECT_EQ(fm.value().blobs[1].properties.at("key"), "val"); + + auto all = reader.ReadAll(fm.value().blobs); + ASSERT_THAT(all, IsOk()); + ASSERT_EQ(all.value().size(), 2); + EXPECT_EQ(all.value()[0].second, ToBytes("abc")); + EXPECT_EQ(all.value()[1].second, ToBytes("defg")); +} + +TEST(PuffinRoundTripTest, WithProperties) { + PuffinWriter writer({{"created-by", "iceberg-cpp-test"}}); + std::string text = "hello puffin"; + std::vector blob_data(text.begin(), text.end()); + ASSERT_THAT(writer.Add(Blob{.type = "text-blob", + .input_fields = {1}, + .snapshot_id = 100, + .sequence_number = 5, + .data = blob_data, + .properties = {{"encoding", "utf-8"}}}), + IsOk()); + auto file_result = writer.Finish(); + ASSERT_THAT(file_result, IsOk()); + + PuffinReader reader(file_result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + EXPECT_EQ(fm.value().properties.at("created-by"), "iceberg-cpp-test"); + ASSERT_EQ(fm.value().blobs.size(), 1); + EXPECT_EQ(fm.value().blobs[0].type, "text-blob"); + 
EXPECT_EQ(fm.value().blobs[0].properties.at("encoding"), "utf-8");
+
+  auto blob_result = reader.ReadBlob(fm.value().blobs[0]);
+  ASSERT_THAT(blob_result, IsOk());
+  EXPECT_EQ(blob_result.value().second, ToBytes("hello puffin"));
+}
+
+// ============================================================================
+// PuffinReader Error Tests
+// ============================================================================
+
+// A file finished without adding any blob must parse to metadata that has no
+// blobs and no properties.
+TEST(PuffinReaderTest, ReadEmptyFile) {
+  PuffinWriter empty_writer;
+  auto file_bytes = empty_writer.Finish();
+  ASSERT_THAT(file_bytes, IsOk());
+
+  PuffinReader reader(file_bytes.value());
+  auto metadata = reader.ReadFileMetadata();
+  ASSERT_THAT(metadata, IsOk());
+
+  const auto& parsed = metadata.value();
+  EXPECT_TRUE(parsed.blobs.empty());
+  EXPECT_TRUE(parsed.properties.empty());
+}
+
+// Sixteen zero bytes contain no "PFA1" magic, so reading the footer must fail.
+TEST(PuffinReaderTest, InvalidMagic) {
+  auto zeros = ToBytes({
+      0x00, 0x00, 0x00, 0x00,
+      0x00, 0x00, 0x00, 0x00,
+      0x00, 0x00, 0x00, 0x00,
+      0x00, 0x00, 0x00, 0x00,
+  });
+
+  PuffinReader reader(zeros);
+  auto outcome = reader.ReadFileMetadata();
+  EXPECT_THAT(outcome, IsError(ErrorKind::kInvalid));
+}
+
+// Only the first two magic bytes are present; the reader must report an
+// invalid file.
+TEST(PuffinReaderTest, TruncatedFile) {
+  auto two_bytes = ToBytes({0x50, 0x46});
+
+  PuffinReader reader(two_bytes);
+  auto outcome = reader.ReadFileMetadata();
+  EXPECT_THAT(outcome, IsError(ErrorKind::kInvalid));
+}
+
+// Blob metadata pointing at offset 9999 of a blob-less file must be rejected
+// by ReadBlob.
+TEST(PuffinReaderTest, InvalidBlobOffset) {
+  PuffinWriter empty_writer;
+  auto file_bytes = empty_writer.Finish();
+  ASSERT_THAT(file_bytes, IsOk());
+
+  BlobMetadata out_of_range{
+      .type = "bad",
+      .snapshot_id = 1,
+      .sequence_number = 0,
+      .offset = 9999,
+      .length = 100,
+  };
+
+  PuffinReader reader(file_bytes.value());
+  auto outcome = reader.ReadBlob(out_of_range);
+  EXPECT_THAT(outcome, IsError(ErrorKind::kInvalid));
+}
+
+// Same bytes as the empty-file fixture, except that bit 1 of the first flag
+// byte is set; the test expects the reader to refuse the file.
+TEST(PuffinReaderTest, UnknownFlagsRejected) {
+  auto flagged_file = ToBytes({
+      0x50, 0x46, 0x41, 0x31,              // header magic
+      0x50, 0x46, 0x41, 0x31,              // footer start magic
+      0x7b, 0x22, 0x62, 0x6c, 0x6f, 0x62,
+      0x73, 0x22, 0x3a, 0x5b, 0x5d, 0x7d,  // {"blobs":[]}
+      0x0c, 0x00, 0x00, 0x00,              // payload size = 12
+      0x02, 0x00, 0x00, 0x00,              // flags = bit 1 set (unknown)
+      0x50, 0x46, 0x41, 0x31,              // footer end magic
+  });
+
+  PuffinReader reader(flagged_file);
+  auto outcome = reader.ReadFileMetadata();
+  EXPECT_THAT(outcome, IsError(ErrorKind::kInvalid));
+}
+
+// ============================================================================
+// Java Binary Compatibility Tests
+// ============================================================================
+
+// Hand-written bytes of a Puffin file without blobs: two magics, the JSON
+// payload {"blobs":[]}, its size, zero flags, and the closing magic.
+TEST(PuffinReaderTest, JavaEmptyPuffinCompatibility) {
+  auto empty_file = ToBytes({
+      0x50, 0x46, 0x41, 0x31,              // header magic
+      0x50, 0x46, 0x41, 0x31,              // footer start magic
+      0x7b, 0x22, 0x62, 0x6c, 0x6f, 0x62,
+      0x73, 0x22, 0x3a, 0x5b, 0x5d, 0x7d,  // {"blobs":[]}
+      0x0c, 0x00, 0x00, 0x00,              // payload size = 12
+      0x00, 0x00, 0x00, 0x00,              // flags = 0
+      0x50, 0x46, 0x41, 0x31,              // footer end magic
+  });
+
+  PuffinReader reader(empty_file);
+  auto metadata = reader.ReadFileMetadata();
+  ASSERT_THAT(metadata, IsOk());
+
+  const auto& parsed = metadata.value();
+  EXPECT_TRUE(parsed.blobs.empty());
+  EXPECT_TRUE(parsed.properties.empty());
+}
+
+// Verify binary compatibility with Java's sample-metric-data-uncompressed.bin.
+// Fixture layout: header magic, blob "some-blob" (9 bytes at offset 4), blob
+// "some-other-blob" (83 bytes at offset 13), then the footer (start magic,
+// 243-byte JSON payload, little-endian payload size, flags, end magic).
+// Every field the JSON footer declares for both blobs is asserted, so a parser
+// that drops or mis-maps a key fails here.
+TEST(PuffinReaderTest, JavaSampleMetricDataCompatibility) {
+  // clang-format off
+  auto java_sample = ToBytes({
+      // header magic "PFA1"
+      0x50, 0x46, 0x41, 0x31,
+      // blob 0: "abcdefghi"
+      0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69,
+      // blob 1: text with an embedded NUL (index 10) and the UTF-8 encoding of
+      // U+1F92F (indices 24..27)
+      0x73, 0x6f, 0x6d, 0x65, 0x20, 0x62, 0x6c, 0x6f, 0x62, 0x20, 0x00, 0x20,
+      0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x20, 0x64, 0x61, 0x74, 0x61, 0x20,
+      0xf0, 0x9f, 0xa4, 0xaf, 0x20, 0x74, 0x68, 0x61, 0x74, 0x20, 0x69, 0x73,
+      0x20, 0x6e, 0x6f, 0x74, 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x76, 0x65,
+      0x72, 0x79, 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x76, 0x65, 0x72, 0x79,
+      0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x6c,
+      0x6f, 0x6e, 0x67, 0x2c, 0x20, 0x69, 0x73, 0x20, 0x69, 0x74, 0x3f,
+      // footer start magic
+      0x50, 0x46, 0x41, 0x31,
+      // footer payload: JSON file metadata (243 bytes)
+      0x7b, 0x22, 0x62, 0x6c, 0x6f, 0x62, 0x73, 0x22, 0x3a, 0x5b, 0x7b, 0x22,
+      0x74, 0x79, 0x70, 0x65, 0x22, 0x3a, 0x22, 0x73, 0x6f, 0x6d, 0x65, 0x2d,
+      0x62, 0x6c, 0x6f, 0x62, 0x22, 0x2c, 0x22, 0x66, 0x69, 0x65, 0x6c, 0x64,
+      0x73, 0x22, 0x3a, 0x5b, 0x31, 0x5d, 0x2c, 0x22, 0x73, 0x6e, 0x61, 0x70,
+      0x73, 0x68, 0x6f, 0x74, 0x2d, 0x69, 0x64, 0x22, 0x3a, 0x32, 0x2c, 0x22,
+      0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x2d, 0x6e, 0x75, 0x6d,
+      0x62, 0x65, 0x72, 0x22, 0x3a, 0x31, 0x2c, 0x22, 0x6f, 0x66, 0x66, 0x73,
+      0x65, 0x74, 0x22, 0x3a, 0x34, 0x2c, 0x22, 0x6c, 0x65, 0x6e, 0x67, 0x74,
+      0x68, 0x22, 0x3a, 0x39, 0x7d, 0x2c, 0x7b, 0x22, 0x74, 0x79, 0x70, 0x65,
+      0x22, 0x3a, 0x22, 0x73, 0x6f, 0x6d, 0x65, 0x2d, 0x6f, 0x74, 0x68, 0x65,
+      0x72, 0x2d, 0x62, 0x6c, 0x6f, 0x62, 0x22, 0x2c, 0x22, 0x66, 0x69, 0x65,
+      0x6c, 0x64, 0x73, 0x22, 0x3a, 0x5b, 0x32, 0x5d, 0x2c, 0x22, 0x73, 0x6e,
+      0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x2d, 0x69, 0x64, 0x22, 0x3a, 0x32,
+      0x2c, 0x22, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x2d, 0x6e,
+      0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x3a, 0x31, 0x2c, 0x22, 0x6f, 0x66,
+      0x66, 0x73, 0x65, 0x74, 0x22, 0x3a, 0x31, 0x33, 0x2c, 0x22, 0x6c, 0x65,
+      0x6e, 0x67, 0x74, 0x68, 0x22, 0x3a, 0x38, 0x33, 0x7d, 0x5d, 0x2c, 0x22,
+      0x70, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x22, 0x3a,
+      0x7b, 0x22, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x2d, 0x62, 0x79,
+      0x22, 0x3a, 0x22, 0x54, 0x65, 0x73, 0x74, 0x20, 0x31, 0x32, 0x33, 0x34,
+      0x22, 0x7d, 0x7d,
+      // payload size = 0xf3 = 243
+      0xf3, 0x00, 0x00, 0x00,
+      // flags = 0
+      0x00, 0x00, 0x00, 0x00,
+      // footer end magic
+      0x50, 0x46, 0x41, 0x31,
+  });
+  // clang-format on
+
+  PuffinReader reader(java_sample);
+  auto fm = reader.ReadFileMetadata();
+  ASSERT_THAT(fm, IsOk());
+  ASSERT_EQ(fm.value().blobs.size(), 2);
+  EXPECT_EQ(fm.value().properties.at("created-by"), "Test 1234");
+
+  // First blob: every field present in the footer JSON.
+  EXPECT_EQ(fm.value().blobs[0].type, "some-blob");
+  EXPECT_EQ(fm.value().blobs[0].input_fields, std::vector{1});
+  EXPECT_EQ(fm.value().blobs[0].snapshot_id, 2);
+  EXPECT_EQ(fm.value().blobs[0].sequence_number, 1);
+  EXPECT_EQ(fm.value().blobs[0].offset, 4);
+  EXPECT_EQ(fm.value().blobs[0].length, 9);
+
+  // Second blob: same set of fields, so a per-entry parsing slip is caught.
+  EXPECT_EQ(fm.value().blobs[1].type, "some-other-blob");
+  EXPECT_EQ(fm.value().blobs[1].input_fields, std::vector{2});
+  EXPECT_EQ(fm.value().blobs[1].snapshot_id, 2);
+  EXPECT_EQ(fm.value().blobs[1].sequence_number, 1);
+  EXPECT_EQ(fm.value().blobs[1].offset, 13);
+  EXPECT_EQ(fm.value().blobs[1].length, 83);
+
+  auto blob1 = reader.ReadBlob(fm.value().blobs[0]);
+  ASSERT_THAT(blob1, IsOk());
+  EXPECT_EQ(blob1.value().second, ToBytes("abcdefghi"));
+
+  auto blob2 = reader.ReadBlob(fm.value().blobs[1]);
+  ASSERT_THAT(blob2, IsOk());
+  // ASSERT (not EXPECT) so the indexed reads below never run out of bounds.
+  ASSERT_EQ(blob2.value().second.size(), 83);
+  // First and last byte pin both ends of the [offset, offset + length) slice.
+  EXPECT_EQ(blob2.value().second[0], std::byte{0x73});
+  EXPECT_EQ(blob2.value().second[82], std::byte{0x3f});
+  // Embedded NUL and the 4-byte UTF-8 sequence must survive untouched.
+  EXPECT_EQ(blob2.value().second[10], std::byte{0x00});
+  EXPECT_EQ(blob2.value().second[24], std::byte{0xf0});
+  EXPECT_EQ(blob2.value().second[25], std::byte{0x9f});
+  EXPECT_EQ(blob2.value().second[26], std::byte{0xa4});
+  EXPECT_EQ(blob2.value().second[27], std::byte{0xaf});
+
+  auto all = reader.ReadAll(fm.value().blobs);
+  ASSERT_THAT(all, IsOk());
+  ASSERT_EQ(all.value().size(), 2);
+}
+
+} // namespace iceberg::puffin