Phase 8: Analytics (Columnar Storage & Vectorized Execution)#10
⚠️ Caution: review failed. The pull request was closed or merged during review.

📝 Walkthrough

Adds a columnar storage layer, a vectorized execution engine and operators, vectorized expression evaluation, columnar table I/O, related tests, docs for distributed joins/replication/analytics, small StorageManager/path helpers, and minor build/test/workflow updates. Several public types and APIs are added or moved.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant SeqScan as VectorizedSeqScan
    participant Columnar as ColumnarTable
    participant Filter as VectorizedFilter
    participant Project as VectorizedProject
    participant Batch as VectorBatch
    Client->>SeqScan: next_batch()
    SeqScan->>Columnar: read_batch(start_row, batch_size)
    Columnar-->>SeqScan: VectorBatch (columns)
    SeqScan-->>Filter: pass VectorBatch
    Filter->>Filter: evaluate_vectorized(condition)
    Filter-->>Project: filtered VectorBatch
    Project->>Project: evaluate_vectorized(expressions)
    Project-->>Client: projected VectorBatch
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
…ptimize analytics pipeline

- Fix critical RPC typo in RpcClient.
- Implement `evaluate_vectorized` interface across all Expression subclasses.
- Add optimized INT64 comparison paths for BinaryExpr.
- Refactor VectorizedFilter and VectorizedProject to leverage batch-level evaluation.
- Update NumericVector&lt;bool&gt; to use uint8_t for raw data access compatibility.
- Expand VectorBatch support for varied integer types.
- All tests passing with improved performance.
- Fix heap-buffer-overflow in RequestVoteArgs::serialize() by correcting BASE_SIZE.
- Fix stack-buffer-overflow in distributed_txn_tests by using RpcHeader::HEADER_SIZE instead of hardcoded 8.
- Ensure all tests pass with AddressSanitizer enabled.
@coderabbitai review
✅ Actions performed: review triggered.
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
include/distributed/raft_types.hpp (1)
58-69: ⚠️ Potential issue | 🔴 Critical: Deserializer does not read newly serialized log fields, breaking Raft's log-up-to-date requirement.

The `serialize()` method outputs `last_log_index` and `last_log_term` (at offsets `16 + id_len` and `24 + id_len`), but `handle_request_vote()` in src/distributed/raft_group.cpp:202-227:

- Checks for a minimum 24-byte payload (insufficient for the new `32 + id_len` format)
- Reads only `term`, `id_len`, and `candidate_id`
- Never deserializes `last_log_index` and `last_log_term`
- Never checks whether the candidate's log is up-to-date before granting votes

Raft grants votes only to candidates whose logs are at least as up-to-date as the voter's. Without this check, any candidate can become leader even with stale logs, violating Raft's safety guarantees.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/distributed/raft_types.hpp` around lines 58 - 69, The serialize() for the vote RPC now appends last_log_index and last_log_term after candidate_id but handle_request_vote() still parses only term, id_len and candidate_id and enforces a 24-byte minimum; update handle_request_vote() to expect the new BASE_SIZE (32)+id_len minimum, deserialize last_log_index and last_log_term (reading the fields written at offsets 16+id_len and 24+id_len), and enforce the Raft up-to-date check by comparing the candidate's last_log_term/last_log_index against the local log's last term/index (use your local variables that track the voter's last log term/index) before granting a vote. Ensure error handling for short payloads and malformed id_len remains consistent.
🧹 Nitpick comments (2)
tests/analytics_tests.cpp (1)
23-56: Broaden coverage beyond the INT64 happy path.

These tests only exercise non-null `TYPE_INT64` columns, so they will not catch the nullable-comparison bug or the read/write type mismatch in the new analytics path. Adding one nullable predicate case and one `TYPE_FLOAT64` round-trip would make this suite much more protective.

Also applies to: 58-116, 118-152
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/analytics_tests.cpp` around lines 23 - 56, Extend the ColumnarTableLifecycle test to cover nullable comparisons and a TYPE_FLOAT64 round-trip: add a second nullable column in Schema (e.g., "maybe_val" with nullable flag), populate some rows with null and non-null values using Tuple/Value::make_null()/make_double(), then use VectorizedSeqScanOperator with a predicate that filters on the nullable column to assert correct handling of NULL vs value comparisons; also add a column of TYPE_FLOAT64 (use NumericVector&lt;double&gt;), append and read back via scan and ASSERT/EXPECT that floating values round-trip exactly (or within tolerance) to catch read/write type mismatch issues; use existing helpers/functions ColumnarTable::append_batch, VectorizedSeqScanOperator::next_batch, NumericVector, and common::ValueType::TYPE_FLOAT64 to locate where to modify tests.

include/executor/vectorized_operator.hpp (1)
112-118: Row-by-row append is not vectorized.

