Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ set(ICEBERG_DATA_SOURCES
deletes/roaring_position_bitmap.cc
puffin/file_metadata.cc
puffin/json_serde.cc
puffin/puffin_format.cc)
puffin/puffin_format.cc
puffin/puffin_reader.cc
puffin/puffin_writer.cc)

set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS)
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ iceberg_data_sources = files(
'puffin/file_metadata.cc',
'puffin/json_serde.cc',
'puffin/puffin_format.cc',
'puffin/puffin_reader.cc',
'puffin/puffin_writer.cc',
)

# CRoaring does not export symbols, so on Windows it must
Expand Down
8 changes: 7 additions & 1 deletion src/iceberg/puffin/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
# under the License.

install_headers(
['file_metadata.h', 'puffin_format.h', 'type_fwd.h'],
[
'file_metadata.h',
'puffin_format.h',
'puffin_reader.h',
'puffin_writer.h',
'type_fwd.h',
],
subdir: 'iceberg/puffin',
)
24 changes: 12 additions & 12 deletions src/iceberg/puffin/puffin_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
std::unreachable();
}

} // namespace
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason to move both Result<std::vector<std::byte>> Compress and Result<std::vector<std::byte>> Decompress out of the namespace?


bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
return (flags[byte_num] & (1 << bit_num)) != 0;
}

void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
flags[byte_num] |= (1 << bit_num);
}

// TODO(zhaoxuan1994): Move compression logic to a unified codec interface.
Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
std::span<const std::byte> input) {
Expand Down Expand Up @@ -63,16 +75,4 @@ Result<std::vector<std::byte>> Decompress(PuffinCompressionCodec codec,
std::unreachable();
}

} // namespace

bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
return (flags[byte_num] & (1 << bit_num)) != 0;
}

void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
flags[byte_num] |= (1 << bit_num);
}

} // namespace iceberg::puffin
10 changes: 10 additions & 0 deletions src/iceberg/puffin/puffin_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
/// Puffin file format constants and utilities.

#include <array>
#include <cstddef>
#include <cstdint>
#include <span>
#include <vector>

#include "iceberg/iceberg_data_export.h"
#include "iceberg/puffin/file_metadata.h"
Expand Down Expand Up @@ -66,4 +68,12 @@ ICEBERG_DATA_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag
/// \brief Set a flag in the flags bytes.
ICEBERG_DATA_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);

/// \brief Compress data using the specified codec.
ICEBERG_DATA_EXPORT Result<std::vector<std::byte>> Compress(
PuffinCompressionCodec codec, std::span<const std::byte> input);

/// \brief Decompress data using the specified codec.
ICEBERG_DATA_EXPORT Result<std::vector<std::byte>> Decompress(
PuffinCompressionCodec codec, std::span<const std::byte> input);

} // namespace iceberg::puffin
204 changes: 204 additions & 0 deletions src/iceberg/puffin/puffin_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/puffin/puffin_reader.h"

#include <algorithm>
#include <array>
#include <cstdint>
#include <cstring>
#include <string_view>

#include "iceberg/file_io.h"
#include "iceberg/puffin/json_serde_internal.h"
#include "iceberg/puffin/puffin_format.h"
#include "iceberg/util/endian.h"
#include "iceberg/util/macros.h"

namespace iceberg::puffin {

namespace {

// Validate magic bytes in a buffer at the given offset.
Status CheckMagic(std::span<const std::byte> data, int64_t offset) {
if (offset < 0 ||
offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) {
return Invalid("Invalid file: cannot read magic at offset {}", offset);
}
auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset);
if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) {
return Invalid(
"Invalid file: expected magic at offset {}, got [{:#04x}, {:#04x}, "
"{:#04x}, {:#04x}]",
offset, begin[0], begin[1], begin[2], begin[3]);
}
return {};
}

