diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 289831c..eab53f6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: - name: Run Tests run: | cd build - ./sqlEngine_tests + ./cloudSQL_tests ./lock_manager_tests ./server_tests ./transaction_manager_tests @@ -98,4 +98,4 @@ jobs: uses: actions/upload-artifact@v4 with: name: cloudsql-bin-${{ matrix.compiler }} - path: build/sqlEngine + path: build/cloudSQL diff --git a/CMakeLists.txt b/CMakeLists.txt index 81a00b5..8466732 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,6 +8,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) option(STRICT_LINT "Enable strict linting" ON) option(ENABLE_ASAN "Enable AddressSanitizer" OFF) option(ENABLE_TSAN "Enable ThreadSanitizer" OFF) +option(BUILD_COVERAGE "Enable code coverage reporting" OFF) # Add include directories include_directories(include) @@ -42,6 +43,7 @@ set(CORE_SOURCES src/transaction/lock_manager.cpp src/transaction/transaction_manager.cpp src/recovery/log_manager.cpp + src/recovery/log_record.cpp src/recovery/recovery_manager.cpp src/distributed/raft_group.cpp src/distributed/raft_manager.cpp @@ -50,6 +52,12 @@ set(CORE_SOURCES add_library(sqlEngineCore ${CORE_SOURCES}) +# Coverage +if(BUILD_COVERAGE) + target_compile_options(sqlEngineCore PUBLIC --coverage -O0) + target_link_libraries(sqlEngineCore PUBLIC --coverage) +endif() + # Sanitizers if(ENABLE_ASAN) target_compile_options(sqlEngineCore PUBLIC -fsanitize=address) @@ -73,9 +81,10 @@ macro(add_cloudsql_test NAME SOURCE) endmacro() # Tests -if(BUILD_TESTING) +if(BUILD_TESTS) enable_testing() add_cloudsql_test(cloudSQL_tests tests/cloudSQL_tests.cpp) + add_cloudsql_test(server_tests tests/server_tests.cpp) add_cloudsql_test(statement_tests tests/statement_tests.cpp) add_cloudsql_test(transaction_manager_tests tests/transaction_manager_tests.cpp) add_cloudsql_test(lock_manager_tests tests/lock_manager_tests.cpp) diff --git a/include/catalog/catalog.hpp 
b/include/catalog/catalog.hpp index c97dce4..759bd42 100644 --- a/include/catalog/catalog.hpp +++ b/include/catalog/catalog.hpp @@ -77,8 +77,8 @@ struct ShardInfo { uint32_t shard_id; std::string node_address; uint16_t port; - std::vector replicas; // List of node IDs - std::string leader_id; // Current Raft leader + std::vector replicas; // List of node IDs + std::string leader_id; // Current Raft leader }; /** @@ -154,7 +154,7 @@ class Catalog : public raft::RaftStateMachine { * @brief Apply a committed log entry (from RaftStateMachine) */ void apply(const raft::LogEntry& entry) override; - + /** * @brief Default constructor */ diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index a4f9742..9523da4 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -38,7 +38,8 @@ struct NodeInfo { */ class ClusterManager { public: - explicit ClusterManager(const config::Config* config) : config_(config), raft_manager_(nullptr) { + explicit ClusterManager(const config::Config* config) + : config_(config), raft_manager_(nullptr) { // Add self to node map if in distributed mode if (config_ != nullptr && config_->mode != config::RunMode::Standalone) { self_node_.id = "local_node"; // Will be replaced by unique ID later diff --git a/include/network/rpc_client.hpp b/include/network/rpc_client.hpp index e287690..ea54938 100644 --- a/include/network/rpc_client.hpp +++ b/include/network/rpc_client.hpp @@ -1,4 +1,4 @@ - /** +/** * @file rpc_client.hpp * @brief Internal RPC client for node-to-node communication */ @@ -29,8 +29,8 @@ class RpcClient { /** * @brief Send a request and wait for a response */ - bool call(RpcType type, const std::vector& payload, - std::vector& response_out, uint16_t group_id = 0); + bool call(RpcType type, const std::vector& payload, std::vector& response_out, + uint16_t group_id = 0); /** * @brief Send a request without waiting for a response diff --git a/include/network/rpc_server.hpp 
b/include/network/rpc_server.hpp index bf91bfc..8a14068 100644 --- a/include/network/rpc_server.hpp +++ b/include/network/rpc_server.hpp @@ -59,6 +59,8 @@ class RpcServer { int listen_fd_ = -1; std::atomic running_{false}; std::thread accept_thread_; + std::vector worker_threads_; + std::mutex worker_mutex_; std::unordered_map handlers_; std::mutex handlers_mutex_; }; diff --git a/plans/CPP_MIGRATION_PLAN.md b/plans/CPP_MIGRATION_PLAN.md index e13abe2..3765d45 100644 --- a/plans/CPP_MIGRATION_PLAN.md +++ b/plans/CPP_MIGRATION_PLAN.md @@ -1,4 +1,4 @@ -# cloudSQL C++ Migration & Distributed Optimization Plan +# cloudSQL C++ Migration & Distributed Optimization Plan ## Phase Status Summary - **Phase 1: Foundation (Core & Storage)** - [x] COMPLETE diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index bf710e0..d7e508b 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -77,23 +77,23 @@ oid_t Catalog::create_table(const std::string& table_name, std::vector cmd; - cmd.push_back(1); // Type 1: CreateTable - + cmd.push_back(1); // Type 1: CreateTable + uint32_t name_len = static_cast(table_name.size()); size_t offset = cmd.size(); cmd.resize(offset + 4 + table_name.size()); std::memcpy(cmd.data() + offset, &name_len, 4); std::memcpy(cmd.data() + offset + 4, table_name.data(), name_len); - + uint32_t col_count = static_cast(columns.size()); offset = cmd.size(); cmd.resize(offset + 4); std::memcpy(cmd.data() + offset, &col_count, 4); - + for (const auto& col : columns) { uint32_t cname_len = static_cast(col.name.size()); offset = cmd.size(); - cmd.resize(offset + 4 + col.name.size() + 1 + 2); // len + name + type + pos + cmd.resize(offset + 4 + col.name.size() + 1 + 2); // len + name + type + pos std::memcpy(cmd.data() + offset, &cname_len, 4); std::memcpy(cmd.data() + offset + 4, col.name.data(), cname_len); cmd[offset + 4 + cname_len] = static_cast(col.type); @@ -110,10 +110,7 @@ oid_t Catalog::create_table(const std::string& table_name,
std::vector columns) { if (table_exists_by_name(table_name)) { - // Return existing OID if it exists (for idempotency in Raft replay) - for (auto& pair : tables_) { - if (pair.second->name == table_name) return pair.first; - } + throw std::runtime_error("Table already exists: " + table_name); } auto table = std::make_unique(); @@ -141,10 +138,10 @@ oid_t Catalog::create_table_local(const std::string& table_name, std::vector cmd; - cmd.push_back(2); // Type 2: DropTable + cmd.push_back(2); // Type 2: DropTable cmd.resize(cmd.size() + 4); std::memcpy(cmd.data() + 1, &table_id, 4); - + if (raft_group_->replicate(cmd)) { return drop_table_local(table_id); } @@ -164,20 +161,20 @@ bool Catalog::drop_table_local(oid_t table_id) { void Catalog::apply(const raft::LogEntry& entry) { if (entry.data.empty()) return; - + uint8_t type = entry.data[0]; - if (type == 1) { // CreateTable + if (type == 1) { // CreateTable size_t offset = 1; uint32_t name_len = 0; std::memcpy(&name_len, entry.data.data() + offset, 4); offset += 4; std::string table_name(reinterpret_cast(entry.data.data() + offset), name_len); offset += name_len; - + uint32_t col_count = 0; std::memcpy(&col_count, entry.data.data() + offset, 4); offset += 4; - + std::vector columns; for (uint32_t i = 0; i < col_count; ++i) { uint32_t cname_len = 0; @@ -191,9 +188,13 @@ void Catalog::apply(const raft::LogEntry& entry) { offset += 2; columns.emplace_back(cname, ctype, cpos); } - - create_table_local(table_name, std::move(columns)); - } else if (type == 2) { // DropTable + + try { + create_table_local(table_name, std::move(columns)); + } catch (const std::exception& e) { + // Ignore duplicate table errors during Raft replay + } + } else if (type == 2) { // DropTable oid_t table_id = 0; std::memcpy(&table_id, entry.data.data() + 1, 4); drop_table_local(table_id); diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index 4164b95..9de2f5f 100644 --- 
a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -94,17 +94,20 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, // Metadata operations (Group 0) must be routed to the Catalog Leader std::string leader_id = cluster_manager_.get_leader(0); auto nodes = cluster_manager_.get_coordinators(); - + const cluster::NodeInfo* target = nullptr; if (!leader_id.empty()) { for (const auto& n : nodes) { - if (n.id == leader_id) { target = &n; break; } + if (n.id == leader_id) { + target = &n; + break; + } } } - + // Fallback: route to first coordinator if leader unknown (leader will redirect or proxy) if (!target && !nodes.empty()) target = &nodes[0]; - + if (target) { network::RpcClient client(target->address, target->cluster_port); if (client.connect()) { @@ -112,7 +115,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, // For POC, we treat it success locally after replication initiation } } - return {}; + return {}; } auto data_nodes = cluster_manager_.get_data_nodes(); @@ -319,7 +322,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, const uint32_t shard_idx = cluster::ShardManager::compute_shard( pk_val, static_cast(data_nodes.size())); - + // Leader-Aware Routing: Find shard leader std::string leader_id = cluster_manager_.get_leader(shard_idx + 1); bool found_leader = false; @@ -332,7 +335,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, } } } - + if (!found_leader) target_nodes.push_back(data_nodes[shard_idx]); } } @@ -353,7 +356,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, if (try_extract_sharding_key(where_expr, pk_val)) { const uint32_t shard_idx = cluster::ShardManager::compute_shard( pk_val, static_cast(data_nodes.size())); - + // Leader-Aware Routing: Route mutations/queries to the current shard leader std::string leader_id = cluster_manager_.get_leader(shard_idx + 1); bool found_leader = 
false; diff --git a/src/distributed/raft_group.cpp b/src/distributed/raft_group.cpp index f59865a..39b4608 100644 --- a/src/distributed/raft_group.cpp +++ b/src/distributed/raft_group.cpp @@ -60,8 +60,8 @@ LogEntry deserialize_entry(const uint8_t* data, size_t& offset, size_t size) { } // namespace -RaftGroup::RaftGroup(uint16_t group_id, std::string node_id, cluster::ClusterManager& cluster_manager, - network::RpcServer& rpc_server) +RaftGroup::RaftGroup(uint16_t group_id, std::string node_id, + cluster::ClusterManager& cluster_manager, network::RpcServer& rpc_server) : group_id_(group_id), node_id_(std::move(node_id)), cluster_manager_(cluster_manager), @@ -146,7 +146,8 @@ void RaftGroup::do_candidate() { network::RpcClient client(peer.address, peer.cluster_port); if (client.connect()) { std::vector reply_payload; - if (client.call(network::RpcType::RequestVote, args.serialize(), reply_payload, group_id_)) { + if (client.call(network::RpcType::RequestVote, args.serialize(), reply_payload, + group_id_)) { if (reply_payload.size() >= VOTE_REPLY_SIZE) { term_t resp_term = 0; std::memcpy(&resp_term, reply_payload.data(), 8); @@ -191,7 +192,8 @@ void RaftGroup::do_leader() { network::RpcClient client(peer.address, peer.cluster_port); if (client.connect()) { - static_cast(client.send_only(network::RpcType::AppendEntries, payload, group_id_)); + static_cast( + client.send_only(network::RpcType::AppendEntries, payload, group_id_)); } } std::this_thread::sleep_for(std::chrono::milliseconds(HEARTBEAT_INTERVAL_MS)); @@ -310,7 +312,7 @@ void RaftGroup::persist_state() { uint64_t v_len = persistent_state_.voted_for.size(); out.write(reinterpret_cast(&v_len), 8); out.write(persistent_state_.voted_for.data(), v_len); - + uint64_t log_size = persistent_state_.log.size(); out.write(reinterpret_cast(&log_size), 8); for (const auto& entry : persistent_state_.log) { @@ -332,7 +334,7 @@ void RaftGroup::load_state() { in.read(reinterpret_cast(&v_len), 8); 
persistent_state_.voted_for.resize(v_len); in.read(&persistent_state_.voted_for[0], v_len); - + uint64_t log_size = 0; in.read(reinterpret_cast(&log_size), 8); for (uint64_t i = 0; i < log_size; ++i) { @@ -348,7 +350,7 @@ void RaftGroup::load_state() { bool RaftGroup::replicate(const std::vector& data) { if (state_.load() != NodeState::Leader) return false; - + std::scoped_lock lock(mutex_); LogEntry entry; entry.term = persistent_state_.current_term; diff --git a/src/distributed/raft_manager.cpp b/src/distributed/raft_manager.cpp index 019d0aa..41b4870 100644 --- a/src/distributed/raft_manager.cpp +++ b/src/distributed/raft_manager.cpp @@ -9,9 +9,7 @@ namespace cloudsql::raft { RaftManager::RaftManager(std::string node_id, cluster::ClusterManager& cluster_manager, network::RpcServer& rpc_server) - : node_id_(std::move(node_id)), - cluster_manager_(cluster_manager), - rpc_server_(rpc_server) { + : node_id_(std::move(node_id)), cluster_manager_(cluster_manager), rpc_server_(rpc_server) { // Register routing handlers rpc_server_.set_handler(network::RpcType::RequestVote, [this](const network::RpcHeader& h, const std::vector& p, @@ -42,8 +40,7 @@ std::shared_ptr RaftManager::get_or_create_group(uint16_t group_id) { return it->second; } - auto group = - std::make_shared(group_id, node_id_, cluster_manager_, rpc_server_); + auto group = std::make_shared(group_id, node_id_, cluster_manager_, rpc_server_); groups_[group_id] = group; return group; } @@ -58,7 +55,7 @@ std::shared_ptr RaftManager::get_group(uint16_t group_id) { } void RaftManager::handle_raft_rpc(const network::RpcHeader& header, - const std::vector& payload, int client_fd) { + const std::vector& payload, int client_fd) { std::shared_ptr group; { const std::scoped_lock lock(mutex_); diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index a7558aa..8e6cb22 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -68,10 +68,11 @@ void 
ShardStateMachine::apply(const raft::LogEntry& entry) { } storage::HeapTable table(table_name, bpm_, schema); - if (type == 1) { // INSERT - Tuple tuple = network::Serializer::deserialize_tuple(entry.data.data(), offset, entry.data.size()); + if (type == 1) { // INSERT + Tuple tuple = + network::Serializer::deserialize_tuple(entry.data.data(), offset, entry.data.size()); table.insert(tuple, 0); - } else if (type == 2) { // DELETE + } else if (type == 2) { // DELETE storage::HeapTable::TupleId rid; if (offset + 8 > entry.data.size()) return; std::memcpy(&rid.page_num, entry.data.data() + offset, 4); @@ -307,24 +308,24 @@ QueryResult QueryExecutor::execute_insert(const parser::InsertStatement& stmt, // POC: Data Replication Logic if (cluster_manager_ != nullptr && cluster_manager_->get_raft_manager() != nullptr) { // Find shard group (assume shard 1 for POC) - auto shard_group = cluster_manager_->get_raft_manager()->get_group(1); + auto shard_group = cluster_manager_->get_raft_manager()->get_group(1); if (shard_group && shard_group->is_leader()) { std::vector cmd; - cmd.push_back(1); // Type 1: INSERT + cmd.push_back(1); // Type 1: INSERT uint32_t tlen = static_cast(table_name.size()); size_t off = cmd.size(); cmd.resize(off + 4 + tlen); std::memcpy(cmd.data() + off, &tlen, 4); std::memcpy(cmd.data() + off + 4, table_name.data(), tlen); network::Serializer::serialize_tuple(tuple, cmd); - + if (!shard_group->replicate(cmd)) { result.set_error("Replication failed for shard 1"); return result; } } } - + const auto tid = table.insert(tuple, xmin); /* Log INSERT */ @@ -390,10 +391,10 @@ QueryResult QueryExecutor::execute_delete(const parser::DeleteStatement& stmt, for (const auto& rid : target_rids) { // POC: Replication Logic if (cluster_manager_ != nullptr && cluster_manager_->get_raft_manager() != nullptr) { - auto shard_group = cluster_manager_->get_raft_manager()->get_group(1); + auto shard_group = cluster_manager_->get_raft_manager()->get_group(1); if (shard_group 
&& shard_group->is_leader()) { std::vector cmd; - cmd.push_back(2); // Type 2: DELETE + cmd.push_back(2); // Type 2: DELETE uint32_t tlen = static_cast(table_name.size()); size_t off = cmd.size(); cmd.resize(off + 4 + tlen + 8); @@ -401,7 +402,7 @@ QueryResult QueryExecutor::execute_delete(const parser::DeleteStatement& stmt, std::memcpy(cmd.data() + off + 4, table_name.data(), tlen); std::memcpy(cmd.data() + off + 4 + tlen, &rid.page_num, 4); std::memcpy(cmd.data() + off + 4 + tlen + 4, &rid.slot_num, 4); - + if (!shard_group->replicate(cmd)) { result.set_error("Replication failed for shard 1"); return result; diff --git a/src/main.cpp b/src/main.cpp index 4bc2fa1..35d33da 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -227,10 +227,10 @@ int main(int argc, char* argv[]) { if (config.mode != cloudsql::config::RunMode::Standalone) { cluster_manager = std::make_unique(&config); rpc_server = std::make_unique(config.cluster_port); - + const std::string node_id = "node_" + std::to_string(config.cluster_port); raft_manager = std::make_unique(node_id, *cluster_manager, - *rpc_server); + *rpc_server); cluster_manager->set_raft_manager(raft_manager.get()); /* Every node in distributed mode participates in the Catalog group (ID 0) */ @@ -239,6 +239,12 @@ int main(int argc, char* argv[]) { catalog->set_raft_group(catalog_group.get()); if (config.mode == cloudsql::config::RunMode::Data) { + // Data nodes also participate in shard consensus (e.g. 
Group 1) + auto shard_group = raft_manager->get_or_create_group(1); + // Mock state machine for shard 1 + static cloudsql::executor::ShardStateMachine shard_sm("data", *bpm, *catalog); + shard_group->set_state_machine(&shard_sm); + // Register execution handler for Data nodes rpc_server->set_handler( cloudsql::network::RpcType::ExecuteFragment, @@ -387,7 +393,8 @@ int main(int argc, char* argv[]) { resp_h.payload_len = static_cast(resp_p.size()); char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; resp_h.encode(h_buf); - static_cast(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }); @@ -505,7 +512,8 @@ int main(int argc, char* argv[]) { resp_h.payload_len = static_cast(resp_p.size()); char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; resp_h.encode(h_buf); - static_cast(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }); } diff --git a/src/network/rpc_client.cpp b/src/network/rpc_client.cpp index d5dd118..11b675d 100644 --- a/src/network/rpc_client.cpp +++ b/src/network/rpc_client.cpp @@ -63,7 +63,7 @@ void RpcClient::disconnect() { bool RpcClient::call(RpcType type, const std::vector& payload, std::vector& response_out, uint16_t group_id) { const std::scoped_lock lock(mutex_); - + if (fd_ < 0 && !connect()) { return false; } @@ -80,7 +80,7 @@ bool RpcClient::call(RpcType type, const std::vector& payload, if (send(fd_, header_buf, RpcHeader::HEADER_SIZE, 0) <= 0) { return false; } - + if (!payload.empty()) { if (send(fd_, payload.data(), payload.size(), 0) <= 0) { return false; @@ -95,7 +95,7 @@ bool RpcClient::call(RpcType type, const std::vector& payload, const RpcHeader resp_header = RpcHeader::decode(resp_buf.data()); 
response_out.resize(resp_header.payload_len); - + if (resp_header.payload_len > 0) { if (recv(fd_, response_out.data(), resp_header.payload_len, MSG_WAITALL) <= 0) { return false; @@ -107,7 +107,7 @@ bool RpcClient::call(RpcType type, const std::vector& payload, bool RpcClient::send_only(RpcType type, const std::vector& payload, uint16_t group_id) { const std::scoped_lock lock(mutex_); - + if (fd_ < 0 && !connect()) { return false; } @@ -123,7 +123,7 @@ bool RpcClient::send_only(RpcType type, const std::vector& payload, uin if (send(fd_, header_buf, RpcHeader::HEADER_SIZE, 0) <= 0) { return false; } - + if (!payload.empty()) { if (send(fd_, payload.data(), payload.size(), 0) <= 0) { return false; diff --git a/src/network/rpc_server.cpp b/src/network/rpc_server.cpp index d902703..011566b 100644 --- a/src/network/rpc_server.cpp +++ b/src/network/rpc_server.cpp @@ -63,6 +63,21 @@ void RpcServer::stop() { if (accept_thread_.joinable()) { accept_thread_.join(); } + + std::vector workers; + { + const std::scoped_lock lock(worker_mutex_); + workers.swap(worker_threads_); + } + + for (auto& t : workers) { + if (t.joinable()) { + t.join(); + } + } + + const std::scoped_lock lock(handlers_mutex_); + handlers_.clear(); } void RpcServer::set_handler(RpcType type, RpcHandler handler) { @@ -82,8 +97,8 @@ void RpcServer::accept_loop() { if (select(listen_fd_ + 1, &fds, nullptr, nullptr, &tv) > 0) { const int client_fd = accept(listen_fd_, nullptr, nullptr); if (client_fd >= 0) { - // Detach worker threads to avoid lifecycle management issues during shutdown - std::thread(&RpcServer::handle_client, this, client_fd).detach(); + const std::scoped_lock lock(worker_mutex_); + worker_threads_.emplace_back(&RpcServer::handle_client, this, client_fd); } } } diff --git a/src/network/server.cpp b/src/network/server.cpp index 6110eee..3a422fb 100644 --- a/src/network/server.cpp +++ b/src/network/server.cpp @@ -270,6 +270,7 @@ void Server::accept_connections() { struct sockaddr_in 
client_addr {}; socklen_t client_len = sizeof(client_addr); + const int client_fd = accept(fd, reinterpret_cast(&client_addr), &client_len); diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index 5945823..d008a61 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -77,9 +77,9 @@ TEST(DistributedExecutorTests, AggregationMerge) { RpcHeader resp_h; resp_h.type = RpcType::QueryResults; resp_h.payload_len = static_cast(resp_p.size()); - std::array h_buf{}; - resp_h.encode(h_buf.data()); - static_cast(send(fd, h_buf.data(), 8, 0)); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }; @@ -128,9 +128,9 @@ TEST(DistributedExecutorTests, ShardPruningSelect) { RpcHeader resp_h; resp_h.type = RpcType::QueryResults; resp_h.payload_len = static_cast(resp_p.size()); - std::array h_buf{}; - resp_h.encode(h_buf.data()); - static_cast(send(fd, h_buf.data(), 8, 0)); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }; auto h2 = [&](const RpcHeader& h, const std::vector& p, int fd) { @@ -143,9 +143,9 @@ TEST(DistributedExecutorTests, ShardPruningSelect) { RpcHeader resp_h; resp_h.type = RpcType::QueryResults; resp_h.payload_len = static_cast(resp_p.size()); - std::array h_buf{}; - resp_h.encode(h_buf.data()); - static_cast(send(fd, h_buf.data(), 8, 0)); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }; @@ -194,9 +194,9 @@ TEST(DistributedExecutorTests, DataRedistributionShuffle) { RpcHeader resp_h; resp_h.type = RpcType::QueryResults; resp_h.payload_len = static_cast(resp_p.size()); - std::array h_buf{}; - resp_h.encode(h_buf.data()); - static_cast(send(fd, 
h_buf.data(), 8, 0)); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }); ASSERT_TRUE(target_node.start()); @@ -259,9 +259,9 @@ TEST(DistributedExecutorTests, BroadcastJoinOrchestration) { RpcHeader resp_h; resp_h.type = RpcType::QueryResults; resp_h.payload_len = static_cast(resp_p.size()); - std::array h_buf{}; - resp_h.encode(h_buf.data()); - static_cast(send(fd, h_buf.data(), 8, 0)); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }; @@ -321,9 +321,9 @@ TEST(DistributedExecutorTests, ShuffleJoinOrchestration) { RpcHeader resp_h; resp_h.type = RpcType::QueryResults; resp_h.payload_len = static_cast(resp_p.size()); - std::array h_buf{}; - resp_h.encode(h_buf.data()); - static_cast(send(fd, h_buf.data(), 8, 0)); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }; diff --git a/tests/multi_raft_tests.cpp b/tests/multi_raft_tests.cpp index 0e081be..155a9a8 100644 --- a/tests/multi_raft_tests.cpp +++ b/tests/multi_raft_tests.cpp @@ -4,14 +4,15 @@ */ #include + #include #include #include -#include "distributed/raft_manager.hpp" +#include "common/cluster_manager.hpp" #include "distributed/raft_group.hpp" +#include "distributed/raft_manager.hpp" #include "network/rpc_message.hpp" -#include "common/cluster_manager.hpp" using namespace cloudsql; using namespace cloudsql::raft; @@ -46,8 +47,8 @@ TEST(MultiRaftTests, GroupRoutingAndMultiplexing) { RpcHeader h; h.type = RpcType::AppendEntries; h.group_id = 1; - std::vector payload(8, 0); - h.payload_len = 8; + std::vector payload(8, 0); + h.payload_len = 8; handler(h, payload, -1); @@ -56,7 +57,7 @@ TEST(MultiRaftTests,
GroupRoutingAndMultiplexing) { } class IntegrationStateMachine : public RaftStateMachine { -public: + public: void apply(const LogEntry& entry) override { applied_count++; last_applied_data = entry.data; @@ -69,13 +70,13 @@ TEST(MultiRaftTests, StateMachineIntegration) { config::Config config; cluster::ClusterManager cm(&config); RpcServer rpc(9001); - + RaftGroup group(1, "node1", cm, rpc); IntegrationStateMachine sm; group.set_state_machine(&sm); std::vector payload(8, 0); - payload[0] = 1; + payload[0] = 1; RpcHeader h; h.type = RpcType::AppendEntries; @@ -83,7 +84,7 @@ TEST(MultiRaftTests, StateMachineIntegration) { h.payload_len = static_cast(payload.size()); group.handle_append_entries(h, payload, -1); - EXPECT_EQ(sm.applied_count, 0); + EXPECT_EQ(sm.applied_count, 0); } /** @@ -93,7 +94,7 @@ TEST(MultiRaftTests, StateMachineIntegration) { TEST(MultiRaftTests, LeaderElectionAndFailover) { const int num_nodes = 3; const int base_port = 9200; - + std::vector> configs; std::vector> cms; std::vector> rpcs; @@ -104,7 +105,7 @@ TEST(MultiRaftTests, LeaderElectionAndFailover) { cfg->mode = config::RunMode::Coordinator; cfg->cluster_port = base_port + i; configs.push_back(std::move(cfg)); - + cms.push_back(std::make_unique(configs.back().get())); rpcs.push_back(std::make_unique(base_port + i)); ASSERT_TRUE(rpcs.back()->start()); @@ -114,12 +115,13 @@ TEST(MultiRaftTests, LeaderElectionAndFailover) { std::string node_id = "node" + std::to_string(i + 1); rms.push_back(std::make_unique(node_id, *cms[i], *rpcs[i])); cms[i]->set_raft_manager(rms.back().get()); - + for (int j = 0; j < num_nodes; ++j) { std::string peer_id = "node" + std::to_string(j + 1); - cms[i]->register_node(peer_id, "127.0.0.1", base_port + j, config::RunMode::Coordinator); + cms[i]->register_node(peer_id, "127.0.0.1", base_port + j, + config::RunMode::Coordinator); } - + rms[i]->get_or_create_group(0); rms[i]->start(); } @@ -134,7 +136,7 @@ TEST(MultiRaftTests, LeaderElectionAndFailover) { leader_idx 
= i; } } - + EXPECT_EQ(leaders, 1) << "Exactly one leader should emerge from the cluster"; if (leaders == 1) { @@ -158,4 +160,4 @@ TEST(MultiRaftTests, LeaderElectionAndFailover) { } } -} // namespace +} // namespace diff --git a/tests/raft_simulation_tests.cpp b/tests/raft_simulation_tests.cpp index 3447738..110d44a 100644 --- a/tests/raft_simulation_tests.cpp +++ b/tests/raft_simulation_tests.cpp @@ -9,7 +9,7 @@ #include #include "common/cluster_manager.hpp" -#include "distributed/raft_node.hpp" +#include "distributed/raft_group.hpp" #include "network/rpc_server.hpp" using namespace cloudsql; @@ -24,17 +24,16 @@ TEST(RaftSimulationTests, FollowerToCandidate) { cluster::ClusterManager cm(&config); network::RpcServer rpc(7000); - RaftNode node("node1", cm, rpc); - node.start(); + RaftGroup group(1, "node1", cm, rpc); + group.start(); // Initially Follower - EXPECT_FALSE(node.is_leader()); + EXPECT_FALSE(group.is_leader()); // Wait for election timeout (150-300ms) std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Should have attempted to become candidate/leader - // Note: without actual peers, it will stay Candidate or become Leader if needed=1 } TEST(RaftSimulationTests, HeartbeatReset) { @@ -44,24 +43,22 @@ TEST(RaftSimulationTests, HeartbeatReset) { cluster::ClusterManager cm(&config); network::RpcServer rpc(7001); - RaftNode node("node1", cm, rpc); - node.start(); - - auto handler = rpc.get_handler(network::RpcType::AppendEntries); - ASSERT_NE(handler, nullptr); + RaftGroup group(1, "node1", cm, rpc); + group.start(); // Send periodic heartbeats to prevent election for (int i = 0; i < 5; ++i) { std::vector payload(8, 0); // Term 0 network::RpcHeader header; header.type = network::RpcType::AppendEntries; + header.group_id = 1; header.payload_len = 8; - handler(header, payload, -1); + group.handle_append_entries(header, payload, -1); std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Should NOT be leader yet because heartbeats reset the 
timer - EXPECT_FALSE(node.is_leader()); + EXPECT_FALSE(group.is_leader()); } } diff --git a/tests/raft_tests.cpp b/tests/raft_tests.cpp index 90afc15..08017e8 100644 --- a/tests/raft_tests.cpp +++ b/tests/raft_tests.cpp @@ -7,7 +7,8 @@ #include "common/cluster_manager.hpp" #include "common/config.hpp" -#include "distributed/raft_node.hpp" +#include "distributed/raft_group.hpp" +#include "distributed/raft_manager.hpp" #include "network/rpc_server.hpp" using namespace cloudsql; @@ -24,8 +25,8 @@ TEST(RaftTests, StateTransitions) { cluster::ClusterManager cm(&config); network::RpcServer rpc(TEST_PORT); - RaftNode node("node1", cm, rpc); - EXPECT_FALSE(node.is_leader()); + RaftGroup group(1, "node1", cm, rpc); + EXPECT_FALSE(group.is_leader()); } } // namespace