Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
- name: Run Tests
run: |
cd build
./sqlEngine_tests
./cloudSQL_tests
./lock_manager_tests
./server_tests
./transaction_manager_tests
Expand Down Expand Up @@ -98,4 +98,4 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: cloudsql-bin-${{ matrix.compiler }}
path: build/sqlEngine
path: build/cloudSQL
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
option(STRICT_LINT "Enable strict linting" ON)
option(ENABLE_ASAN "Enable AddressSanitizer" OFF)
option(ENABLE_TSAN "Enable ThreadSanitizer" OFF)
option(BUILD_COVERAGE "Enable code coverage reporting" OFF)

# Add include directories
include_directories(include)
Expand Down Expand Up @@ -42,6 +43,7 @@ set(CORE_SOURCES
src/transaction/lock_manager.cpp
src/transaction/transaction_manager.cpp
src/recovery/log_manager.cpp
src/recovery/log_record.cpp
src/recovery/recovery_manager.cpp
src/distributed/raft_group.cpp
src/distributed/raft_manager.cpp
Expand All @@ -50,6 +52,12 @@ set(CORE_SOURCES

add_library(sqlEngineCore ${CORE_SOURCES})

# Coverage
# Opt-in instrumentation: applied only when the BUILD_COVERAGE option is ON.
if(BUILD_COVERAGE)
# --coverage asks the compiler (GCC/Clang) to instrument the code for coverage
# reporting; -O0 turns optimization off. PUBLIC also applies these options to
# every target that links against sqlEngineCore.
target_compile_options(sqlEngineCore PUBLIC --coverage -O0)
# --coverage is needed at link time as well. target_link_libraries treats an
# item starting with '-' as a linker flag, so it is passed through to the link line.
target_link_libraries(sqlEngineCore PUBLIC --coverage)
endif()

# Sanitizers
if(ENABLE_ASAN)
target_compile_options(sqlEngineCore PUBLIC -fsanitize=address)
Expand All @@ -73,9 +81,10 @@ macro(add_cloudsql_test NAME SOURCE)
endmacro()

# Tests
if(BUILD_TESTING)
if(BUILD_TESTS)
enable_testing()
add_cloudsql_test(cloudSQL_tests tests/cloudSQL_tests.cpp)
add_cloudsql_test(server_tests tests/server_tests.cpp)
add_cloudsql_test(statement_tests tests/statement_tests.cpp)
add_cloudsql_test(transaction_manager_tests tests/transaction_manager_tests.cpp)
add_cloudsql_test(lock_manager_tests tests/lock_manager_tests.cpp)
Expand Down
6 changes: 3 additions & 3 deletions include/catalog/catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ struct ShardInfo {
uint32_t shard_id;
std::string node_address;
uint16_t port;
std::vector<std::string> replicas; // List of node IDs
std::string leader_id; // Current Raft leader
std::vector<std::string> replicas; // List of node IDs
std::string leader_id; // Current Raft leader
};

/**
Expand Down Expand Up @@ -154,7 +154,7 @@ class Catalog : public raft::RaftStateMachine {
* @brief Apply a committed log entry (from RaftStateMachine)
*/
void apply(const raft::LogEntry& entry) override;

/**
* @brief Default constructor
*/
Expand Down
3 changes: 2 additions & 1 deletion include/common/cluster_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ struct NodeInfo {
*/
class ClusterManager {
public:
explicit ClusterManager(const config::Config* config) : config_(config), raft_manager_(nullptr) {
explicit ClusterManager(const config::Config* config)
: config_(config), raft_manager_(nullptr) {
// Add self to node map if in distributed mode
if (config_ != nullptr && config_->mode != config::RunMode::Standalone) {
self_node_.id = "local_node"; // Will be replaced by unique ID later
Expand Down
6 changes: 3 additions & 3 deletions include/network/rpc_client.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/**
* @file rpc_client.hpp
* @brief Internal RPC client for node-to-node communication
*/
Expand Down Expand Up @@ -29,8 +29,8 @@ class RpcClient {
/**
* @brief Send a request and wait for a response
*/
bool call(RpcType type, const std::vector<uint8_t>& payload,
std::vector<uint8_t>& response_out, uint16_t group_id = 0);
bool call(RpcType type, const std::vector<uint8_t>& payload, std::vector<uint8_t>& response_out,
uint16_t group_id = 0);

/**
* @brief Send a request without waiting for a response
Expand Down
2 changes: 2 additions & 0 deletions include/network/rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class RpcServer {
int listen_fd_ = -1;
std::atomic<bool> running_{false};
std::thread accept_thread_;
std::vector<std::thread> worker_threads_;
std::mutex worker_mutex_;
std::unordered_map<RpcType, RpcHandler> handlers_;
std::mutex handlers_mutex_;
};
Expand Down
2 changes: 1 addition & 1 deletion plans/CPP_MIGRATION_PLAN.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# cloudSQL C++ Migration & Distributed Optimization Plan
#cloudSQL C++ Migration& Distributed Optimization Plan
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix H1 ATX syntax for markdownlint compliance.

Line 1 is missing the required space after `#`, and the heading is harder to read without a space before `&`.

✏️ Proposed fix
-#cloudSQL C++ Migration& Distributed Optimization Plan
+# cloudSQL C++ Migration & Distributed Optimization Plan
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#cloudSQL C++ Migration& Distributed Optimization Plan
# cloudSQL C++ Migration & Distributed Optimization Plan
🧰 Tools
🪛 markdownlint-cli2 (0.21.0)

[warning] 1-1: No space after hash on atx style heading

(MD018, no-missing-space-atx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plans/CPP_MIGRATION_PLAN.md` at line 1, the H1 heading "#cloudSQL C++
Migration& Distributed Optimization Plan" violates markdownlint rule MD018 (no
space after the hash on an ATX-style heading); update the H1 line to include a
space after the hash and a space before the ampersand so it reads: "# cloudSQL
C++ Migration & Distributed Optimization Plan" to satisfy markdownlint and
improve readability.


## Phase Status Summary
- **Phase 1: Foundation (Core & Storage)** - [x] COMPLETE
Expand Down
37 changes: 19 additions & 18 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,23 @@ oid_t Catalog::create_table(const std::string& table_name, std::vector<ColumnInf
// Multi-Raft: Replicate DDL via Catalog Raft Group (ID 0)
// Serialize command: [Type:1][NameLen:4][Name][ColCount:4][Cols...]
std::vector<uint8_t> cmd;
cmd.push_back(1); // Type 1: CreateTable
cmd.push_back(1); // Type 1: CreateTable

uint32_t name_len = static_cast<uint32_t>(table_name.size());
size_t offset = cmd.size();
cmd.resize(offset + 4 + table_name.size());
std::memcpy(cmd.data() + offset, &name_len, 4);
std::memcpy(cmd.data() + offset + 4, table_name.data(), name_len);

uint32_t col_count = static_cast<uint32_t>(columns.size());
offset = cmd.size();
cmd.resize(offset + 4);
std::memcpy(cmd.data() + offset, &col_count, 4);

for (const auto& col : columns) {
uint32_t cname_len = static_cast<uint32_t>(col.name.size());
offset = cmd.size();
cmd.resize(offset + 4 + col.name.size() + 1 + 2); // len + name + type + pos
cmd.resize(offset + 4 + col.name.size() + 1 + 2); // len + name + type + pos
std::memcpy(cmd.data() + offset, &cname_len, 4);
std::memcpy(cmd.data() + offset + 4, col.name.data(), cname_len);
cmd[offset + 4 + cname_len] = static_cast<uint8_t>(col.type);
Expand All @@ -110,10 +110,7 @@ oid_t Catalog::create_table(const std::string& table_name, std::vector<ColumnInf

oid_t Catalog::create_table_local(const std::string& table_name, std::vector<ColumnInfo> columns) {
if (table_exists_by_name(table_name)) {
// Return existing OID if it exists (for idempotency in Raft replay)
for (auto& pair : tables_) {
if (pair.second->name == table_name) return pair.first;
}
throw std::runtime_error("Table already exists: " + table_name);
}

auto table = std::make_unique<TableInfo>();
Expand Down Expand Up @@ -141,10 +138,10 @@ oid_t Catalog::create_table_local(const std::string& table_name, std::vector<Col
bool Catalog::drop_table(oid_t table_id) {
if (raft_group_ != nullptr) {
std::vector<uint8_t> cmd;
cmd.push_back(2); // Type 2: DropTable
cmd.push_back(2); // Type 2: DropTable
cmd.resize(cmd.size() + 4);
std::memcpy(cmd.data() + 1, &table_id, 4);

if (raft_group_->replicate(cmd)) {
return drop_table_local(table_id);
}
Expand All @@ -164,20 +161,20 @@ bool Catalog::drop_table_local(oid_t table_id) {

void Catalog::apply(const raft::LogEntry& entry) {
if (entry.data.empty()) return;

uint8_t type = entry.data[0];
if (type == 1) { // CreateTable
if (type == 1) { // CreateTable
size_t offset = 1;
uint32_t name_len = 0;
std::memcpy(&name_len, entry.data.data() + offset, 4);
offset += 4;
std::string table_name(reinterpret_cast<const char*>(entry.data.data() + offset), name_len);
offset += name_len;

uint32_t col_count = 0;
std::memcpy(&col_count, entry.data.data() + offset, 4);
offset += 4;

std::vector<ColumnInfo> columns;
for (uint32_t i = 0; i < col_count; ++i) {
uint32_t cname_len = 0;
Expand All @@ -191,9 +188,13 @@ void Catalog::apply(const raft::LogEntry& entry) {
offset += 2;
columns.emplace_back(cname, ctype, cpos);
}

create_table_local(table_name, std::move(columns));
} else if (type == 2) { // DropTable

try {
create_table_local(table_name, std::move(columns));
} catch (const std::exception& e) {
// Ignore duplicate table errors during Raft replay
}
} else if (type == 2) { // DropTable
oid_t table_id = 0;
std::memcpy(&table_id, entry.data.data() + 1, 4);
drop_table_local(table_id);
Expand Down
19 changes: 11 additions & 8 deletions src/distributed/distributed_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,28 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
// Metadata operations (Group 0) must be routed to the Catalog Leader
std::string leader_id = cluster_manager_.get_leader(0);
auto nodes = cluster_manager_.get_coordinators();

const cluster::NodeInfo* target = nullptr;
if (!leader_id.empty()) {
for (const auto& n : nodes) {
if (n.id == leader_id) { target = &n; break; }
if (n.id == leader_id) {
target = &n;
break;
}
}
}

// Fallback: route to first coordinator if leader unknown (leader will redirect or proxy)
if (!target && !nodes.empty()) target = &nodes[0];

if (target) {
network::RpcClient client(target->address, target->cluster_port);
if (client.connect()) {
// In a full implementation, DDL would be sent as a Catalog-specific RPC
// For POC, we treat it success locally after replication initiation
}
}
return {};
return {};
Comment on lines 95 to +118
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

DDL path reports success even when no coordinator is reachable.

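At line 118, the DDL branch returns an empty (successful) QueryResult even when target is null or client.connect() fails. A caller can therefore receive a success acknowledgement for a DDL statement that was never routed to a coordinator or replicated.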

🛠️ Proposed fix
-        if (target) {
-            network::RpcClient client(target->address, target->cluster_port);
-            if (client.connect()) {
-                // In a full implementation, DDL would be sent as a Catalog-specific RPC
-                // For POC, we treat it success locally after replication initiation
-            }
-        }
-        return {};
+        if (target == nullptr) {
+            QueryResult res;
+            res.set_error("No coordinator available for DDL routing");
+            return res;
+        }
+
+        network::RpcClient client(target->address, target->cluster_port);
+        if (!client.connect()) {
+            QueryResult res;
+            res.set_error("Failed to connect to coordinator leader for DDL routing");
+            return res;
+        }
+
+        // TODO: send Catalog-specific RPC and validate response.
+        return {};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/distributed/distributed_executor.cpp` around lines 95 - 118, The current
code can return success even when no coordinator is found or the RPC connection
fails; update the logic around cluster_manager_.get_leader()/get_coordinators()
so that if target remains null you return an error status (or throw) instead of
proceeding, and if network::RpcClient client(target->address,
target->cluster_port) fails to connect (client.connect() == false) you likewise
return/propagate an error rather than treating the DDL as successful; adjust the
return path to reflect real failure outcomes and include clear error messages
tied to the target/client checks.

}

auto data_nodes = cluster_manager_.get_data_nodes();
Expand Down Expand Up @@ -319,7 +322,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,

const uint32_t shard_idx = cluster::ShardManager::compute_shard(
pk_val, static_cast<uint32_t>(data_nodes.size()));

// Leader-Aware Routing: Find shard leader
std::string leader_id = cluster_manager_.get_leader(shard_idx + 1);
bool found_leader = false;
Expand All @@ -332,7 +335,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
}
}
}

if (!found_leader) target_nodes.push_back(data_nodes[shard_idx]);
}
}
Expand All @@ -353,7 +356,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
if (try_extract_sharding_key(where_expr, pk_val)) {
const uint32_t shard_idx = cluster::ShardManager::compute_shard(
pk_val, static_cast<uint32_t>(data_nodes.size()));

// Leader-Aware Routing: Route mutations/queries to the current shard leader
std::string leader_id = cluster_manager_.get_leader(shard_idx + 1);
bool found_leader = false;
Expand Down
16 changes: 9 additions & 7 deletions src/distributed/raft_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ LogEntry deserialize_entry(const uint8_t* data, size_t& offset, size_t size) {

} // namespace

RaftGroup::RaftGroup(uint16_t group_id, std::string node_id, cluster::ClusterManager& cluster_manager,
network::RpcServer& rpc_server)
RaftGroup::RaftGroup(uint16_t group_id, std::string node_id,
cluster::ClusterManager& cluster_manager, network::RpcServer& rpc_server)
: group_id_(group_id),
node_id_(std::move(node_id)),
cluster_manager_(cluster_manager),
Expand Down Expand Up @@ -146,7 +146,8 @@ void RaftGroup::do_candidate() {
network::RpcClient client(peer.address, peer.cluster_port);
if (client.connect()) {
std::vector<uint8_t> reply_payload;
if (client.call(network::RpcType::RequestVote, args.serialize(), reply_payload, group_id_)) {
if (client.call(network::RpcType::RequestVote, args.serialize(), reply_payload,
group_id_)) {
if (reply_payload.size() >= VOTE_REPLY_SIZE) {
term_t resp_term = 0;
std::memcpy(&resp_term, reply_payload.data(), 8);
Expand Down Expand Up @@ -191,7 +192,8 @@ void RaftGroup::do_leader() {

network::RpcClient client(peer.address, peer.cluster_port);
if (client.connect()) {
static_cast<void>(client.send_only(network::RpcType::AppendEntries, payload, group_id_));
static_cast<void>(
client.send_only(network::RpcType::AppendEntries, payload, group_id_));
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(HEARTBEAT_INTERVAL_MS));
Expand Down Expand Up @@ -310,7 +312,7 @@ void RaftGroup::persist_state() {
uint64_t v_len = persistent_state_.voted_for.size();
out.write(reinterpret_cast<const char*>(&v_len), 8);
out.write(persistent_state_.voted_for.data(), v_len);

uint64_t log_size = persistent_state_.log.size();
out.write(reinterpret_cast<const char*>(&log_size), 8);
for (const auto& entry : persistent_state_.log) {
Expand All @@ -332,7 +334,7 @@ void RaftGroup::load_state() {
in.read(reinterpret_cast<char*>(&v_len), 8);
persistent_state_.voted_for.resize(v_len);
in.read(&persistent_state_.voted_for[0], v_len);

uint64_t log_size = 0;
in.read(reinterpret_cast<char*>(&log_size), 8);
for (uint64_t i = 0; i < log_size; ++i) {
Expand All @@ -348,7 +350,7 @@ void RaftGroup::load_state() {

bool RaftGroup::replicate(const std::vector<uint8_t>& data) {
if (state_.load() != NodeState::Leader) return false;

std::scoped_lock<std::mutex> lock(mutex_);
LogEntry entry;
entry.term = persistent_state_.current_term;
Expand Down
9 changes: 3 additions & 6 deletions src/distributed/raft_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ namespace cloudsql::raft {

RaftManager::RaftManager(std::string node_id, cluster::ClusterManager& cluster_manager,
network::RpcServer& rpc_server)
: node_id_(std::move(node_id)),
cluster_manager_(cluster_manager),
rpc_server_(rpc_server) {
: node_id_(std::move(node_id)), cluster_manager_(cluster_manager), rpc_server_(rpc_server) {
// Register routing handlers
rpc_server_.set_handler(network::RpcType::RequestVote,
[this](const network::RpcHeader& h, const std::vector<uint8_t>& p,
Expand Down Expand Up @@ -42,8 +40,7 @@ std::shared_ptr<RaftGroup> RaftManager::get_or_create_group(uint16_t group_id) {
return it->second;
}

auto group =
std::make_shared<RaftGroup>(group_id, node_id_, cluster_manager_, rpc_server_);
auto group = std::make_shared<RaftGroup>(group_id, node_id_, cluster_manager_, rpc_server_);
groups_[group_id] = group;
return group;
}
Expand All @@ -58,7 +55,7 @@ std::shared_ptr<RaftGroup> RaftManager::get_group(uint16_t group_id) {
}

void RaftManager::handle_raft_rpc(const network::RpcHeader& header,
const std::vector<uint8_t>& payload, int client_fd) {
const std::vector<uint8_t>& payload, int client_fd) {
std::shared_ptr<RaftGroup> group;
{
const std::scoped_lock<std::mutex> lock(mutex_);
Expand Down
Loading