// Validate that no unknown flag bits are set.
Status CheckUnknownFlags(std::span<const uint8_t, 4> flags) {
constexpr uint8_t kKnownBitsMask = 0x01;
if ((flags[0] & ~kKnownBitsMask) != 0 || flags[1] != 0 || flags[2] != 0 ||
flags[3] != 0) {
return Invalid(
"Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]",
flags[0], flags[1], flags[2], flags[3]);
}
return {};
}

} // namespace

PuffinReader::PuffinReader(std::span<const std::byte> data)
: data_(data), file_size_(static_cast<int64_t>(data.size())) {}

PuffinReader::PuffinReader(std::unique_ptr<InputFile> input_file)
: input_file_(std::move(input_file)) {}

PuffinReader::~PuffinReader() = default;

Result<std::vector<std::byte>> PuffinReader::ReadBytes(int64_t offset, int64_t length) {
if (IsFileMode()) {
if (!stream_) {
ICEBERG_ASSIGN_OR_RAISE(stream_, input_file_->Open());
}
std::vector<std::byte> buf(length);
ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, buf));
return buf;
}
// Memory mode
if (offset < 0 || length < 0 || offset > file_size_ || length > file_size_ - offset) {
return Invalid("Read out of bounds: offset {} + length {} exceeds file size {}",
offset, length, file_size_);
}
return std::vector<std::byte>(data_.data() + offset, data_.data() + offset + length);
}

Result<FileMetadata> PuffinReader::ReadFileMetadata() {
// Get file size
if (IsFileMode()) {
ICEBERG_ASSIGN_OR_RAISE(file_size_, input_file_->Size());
}

if (file_size_ < PuffinFormat::kFooterStructLength) {
return Invalid("Invalid file: file length {} is less than minimal footer size {}",
file_size_, PuffinFormat::kFooterStructLength);
}

// Validate header magic
ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength));
ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes, 0));

// Read footer struct from end of file
auto footer_struct_offset = file_size_ - PuffinFormat::kFooterStructLength;
ICEBERG_ASSIGN_OR_RAISE(
auto footer_struct,
ReadBytes(footer_struct_offset, PuffinFormat::kFooterStructLength));

// Validate footer end magic
ICEBERG_RETURN_UNEXPECTED(
CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset));

// Read payload size
auto payload_size = ReadLittleEndian<int32_t>(
footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset);

if (payload_size < 0) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should payload_size == 0 also be considered as an error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payload_size == 0 is valid — it represents an empty footer payload (e.g. {"blobs":[]}). Java's PuffinReader also does not reject zero payload size. Only negative values indicate corruption.

return Invalid("Invalid file: negative payload size {}", payload_size);
}

// Calculate total footer size and validate
int64_t footer_size = PuffinFormat::kFooterStartMagicLength +
static_cast<int64_t>(payload_size) +
PuffinFormat::kFooterStructLength;
auto footer_offset = file_size_ - footer_size;
if (footer_offset < 0) {
return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size,
file_size_);
}

// Validate footer start magic
ICEBERG_ASSIGN_OR_RAISE(auto footer_start_magic,
ReadBytes(footer_offset, PuffinFormat::kMagicLength));
ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_start_magic, 0));

// Check flags
std::array<uint8_t, 4> flags{};
std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset,
4);
ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags));

PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone;
if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please validate that all unknown/reserved footer flag bits are unset. Java's PuffinReader rejects unknown flags, and the spec says reserved bits should be 0; silently ignoring them may cause C++ to accept future or invalid files and interpret the footer incorrectly.

footer_compression = PuffinFormat::kDefaultFooterCompressionCodec;
}

// Read and decompress footer payload
auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength;
ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, ReadBytes(payload_offset, payload_size));
ICEBERG_ASSIGN_OR_RAISE(auto decompressed,
Decompress(footer_compression, payload_bytes));

// Parse JSON
std::string_view json_str(reinterpret_cast<const char*>(decompressed.data()),
decompressed.size());
return FileMetadataFromJsonString(json_str);
}

Result<std::pair<BlobMetadata, std::vector<std::byte>>> PuffinReader::ReadBlob(
const BlobMetadata& blob_metadata) {
if (blob_metadata.offset < 0 || blob_metadata.length < 0 ||
blob_metadata.offset > file_size_ ||
blob_metadata.length > file_size_ - blob_metadata.offset) {
return Invalid("Invalid blob: offset {} + length {} exceeds file size {}",
blob_metadata.offset, blob_metadata.length, file_size_);
}

ICEBERG_ASSIGN_OR_RAISE(auto raw_data,
ReadBytes(blob_metadata.offset, blob_metadata.length));

// Determine compression codec
ICEBERG_ASSIGN_OR_RAISE(
auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec));
ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data));

return std::pair{blob_metadata, std::move(decompressed)};
}

Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>>
PuffinReader::ReadAll(const std::vector<BlobMetadata>& blobs) {
// Sort by offset for sequential I/O access pattern
std::vector<const BlobMetadata*> sorted;
sorted.reserve(blobs.size());
for (const auto& blob : blobs) {
sorted.push_back(&blob);
}
std::sort(sorted.begin(), sorted.end(),

Check warning on line 192 in src/iceberg/puffin/puffin_reader.cc

View workflow job for this annotation

GitHub Actions / cpp-linter

src/iceberg/puffin/puffin_reader.cc:192:3 [modernize-use-ranges]

use a ranges version of this algorithm
[](const auto* a, const auto* b) { return a->offset < b->offset; });

std::vector<std::pair<BlobMetadata, std::vector<std::byte>>> results;
results.reserve(blobs.size());
for (const auto* blob : sorted) {
ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(*blob));
results.push_back(std::move(blob_pair));
}
return results;
}

} // namespace iceberg::puffin
86 changes: 86 additions & 0 deletions src/iceberg/puffin/puffin_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/puffin/puffin_reader.h
/// Puffin file reader.

#include <cstddef>
#include <cstdint>
#include <memory>
#include <span>
#include <utility>
#include <vector>

#include "iceberg/iceberg_data_export.h"
#include "iceberg/puffin/file_metadata.h"
#include "iceberg/result.h"

namespace iceberg {
class InputFile;
class SeekableInputStream;
} // namespace iceberg

namespace iceberg::puffin {

/// \brief Reader for Puffin files.
///
/// Supports two modes:
/// - Memory mode: parses from an in-memory buffer.
/// - File mode: reads from an InputFile with seek support (for DV use case).
class ICEBERG_DATA_EXPORT PuffinReader {
public:
/// \brief Construct a memory-mode reader from file data.
explicit PuffinReader(std::span<const std::byte> data);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue of this ctor is that we lose the capability of seeking into a segment of a puffin file, which is the main use case of v3 deletion vector.


/// \brief Construct a file-mode reader from an InputFile.
explicit PuffinReader(std::unique_ptr<InputFile> input_file);

~PuffinReader();

/// \brief Read and return the file metadata from the footer.
Result<FileMetadata> ReadFileMetadata();

/// \brief Read a specific blob's data by its metadata.
/// \param blob_metadata The metadata describing the blob to read.
/// \return A pair of (BlobMetadata, decompressed data), or an error.
Result<std::pair<BlobMetadata, std::vector<std::byte>>> ReadBlob(
const BlobMetadata& blob_metadata);

/// \brief Read all blobs described in the file metadata.
/// \return A vector of (BlobMetadata, decompressed data) pairs, or an error.
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>> ReadAll(
const std::vector<BlobMetadata>& blobs);

private:
/// In-memory data for memory mode.
std::span<const std::byte> data_;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::span<const std::byte> data_;
const std::span<const std::byte> data_;

/// Input file for file mode.
std::unique_ptr<InputFile> input_file_;
/// Opened stream (lazily opened in file mode).
std::unique_ptr<SeekableInputStream> stream_;
/// Cached file size.
int64_t file_size_ = 0;

bool IsFileMode() const { return input_file_ != nullptr; }
Result<std::vector<std::byte>> ReadBytes(int64_t offset, int64_t length);
};

} // namespace iceberg::puffin
Loading
Loading