The comment says "Optimized", but this loop builds a `std::vector<Value>`, copies each column value individually, constructs a `Tuple`, and appends row-by-row. This defeats the vectorization benefits.

A truly vectorized approach would use a selection vector (an array of indices of the passing rows) and batch-copy only the selected rows, avoiding per-row allocations.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/executor/vectorized_operator.hpp` around lines 112 - 118, The current "Optimized" block does per-row allocation and append via building a Tuple and calling out_batch.append_tuple, which defeats vectorization; instead build a selection vector of row indices for the rows to keep (e.g. collect r into std::vector<size_t> sel) and then copy columns in column-major fashion: for each column from input_batch_->get_column(c) call the column/batch-level API to append only the selected indices into out_batch (avoid constructing Tuple and per-row push_backs); replace the Tuple/append_tuple path with a batch-level append (e.g. an append_selected or append_values_from method) so out_batch is populated by per-column bulk copies rather than row-by-row copies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/phases/PHASE_8_ANALYTICS.md`:
- Around line 8-23: The docs reference incorrect source paths; update the three
file references so they point to the actual locations in the PR: change
storage/columnar_table.cpp to src/storage/columnar_table.cpp, executor/types.hpp
to include/executor/types.hpp, and executor/vectorized_operator.hpp to
include/executor/vectorized_operator.hpp in PHASE_8_ANALYTICS.md (preserve the
same descriptions and unique symbols like ColumnVector, VectorBatch, and
Vectorized Operators).
In `@include/executor/types.hpp`:
- Line 159: The is_null(size_t index) method reads null_bitmap_[index] without
validating bounds, risking UB; update is_null in the class to check index <
size_ (and optionally index >= 0 if needed) before accessing null_bitmap_,
returning false or throwing/asserting on invalid index; reference the is_null
method, null_bitmap_ member and size_ member when adding the guard so callers
cannot cause out-of-bounds access.
- Around line 279-284: append_tuple currently assumes Tuple::size() equals
columns_.size(), which can cause out-of-bounds access or inconsistent column
lengths; update append_tuple to validate sizes before mutating state (e.g.,
check if tuple.size() == columns_.size() or tuple.size() <= columns_.size()
depending on intended semantics), and either assert/throw a clear error
referencing append_tuple, columns_ and Tuple::size(), or pad/fill missing
columns as required; perform the size check first and only increment row_count_
after all column append operations succeed to avoid leaving the object in a
partially-updated state.
- Around line 262-265: The default branch currently creates a
NumericVector<int64_t> for unknown types (via
add_column(std::make_unique<NumericVector<int64_t>>(col.type()))), which will
corrupt string values (e.g., TYPE_STRING); update the switch so string types are
handled explicitly: detect TYPE_STRING and call add_column with the appropriate
string vector implementation (e.g., make_unique<StringVector>(col.type())) or,
if no string vector exists, throw a clear runtime_error/logic_error for
unsupported column types instead of falling back to NumericVector<int64_t>;
ensure you modify the switch/default near add_column and NumericVector<int64_t>
to prevent silent data corruption.
- Around line 198-205: NumericVector::get currently reads null_bitmap_[index]
and data_[index] without checking bounds causing potential OOB; mimic
Tuple::get() behavior by first verifying index < size_ and if not return
common::Value::make_null(), otherwise proceed to check null_bitmap_[index] and
construct the appropriate common::Value from data_ (handle int64_t, double,
bool) so all accesses to null_bitmap_ and data_ are guarded by the bounds check.
In `@include/executor/vectorized_operator.hpp`:
- Around line 207-216: The SUM branch currently force-casts the input column to
NumericVector<int64_t> (in the AggregateType::Sum handling) which will throw on
other numeric types and also treats all-null as zero; update the logic in
vectorized_operator.hpp so Sum is type-safe: inspect col.type() and handle each
numeric ValueType (e.g., TYPE_INT64, TYPE_INT32, TYPE_DOUBLE, etc.) with the
appropriate NumericVector<T> read path instead of an unconditional dynamic_cast,
accumulate into the correct result accumulator (results_int_ or a double
accumulator) and introduce a per-aggregate has_value_ flag (e.g.,
std::vector<bool> has_value_) that you set true whenever a non-null value is
seen; finally, when materializing SUM outputs, emit NULL for aggregates where
has_value_ is false to match SQL semantics.
- Around line 110-118: The loop in vectorized_operator that currently calls
selection_mask_->get(r).as_bool() can throw on NULL Values; update the condition
in the for-loop inside evaluate_vectorized (the block using input_batch_,
selection_mask_, out_batch and append_tuple/Tuple) to first check the Value for
NULL (e.g., use Value::is_null() or equivalent) and treat NULL as false (skip
the row) instead of calling as_bool(); only call as_bool() when the Value is
non-null so NULLs in WHERE-like expressions are excluded rather than causing a
runtime_error.
- Around line 203-217: The loop over aggregates_ in VectorizedOperator (in
include/executor/vectorized_operator.hpp) only handles AggregateType::Count and
AggregateType::Sum, leaving results_int_[i] unchanged for other types; update
the for-loop to explicitly handle unsupported/unknown AggregateType values by
either setting an error flag or throwing an exception (e.g., throw
std::runtime_error or call set_error) when encountering types other than
Count/Sum, and ensure the check occurs where aggregates_ and results_int_ are
updated (reference symbols: aggregates_, AggregateType, results_int_,
input_batch_, NumericVector<int64_t>, get_column, is_null) so silent zeros
cannot occur.
In `@src/parser/expression.cpp`:
- Around line 83-109: The optimized INT64 path in expression.cpp (`src_col`,
`const_expr`, `op_` handling for TokenType::Gt/TokenType::Eq) currently reads
`raw_data()` without checking `src_col.is_null(i)`, causing NULLs to be treated
as false; update the TokenType::Eq branch to mirror the TokenType::Gt
null-handling: after resizing `bool_res` and obtaining `res_data` (and any null
bitmap access on `bool_res`), loop over rows and if `src_col.is_null(i)` mark
the result null (or set the null bit) for that index, otherwise write the
comparison `src_data[i] == const_val` into `res_data[i]`; use the same helper
calls used in the Gt branch to set nulls so semantics match exactly.
In `@src/storage/columnar_table.cpp`:
- Around line 57-65: append_batch() only serializes TYPE_INT64 and TYPE_FLOAT64
but read_batch() only reconstructs TYPE_INT64, enabling silent corruption;
update read_batch() to mirror the same type switch used in append_batch()
(inspect schema_.get_column(i).type() and handle common::ValueType::TYPE_INT64
and TYPE_FLOAT64 by reading into executor::NumericVector<int64_t> and
executor::NumericVector<double> respectively) and for any other types
immediately fail fast (throw or return an error) rather than advancing
row_count_ and skipping payloads; ensure the same symmetric type handling is
applied in the other affected block (lines ~93-114) so serialization and
deserialization are consistent.
- Around line 76-90: ColumnarTable::read_batch clears out_batch then immediately
calls out_batch.get_column(i) which can index into an empty VectorBatch; before
the for-loop ensure out_batch is initialized to the schema's column count and
types (e.g. call whatever initializer/resize/setup routine exists on VectorBatch
to create schema_.column_count() columns or populate columns based on schema_)
and validate that those calls succeeded (or return false) so get_column(i) is
safe; reference symbols: ColumnarTable::read_batch, out_batch.clear(),
out_batch.get_column(i), schema_.column_count().
- Around line 15-27: The code builds file names directly from name_ (e.g.,
meta_path = name_ + ".meta.bin" and column files built inside the loop that uses
schema_.column_count()), which bypasses StorageManager and writes into the
process cwd; change these sites to resolve/obtain file locations via the
StorageManager API (use the table's StorageManager instance to convert name_ +
suffix into a storage-rooted path or to open output streams through
StorageManager) instead of string-concatenating names, and apply the same change
to the other occurrences mentioned (the other meta/column open calls at the
indicated ranges) so all meta, .nulls.bin and .data.bin files are created/opened
under the configured storage root and tracked by StorageManager.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5e46ffd6-688b-4516-9dee-5880790701c7
📒 Files selected for processing (18)
- CMakeLists.txt
- README.md
- docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
- docs/phases/PHASE_7_REPLICATION_HA.md
- docs/phases/PHASE_8_ANALYTICS.md
- docs/phases/README.md
- include/distributed/raft_types.hpp
- include/executor/operator.hpp
- include/executor/types.hpp
- include/executor/vectorized_operator.hpp
- include/parser/expression.hpp
- include/storage/columnar_table.hpp
- plans/CPP_MIGRATION_PLAN.md
- src/network/rpc_client.cpp
- src/parser/expression.cpp
- src/storage/columnar_table.cpp
- tests/analytics_tests.cpp
- tests/distributed_txn_tests.cpp
💤 Files with no reviewable changes (1)
- include/executor/operator.hpp
- Implement Raft up-to-date safety check in handle_request_vote.
- Add bounds checks and robust initialization to VectorBatch/ColumnVector.
- Optimize VectorizedFilter with selection vectors and batch appending.
- Enhance VectorizedAggregate with FLOAT64 support and proper NULL semantics.
- Fix NULL propagation in optimized expression paths.
- Ensure ColumnarTable uses StorageManager for path resolution.
- Professionalize code comments and extend integration tests.
This PR completes Phase 8 by implementing columnar storage and a vectorized execution engine for high-performance analytical queries. Key features include: