Phase 7: Replication & High Availability (v2)#9

Merged
poyrazK merged 15 commits into main from
feature/phase-7-replication-ha-v2
Mar 5, 2026

Conversation


@poyrazK poyrazK commented Mar 4, 2026

Implements Multi-Group Raft infrastructure, binary log replication, leader-aware routing, and automatic failover. This PR contains 11 logical commits for easier review. Verified with multi-raft integration tests.

Summary by CodeRabbit

  • New Features

    • Added shard consensus integration for Data mode in distributed setups.
  • Refactor

    • Improved worker thread management with synchronized thread tracking and lifecycle handling.
  • Chores

    • Updated CI workflow to execute cloudSQL tests and build artifacts from the correct directory.
    • Added recovery log record source to build configuration.
    • Extensive code formatting and whitespace cleanup across multiple files for improved maintainability.

@poyrazK poyrazK added the documentation (Improvements or additions to documentation) and enhancement (New feature or request) labels Mar 4, 2026

coderabbitai bot commented Mar 4, 2026

Warning

Rate limit exceeded

@poyrazK has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 2 minutes and 49 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ae21798d-1423-40af-8e0e-0e25a5666344

📥 Commits

Reviewing files that changed from the base of the PR and between 6a1e3e1 and b119709.

📒 Files selected for processing (6)
  • CMakeLists.txt
  • src/catalog/catalog.cpp
  • tests/distributed_tests.cpp
  • tests/multi_raft_tests.cpp
  • tests/raft_simulation_tests.cpp
  • tests/raft_tests.cpp
📝 Walkthrough

Walkthrough

This pull request introduces a mix of formatting adjustments and functional enhancements across the codebase. Key changes include: (1) RPC server refactoring to track worker threads with mutex-protected collection and joining; (2) shard consensus group initialization for Data mode in the main application; (3) CI workflow updates from sqlEngine to cloudSQL naming; and (4) CMakeLists.txt source additions and test condition replacement. Most remaining changes are cosmetic whitespace and formatting adjustments.

Changes

Cohort / File(s) / Summary

  • Formatting & Alignment (include/catalog/catalog.hpp, include/common/cluster_manager.hpp, include/network/rpc_client.hpp, plans/CPP_MIGRATION_PLAN.md, src/catalog/catalog.cpp, src/distributed/distributed_executor.cpp, src/distributed/raft_group.cpp, src/distributed/raft_manager.cpp, src/executor/query_executor.cpp, src/network/rpc_client.cpp, src/network/server.cpp, tests/multi_raft_tests.cpp): Whitespace adjustments, line wrapping, and cosmetic reformatting across declarations, initializer lists, and function calls; no semantic or behavioral changes.
  • RPC Server Thread Management (include/network/rpc_server.hpp, src/network/rpc_server.cpp): Added worker thread tracking via a worker_threads_ vector and worker_mutex_ for synchronized access. Modified stop() to collect and join threads; refactored accept_loop() to manage threads via the mutex instead of detaching. No public API signature changes.
  • Application Initialization (src/main.cpp): Integrated shard consensus in Data mode by creating shard_group (group 1) from raft_manager and instantiating a static ShardStateMachine. Includes minor formatting of surrounding code.
  • Build Configuration & CI (CMakeLists.txt, .github/workflows/ci.yml): Added src/recovery/log_record.cpp to the CMakeLists.txt sources and replaced the BUILD_TESTING condition with BUILD_TESTS. Updated the CI workflow to execute cloudSQL_tests and upload from build/cloudSQL instead of the sqlEngine equivalents.
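The worker-thread change summarized above can be reduced to a small sketch. WorkerPool and its spawn/stop signatures are illustrative assumptions, not the project's real API; only the member names worker_threads_ and worker_mutex_ and the swap-then-join shape come from the PR description:

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reduction of the pattern the walkthrough describes: worker
// threads are stored in a mutex-guarded vector instead of being detached,
// so stop() can join them deterministically.
class WorkerPool {
public:
    void spawn(void (*fn)()) {
        std::lock_guard<std::mutex> lock(worker_mutex_);
        worker_threads_.emplace_back(fn);
    }

    // Move the threads out under the lock, then join outside it, so a
    // worker finishing concurrently never deadlocks against stop().
    void stop() {
        std::vector<std::thread> workers;
        {
            std::lock_guard<std::mutex> lock(worker_mutex_);
            workers.swap(worker_threads_);
        }
        for (auto& t : workers) {
            if (t.joinable()) t.join();
        }
    }

    std::size_t pending() {
        std::lock_guard<std::mutex> lock(worker_mutex_);
        return worker_threads_.size();
    }

private:
    std::vector<std::thread> worker_threads_;
    std::mutex worker_mutex_;
};
```

The swap-under-lock step mirrors the workers.swap(worker_threads_) call the review later references.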

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 A rabbit hops through threads that now align,
Worker squads join ranks in synchrony fine,
Shard consensus blooms where data shall dwell,
Whitespace whispers—formatting's spell!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 21.88%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The title is vague and does not accurately reflect the specific changes in the changeset. The PR contains primarily whitespace/formatting adjustments and minor structural refactoring across multiple files, with some functional additions (worker thread management, shard group integration) that are secondary to the overall formatting focus. Resolution: consider a more specific title that accurately describes the primary changes, such as "Refactor: Formatting and worker thread management", or align the implementation with the ambitious description in the PR objectives.
✅ Passed checks (1 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/phase-7-replication-ha-v2

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@poyrazK poyrazK force-pushed the feature/phase-7-replication-ha-v2 branch from 0c5c72d to 42ef06e on March 4, 2026 at 13:02
@poyrazK poyrazK force-pushed the feature/phase-7-replication-ha-v2 branch from ee2e685 to 41dd753 on March 4, 2026 at 13:25

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/distributed/distributed_executor.cpp (1)

323-339: ⚠️ Potential issue | 🟠 Major

Shard fallback routing is nondeterministic due to unstable node ordering.

shard_idx is used as a positional index into data_nodes, but that vector is built from unordered_map iteration order. This can route the same key to different nodes across processes/restarts.

🛠️ Minimal deterministic fix
     auto data_nodes = cluster_manager_.get_data_nodes();
+    std::sort(data_nodes.begin(), data_nodes.end(),
+              [](const cluster::NodeInfo& a, const cluster::NodeInfo& b) {
+                  return a.id < b.id;
+              });

A stronger long-term fix is routing via explicit shard→node metadata instead of vector index.

Also applies to: 357-373

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/distributed/distributed_executor.cpp` around lines 323 - 339, The
fallback routing uses shard_idx as a positional index into data_nodes (built
from unordered_map), which is nondeterministic; make the fallback deterministic
by establishing a stable ordering before indexing — e.g., sort data_nodes by a
stable key (node.id) or use explicit shard→node metadata instead of positional
indexing; update the logic around cluster::ShardManager::compute_shard,
data_nodes, shard_idx, cluster_manager_.get_leader and target_nodes to use the
stable ordering (apply the same change to the other occurrence around the
357-373 block).
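As a concrete illustration of the "stronger long-term fix" mentioned above, the sketch below routes through an explicit shard-to-node map, with a sorted-vector fallback matching the minimal fix. The names here (ShardMap, NodeId, stable_fallback) are hypothetical and not part of the project's codebase:

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

using NodeId = std::string;

// Explicit shard -> node assignment: the same shard always resolves to the
// same node across processes and restarts, unlike indexing into a vector
// built from unordered_map iteration order.
struct ShardMap {
    std::map<uint32_t, NodeId> shard_to_node;

    NodeId route(uint32_t shard_idx) const {
        auto it = shard_to_node.find(shard_idx);
        return it != shard_to_node.end() ? it->second : NodeId{};
    }
};

// Stable fallback when no explicit assignment exists: sort node ids before
// positional indexing, mirroring the minimal sort-based fix above.
NodeId stable_fallback(std::vector<NodeId> nodes, uint32_t shard_idx) {
    if (nodes.empty()) return {};
    std::sort(nodes.begin(), nodes.end());
    return nodes[shard_idx % nodes.size()];
}
```

The map-based route keeps shard placement an explicit piece of cluster metadata rather than an accident of container ordering.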
src/network/rpc_client.cpp (1)

80-88: ⚠️ Potential issue | 🔴 Critical

Handle partial TCP I/O; current framing can break under load.

At lines 80, 85, 123, 128, send() is checked for <= 0 but doesn't verify full transmission—partial writes (0 < n < requested_size) cause the code to continue with incomplete frames. At lines 92 and 100, recv(..., MSG_WAITALL) is checked for <= 0 but doesn't verify exact byte count—partial reads can be accepted, leaving incomplete frame data. This corrupts RPC serialization and causes silent protocol failures under network stress.

The issue affects both call() and send_only() methods. Implement loop-based helpers send_all() and recv_exact() to ensure complete transmission and reception before proceeding.

🛠️ Proposed fix pattern (send_all / recv_exact)
+namespace {
+bool send_all(int fd, const uint8_t* data, size_t len) {
+    size_t sent = 0;
+    while (sent < len) {
+        const ssize_t n = send(fd, data + sent, len - sent, 0);
+        if (n <= 0) return false;
+        sent += static_cast<size_t>(n);
+    }
+    return true;
+}
+
+bool recv_exact(int fd, uint8_t* data, size_t len) {
+    size_t got = 0;
+    while (got < len) {
+        const ssize_t n = recv(fd, data + got, len - got, MSG_WAITALL);
+        if (n <= 0) return false;
+        got += static_cast<size_t>(n);
+    }
+    return true;
+}
+}  // namespace
...
-    if (send(fd_, header_buf, RpcHeader::HEADER_SIZE, 0) <= 0) {
+    if (!send_all(fd_, reinterpret_cast<const uint8_t*>(header_buf), RpcHeader::HEADER_SIZE)) {
         return false;
     }
...
-        if (send(fd_, payload.data(), payload.size(), 0) <= 0) {
+        if (!send_all(fd_, payload.data(), payload.size())) {
             return false;
         }
...
-    if (recv(fd_, resp_buf.data(), RpcHeader::HEADER_SIZE, MSG_WAITALL) <= 0) {
+    if (!recv_exact(fd_, reinterpret_cast<uint8_t*>(resp_buf.data()), RpcHeader::HEADER_SIZE)) {
         return false;
     }
...
-        if (recv(fd_, response_out.data(), resp_header.payload_len, MSG_WAITALL) <= 0) {
+        if (!recv_exact(fd_, response_out.data(), resp_header.payload_len)) {
             return false;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/network/rpc_client.cpp` around lines 80 - 88, The code currently assumes
send() and recv(MSG_WAITALL) always transfer the full buffer in
RpcClient::call() and RpcClient::send_only(), which leads to partial-frame
corruption under load; implement and use helper functions (e.g., send_all(int
fd, const void* buf, size_t len) and recv_exact(int fd, void* buf, size_t len))
that loop until the full length is sent/received or a fatal error/EOF occurs,
properly handling short writes/reads, EINTR/EAGAIN, and returning false on
incomplete transfers; replace the direct send(...) and recv(..., MSG_WAITALL)
usages in call() and send_only() with these helpers and ensure error
logging/cleanup occurs when they return failure.
src/catalog/catalog.cpp (1)

165-200: ⚠️ Potential issue | 🔴 Critical

Add strict bounds checks when decoding replicated catalog commands.

Line 165–200 parses entry.data with unchecked memcpy/offset arithmetic. A truncated or malformed log entry can read past buffer bounds and crash.

🔧 Proposed hardening for `Catalog::apply`
 void Catalog::apply(const raft::LogEntry& entry) {
     if (entry.data.empty()) return;

+    auto require = [&](size_t offset, size_t need) -> bool {
+        return offset <= entry.data.size() && need <= (entry.data.size() - offset);
+    };
+
     uint8_t type = entry.data[0];
     if (type == 1) {  // CreateTable
         size_t offset = 1;
         uint32_t name_len = 0;
+        if (!require(offset, 4)) return;
         std::memcpy(&name_len, entry.data.data() + offset, 4);
         offset += 4;
+        if (!require(offset, name_len)) return;
         std::string table_name(reinterpret_cast<const char*>(entry.data.data() + offset), name_len);
         offset += name_len;

         uint32_t col_count = 0;
+        if (!require(offset, 4)) return;
         std::memcpy(&col_count, entry.data.data() + offset, 4);
         offset += 4;

         std::vector<ColumnInfo> columns;
         for (uint32_t i = 0; i < col_count; ++i) {
             uint32_t cname_len = 0;
+            if (!require(offset, 4)) return;
             std::memcpy(&cname_len, entry.data.data() + offset, 4);
             offset += 4;
+            if (!require(offset, cname_len + 1 + 2)) return;
             std::string cname(reinterpret_cast<const char*>(entry.data.data() + offset), cname_len);
             offset += cname_len;
             common::ValueType ctype = static_cast<common::ValueType>(entry.data[offset++]);
             uint16_t cpos = 0;
             std::memcpy(&cpos, entry.data.data() + offset, 2);
             offset += 2;
             columns.emplace_back(cname, ctype, cpos);
         }

         create_table_local(table_name, std::move(columns));
     } else if (type == 2) {  // DropTable
+        if (entry.data.size() < 1 + 4) return;
         oid_t table_id = 0;
         std::memcpy(&table_id, entry.data.data() + 1, 4);
         drop_table_local(table_id);
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/catalog/catalog.cpp` around lines 165 - 200, Catalog::apply currently
reads from entry.data with unchecked memcpy/offset arithmetic; add strict bounds
checks before each read to prevent buffer overrun: validate entry.data.size() >
0, then before reading name_len/cname_len/col_count/ctype/cpos/table_id ensure
offset + sizeof(type) or offset + length <= entry.data.size(); verify name_len
and each cname_len and col_count are reasonable (e.g., do not cause overflow
when added to offset) and abort gracefully (log error and return) on
malformed/truncated input; apply the same guarded-offset pattern when parsing
the DropTable path (use a local offset variable rather than hardcoding +1) and
only call create_table_local or drop_table_local after all validations succeed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plans/CPP_MIGRATION_PLAN.md`:
- Line 1: The H1 heading "#cloudSQL C++ Migration& Distributed Optimization
Plan" violates markdownlint H1 syntax; update the header in the document (the H1
line) to include a space after the hash and spaces around the ampersand so it
reads: "# cloudSQL C++ Migration & Distributed Optimization Plan" to satisfy
markdownlint and improve readability.

In `@src/distributed/distributed_executor.cpp`:
- Around line 95-118: The current code can return success even when no
coordinator is found or the RPC connection fails; update the logic around
cluster_manager_.get_leader()/get_coordinators() so that if target remains null
you return an error status (or throw) instead of proceeding, and if
network::RpcClient client(target->address, target->cluster_port) fails to
connect (client.connect() == false) you likewise return/propagate an error
rather than treating the DDL as successful; adjust the return path to reflect
real failure outcomes and include clear error messages tied to the target/client
checks.

In `@src/executor/query_executor.cpp`:
- Around line 312-329: The current logic allows local application of writes even
when a shard_group exists but the node is not the leader; update the INSERT and
DELETE paths (around the shard_group/is_leader() checks and where
table.insert(...) and the corresponding table.delete(...) are invoked) to guard
local mutation: if shard_group is non-null and shard_group->is_leader() is
false, do not perform local table.insert or delete and instead return an
appropriate error/result (same behavior as when replicate(...) fails); ensure
you reference the existing shard_group, is_leader(), replicate(cmd),
table.insert(tuple, xmin) and the delete path functions so the check is applied
symmetrically in both code paths.
- Around line 71-76: The replication currently only handles types 1/2
(INSERT/DELETE); add type-3 (UPDATE) handling by updating execute_update to emit
a type value of 3 and serialize the UPDATE payload (e.g., primary key or old
tuple and the new tuple) using the existing Serializer APIs when creating the
replication command, and update ShardStateMachine::apply to handle type == 3:
deserialize the UPDATE payload (using network::Serializer::deserialize_* helpers
similar to deserialize_tuple) and invoke the table update method (e.g.,
table.update(...) or the appropriate update_by_rid/key API) to apply the
mutation; mirror this change at the other emit/apply sites that create
replication commands so UPDATEs are serialized and applied consistently.

In `@src/network/rpc_server.cpp`:
- Around line 73-77: The stop() method must close active client sockets so
worker threads blocked in handle_client()'s recv(..., MSG_WAITALL) can unblock;
add a thread-safe container (e.g., std::unordered_set<int> guarded by a mutex)
that handle_client() inserts the client fd into when starting and removes on
exit, then in stop() iterate that set and call shutdown(fd, SHUT_RDWR) (and
optionally close(fd)) for each fd before setting running_ = false and joining
threads in workers; ensure access to the container is synchronized when
adding/removing fds to avoid races.
- Around line 67-71: Add the missing private members to the RpcServer class:
declare std::vector<std::thread> worker_threads_ and std::mutex worker_mutex_ in
the private section of class RpcServer so the uses in RpcServer::... (e.g., the
workers.swap(worker_threads_) and the scoped_lock on worker_mutex_) compile;
ensure the names exactly match worker_threads_ and worker_mutex_ to align with
existing references in src/network/rpc_server.cpp.

---

Outside diff comments:
In `@src/catalog/catalog.cpp`:
- Around line 165-200: Catalog::apply currently reads from entry.data with
unchecked memcpy/offset arithmetic; add strict bounds checks before each read to
prevent buffer overrun: validate entry.data.size() > 0, then before reading
name_len/cname_len/col_count/ctype/cpos/table_id ensure offset + sizeof(type) or
offset + length <= entry.data.size(); verify name_len and each cname_len and
col_count are reasonable (e.g., do not cause overflow when added to offset) and
abort gracefully (log error and return) on malformed/truncated input; apply the
same guarded-offset pattern when parsing the DropTable path (use a local offset
variable rather than hardcoding +1) and only call create_table_local or
drop_table_local after all validations succeed.

In `@src/distributed/distributed_executor.cpp`:
- Around line 323-339: The fallback routing uses shard_idx as a positional index
into data_nodes (built from unordered_map), which is nondeterministic; make the
fallback deterministic by establishing a stable ordering before indexing — e.g.,
sort data_nodes by a stable key (node.id) or use explicit shard→node metadata
instead of positional indexing; update the logic around
cluster::ShardManager::compute_shard, data_nodes, shard_idx,
cluster_manager_.get_leader and target_nodes to use the stable ordering (apply
the same change to the other occurrence around the 357-373 block).

In `@src/network/rpc_client.cpp`:
- Around line 80-88: The code currently assumes send() and recv(MSG_WAITALL)
always transfer the full buffer in RpcClient::call() and RpcClient::send_only(),
which leads to partial-frame corruption under load; implement and use helper
functions (e.g., send_all(int fd, const void* buf, size_t len) and
recv_exact(int fd, void* buf, size_t len)) that loop until the full length is
sent/received or a fatal error/EOF occurs, properly handling short writes/reads,
EINTR/EAGAIN, and returning false on incomplete transfers; replace the direct
send(...) and recv(..., MSG_WAITALL) usages in call() and send_only() with these
helpers and ensure error logging/cleanup occurs when they return failure.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 51758874-ecdc-4e7f-9e3b-b7d049ff5265

📥 Commits

Reviewing files that changed from the base of the PR and between de1827d and ecd8474.

📒 Files selected for processing (16)
  • include/catalog/catalog.hpp
  • include/common/cluster_manager.hpp
  • include/network/rpc_client.hpp
  • plans/CPP_MIGRATION_PLAN.md
  • src/catalog/catalog.cpp
  • src/distributed/distributed_executor.cpp
  • src/distributed/raft_group.cpp
  • src/distributed/raft_manager.cpp
  • src/executor/query_executor.cpp
  • src/main.cpp
  • src/network/rpc_client.cpp
  • src/network/rpc_server.cpp
  • src/network/server.cpp
  • src/storage/storage_manager.cpp
  • tests/multi_raft_tests.cpp
  • tests/server_tests.cpp

@@ -1,4 +1,4 @@
# cloudSQL C++ Migration & Distributed Optimization Plan
#cloudSQL C++ Migration& Distributed Optimization Plan

⚠️ Potential issue | 🟡 Minor

Fix H1 ATX syntax for markdownlint compliance.

Line 1 is missing the required space after # and is harder to read without spacing around &.

✏️ Proposed fix
-#cloudSQL C++ Migration& Distributed Optimization Plan
+# cloudSQL C++ Migration & Distributed Optimization Plan
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#cloudSQL C++ Migration& Distributed Optimization Plan
# cloudSQL C++ Migration & Distributed Optimization Plan
🧰 Tools
🪛 markdownlint-cli2 (0.21.0)

[warning] 1-1: No space after hash on atx style heading

(MD018, no-missing-space-atx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plans/CPP_MIGRATION_PLAN.md` at line 1, The H1 heading "#cloudSQL C++
Migration& Distributed Optimization Plan" violates markdownlint H1 syntax;
update the header in the document (the H1 line) to include a space after the
hash and spaces around the ampersand so it reads: "# cloudSQL C++ Migration &
Distributed Optimization Plan" to satisfy markdownlint and improve readability.

Comment on lines 95 to +118
std::string leader_id = cluster_manager_.get_leader(0);
auto nodes = cluster_manager_.get_coordinators();

const cluster::NodeInfo* target = nullptr;
if (!leader_id.empty()) {
for (const auto& n : nodes) {
if (n.id == leader_id) { target = &n; break; }
if (n.id == leader_id) {
target = &n;
break;
}
}
}

// Fallback: route to first coordinator if leader unknown (leader will redirect or proxy)
if (!target && !nodes.empty()) target = &nodes[0];

if (target) {
network::RpcClient client(target->address, target->cluster_port);
if (client.connect()) {
// In a full implementation, DDL would be sent as a Catalog-specific RPC
// For POC, we treat it success locally after replication initiation
}
}
return {};
return {};

⚠️ Potential issue | 🟠 Major

DDL path reports success even when no coordinator is reachable.

At Line 118, the branch returns success even if target is null or client.connect() fails. That can falsely acknowledge DDL replication/routing.

🛠️ Proposed fix
-        if (target) {
-            network::RpcClient client(target->address, target->cluster_port);
-            if (client.connect()) {
-                // In a full implementation, DDL would be sent as a Catalog-specific RPC
-                // For POC, we treat it success locally after replication initiation
-            }
-        }
-        return {};
+        if (target == nullptr) {
+            QueryResult res;
+            res.set_error("No coordinator available for DDL routing");
+            return res;
+        }
+
+        network::RpcClient client(target->address, target->cluster_port);
+        if (!client.connect()) {
+            QueryResult res;
+            res.set_error("Failed to connect to coordinator leader for DDL routing");
+            return res;
+        }
+
+        // TODO: send Catalog-specific RPC and validate response.
+        return {};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/distributed/distributed_executor.cpp` around lines 95 - 118, The current
code can return success even when no coordinator is found or the RPC connection
fails; update the logic around cluster_manager_.get_leader()/get_coordinators()
so that if target remains null you return an error status (or throw) instead of
proceeding, and if network::RpcClient client(target->address,
target->cluster_port) fails to connect (client.connect() == false) you likewise
return/propagate an error rather than treating the DDL as successful; adjust the
return path to reflect real failure outcomes and include clear error messages
tied to the target/client checks.

Comment on lines +71 to 76
if (type == 1) { // INSERT
Tuple tuple =
network::Serializer::deserialize_tuple(entry.data.data(), offset, entry.data.size());
table.insert(tuple, 0);
} else if (type == 2) { // DELETE
} else if (type == 2) { // DELETE
storage::HeapTable::TupleId rid;

⚠️ Potential issue | 🟠 Major

Replication protocol defines UPDATE but implementation omits it.

ShardStateMachine::apply handles only INSERT/DELETE, and command emission also covers only types 1/2. UPDATE mutations are not replicated, which will desynchronize replicas.

Please add type-3 command serialization in execute_update and matching apply logic in ShardStateMachine::apply.

Also applies to: 314-315, 397-398

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/executor/query_executor.cpp` around lines 71 - 76, The replication
currently only handles types 1/2 (INSERT/DELETE); add type-3 (UPDATE) handling
by updating execute_update to emit a type value of 3 and serialize the UPDATE
payload (e.g., primary key or old tuple and the new tuple) using the existing
Serializer APIs when creating the replication command, and update
ShardStateMachine::apply to handle type == 3: deserialize the UPDATE payload
(using network::Serializer::deserialize_* helpers similar to deserialize_tuple)
and invoke the table update method (e.g., table.update(...) or the appropriate
update_by_rid/key API) to apply the mutation; mirror this change at the other
emit/apply sites that create replication commands so UPDATEs are serialized and
applied consistently.
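To make the suggested type-3 framing concrete, here is a hedged sketch of how an UPDATE command could be encoded, mirroring the length-prefixed layout the INSERT path in this PR uses (type byte, u32 table-name length, name, payload). encode_update and its payload layout are assumptions for illustration; the real code would serialize tuples via the project's network::Serializer:

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical UPDATE framing: old-row key bytes followed by the new tuple
// image, both length-prefixed so the apply side can bounds-check each field.
std::vector<uint8_t> encode_update(const std::string& table_name,
                                   const std::vector<uint8_t>& key_bytes,
                                   const std::vector<uint8_t>& tuple_bytes) {
    std::vector<uint8_t> cmd;
    cmd.push_back(3);  // Type 3: UPDATE
    const uint32_t tlen = static_cast<uint32_t>(table_name.size());
    size_t off = cmd.size();
    cmd.resize(off + 4 + tlen);
    std::memcpy(cmd.data() + off, &tlen, 4);
    std::memcpy(cmd.data() + off + 4, table_name.data(), tlen);

    auto append = [&cmd](const std::vector<uint8_t>& bytes) {
        const uint32_t len = static_cast<uint32_t>(bytes.size());
        const size_t o = cmd.size();
        cmd.resize(o + 4 + bytes.size());
        std::memcpy(cmd.data() + o, &len, 4);
        if (!bytes.empty()) {
            std::memcpy(cmd.data() + o + 4, bytes.data(), bytes.size());
        }
    };
    append(key_bytes);    // identifies the row to update
    append(tuple_bytes);  // new tuple image
    return cmd;
}
```

The apply side would dispatch on cmd[0] == 3 and decode the two length-prefixed fields with the same bounds checks the catalog comment recommends.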

Comment on lines 312 to 329
if (shard_group && shard_group->is_leader()) {
std::vector<uint8_t> cmd;
cmd.push_back(1); // Type 1: INSERT
cmd.push_back(1); // Type 1: INSERT
uint32_t tlen = static_cast<uint32_t>(table_name.size());
size_t off = cmd.size();
cmd.resize(off + 4 + tlen);
std::memcpy(cmd.data() + off, &tlen, 4);
std::memcpy(cmd.data() + off + 4, table_name.data(), tlen);
network::Serializer::serialize_tuple(tuple, cmd);

if (!shard_group->replicate(cmd)) {
result.set_error("Replication failed for shard 1");
return result;
}
}
}

const auto tid = table.insert(tuple, xmin);

⚠️ Potential issue | 🔴 Critical

Writes can be applied locally on non-leader nodes without Raft.

If shard_group exists but is_leader() is false, replication is skipped, yet local mutation still executes later (e.g., Line 329 and delete path). This can violate replication consistency.

🛠️ Proposed guard
         if (cluster_manager_ != nullptr && cluster_manager_->get_raft_manager() != nullptr) {
             auto shard_group = cluster_manager_->get_raft_manager()->get_group(1);
-            if (shard_group && shard_group->is_leader()) {
+            if (shard_group == nullptr) {
+                result.set_error("Shard group not available");
+                return result;
+            }
+            if (!shard_group->is_leader()) {
+                result.set_error("Write rejected on follower; route to shard leader");
+                return result;
+            }
+            {
                 std::vector<uint8_t> cmd;
                 cmd.push_back(1);  // Type 1: INSERT
                 ...
                 if (!shard_group->replicate(cmd)) {
                     result.set_error("Replication failed for shard 1");
                     return result;
                 }
             }
         }

Apply the same guard pattern in both INSERT and DELETE paths.

Also applies to: 394-412

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/executor/query_executor.cpp` around lines 312 - 329, The current logic
allows local application of writes even when a shard_group exists but the node
is not the leader; update the INSERT and DELETE paths (around the
shard_group/is_leader() checks and where table.insert(...) and the corresponding
table.delete(...) are invoked) to guard local mutation: if shard_group is
non-null and shard_group->is_leader() is false, do not perform local
table.insert or delete and instead return an appropriate error/result (same
behavior as when replicate(...) fails); ensure you reference the existing
shard_group, is_leader(), replicate(cmd), table.insert(tuple, xmin) and the
delete path functions so the check is applied symmetrically in both code paths.

Comment on lines +73 to +77
for (auto& t : workers) {
if (t.joinable()) {
t.join();
}
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cat -n src/network/rpc_server.cpp | head -100

Repository: poyrazK/cloudSQL

Length of output: 3216


🏁 Script executed:

rg -n "void RpcServer::stop\(\)|join\(\)|recv\(" src/network/rpc_server.cpp -B 2 -A 5

Repository: poyrazK/cloudSQL

Length of output: 1345


🏁 Script executed:

rg -n "handle_client|worker" src/network/rpc_server.cpp -B 2 -A 10

Repository: poyrazK/cloudSQL

Length of output: 1355


🏁 Script executed:

rg -n "shutdown\(|close\(" src/network/rpc_server.cpp

Repository: poyrazK/cloudSQL

Length of output: 314


stop() blocks indefinitely when worker threads are stuck in blocking recv() calls.

Worker threads in handle_client() block at recv(..., MSG_WAITALL) waiting for client data. Setting running_ = false does not interrupt these blocking calls. If clients keep sockets open without sending data, the threads never exit their while(running_) loop, causing join() at lines 73–77 to hang forever.

The listening socket is correctly shut down at line 59, but active client file descriptors held by worker threads are never closed before joining, leaving threads blocked in recv().

Practical fix: Track active client file descriptors and shutdown each one (shutdown(fd, SHUT_RDWR)) in stop() before joining worker threads, allowing blocked recv() calls to return.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/network/rpc_server.cpp` around lines 73 - 77, The stop() method must
close active client sockets so worker threads blocked in handle_client()'s
recv(..., MSG_WAITALL) can unblock; add a thread-safe container (e.g.,
std::unordered_set<int> guarded by a mutex) that handle_client() inserts the
client fd into when starting and removes on exit, then in stop() iterate that
set and call shutdown(fd, SHUT_RDWR) (and optionally close(fd)) for each fd
before setting running_ = false and joining threads in workers; ensure access to
the container is synchronized when adding/removing fds to avoid races.

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
.github/workflows/ci.yml (1)

72-79: ⚠️ Potential issue | 🟠 Major

server_tests is not defined in CMakeLists.txt — CI will fail when executing line 74.

Although tests/server_tests.cpp exists, no server_tests target is registered in CMakeLists.txt, so the binary won't be built. Additionally, this PR introduces raft and distributed functionality, but the following test targets are missing from the CI execution:

  • raft_tests, distributed_tests, raft_sim_tests, multi_raft_tests, distributed_txn_tests
Suggested fix
         ./cloudSQL_tests
         ./lock_manager_tests
-        ./server_tests
         ./transaction_manager_tests
         ./statement_tests
         ./recovery_tests
         ./recovery_manager_tests
         ./buffer_pool_tests
+        ./raft_tests
+        ./distributed_tests
+        ./raft_sim_tests
+        ./multi_raft_tests
+        ./distributed_txn_tests

Alternatively, use ctest to run all registered tests automatically:

-        ./cloudSQL_tests
-        ./lock_manager_tests
-        ./server_tests
-        ./transaction_manager_tests
-        ./statement_tests
-        ./recovery_tests
-        ./recovery_manager_tests
-        ./buffer_pool_tests
+        ctest --output-on-failure
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.github/workflows/ci.yml around lines 72 - 79, CI references test target
"server_tests" which is not registered in CMakeLists.txt and several new test
targets from the raft/distributed work are missing (raft_tests,
distributed_tests, raft_sim_tests, multi_raft_tests, distributed_txn_tests);
update CMakeLists.txt to add/enable those test targets (ensure the targets that
build tests like server_tests, raft_tests, distributed_tests, raft_sim_tests,
multi_raft_tests, distributed_txn_tests are defined and added via
add_executable/add_test or equivalent) so the binaries exist for the workflow,
or modify the CI step in .github/workflows/ci.yml to run ctest instead of
enumerating targets so all registered tests run automatically.
🧹 Nitpick comments (1)
CMakeLists.txt (1)

77-77: Consider using CMake's standard BUILD_TESTING variable or defining BUILD_TESTS as an option.

BUILD_TESTING is the standard CMake variable (automatically created by include(CTest)). The custom BUILD_TESTS variable works when passed via CLI (-DBUILD_TESTS=ON), but it's not declared as an option() in this file, which means:

  1. It won't appear in CMake GUIs/ccmake for discovery
  2. It lacks a default value and documentation
Option 1: Use CMake standard variable
-if(BUILD_TESTS)
+include(CTest)
+if(BUILD_TESTING)
     enable_testing()
Option 2: Declare BUILD_TESTS as an option
 option(ENABLE_TSAN "Enable ThreadSanitizer" OFF)
+option(BUILD_TESTS "Build unit tests" OFF)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@CMakeLists.txt` at line 77, The CMake check uses a non-declared BUILD_TESTS
variable; either switch to the standard CMake variable BUILD_TESTING (by calling
include(CTest) earlier and using if(BUILD_TESTING)) or explicitly declare
BUILD_TESTS as an option (option(BUILD_TESTS "Build tests" OFF) at top) so it
appears in GUIs and has a default; update the if(BUILD_TESTS) conditional to use
the chosen symbol and ensure include(CTest) is added when opting for
BUILD_TESTING.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cfd60479-38d2-40fd-be10-8cd8c8205125

📥 Commits

Reviewing files that changed from the base of the PR and between ecd8474 and 6a1e3e1.

📒 Files selected for processing (6)
  • .github/workflows/ci.yml
  • CMakeLists.txt
  • include/network/rpc_server.hpp
  • src/network/rpc_client.cpp
  • src/network/rpc_server.cpp
  • src/network/server.cpp
✅ Files skipped from review due to trivial changes (1)
  • src/network/server.cpp
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/network/rpc_client.cpp
  • src/network/rpc_server.cpp

@poyrazK poyrazK merged commit 1402b26 into main Mar 5, 2026
5 checks passed

Labels

documentation Improvements or additions to documentation enhancement New feature or request

1 participant