diff --git a/libs/writer/writer_types/include/udp_writer.h b/libs/writer/writer_types/include/udp_writer.h index 3737482..ce64bbd 100644 --- a/libs/writer/writer_types/include/udp_writer.h +++ b/libs/writer/writer_types/include/udp_writer.h @@ -1,6 +1,8 @@ #pragma once #include + +#include #include #include @@ -18,4 +20,8 @@ class UDPWriter final : public BaseWriter std::unique_ptr m_io_context; std::unique_ptr m_socket; boost::asio::ip::udp::endpoint m_endpoint; + bool m_socketEstablished; + + bool CreateSocket(); + bool TryToSend(const std::string& message); }; diff --git a/libs/writer/writer_types/include/uds_writer.h b/libs/writer/writer_types/include/uds_writer.h index 16c6ea9..e2ea37b 100644 --- a/libs/writer/writer_types/include/uds_writer.h +++ b/libs/writer/writer_types/include/uds_writer.h @@ -19,8 +19,8 @@ class UDSWriter final : public BaseWriter std::unique_ptr m_ioContext; std::unique_ptr m_socket; boost::asio::local::datagram_protocol::endpoint m_endpoint; - bool m_isOpen; - - // Helper method to initialize the connection - bool connect(); + bool m_socketEstablished; + + bool CreateSocket(); + bool TryToSend(const std::string& message); }; diff --git a/libs/writer/writer_types/src/udp_writer.cpp b/libs/writer/writer_types/src/udp_writer.cpp index 967b76d..b3ad9ee 100644 --- a/libs/writer/writer_types/src/udp_writer.cpp +++ b/libs/writer/writer_types/src/udp_writer.cpp @@ -2,72 +2,98 @@ #include -UDPWriter::UDPWriter(const std::string& host, int port) : m_host(host), m_port(port) +UDPWriter::UDPWriter(const std::string& host, int port) : + m_host(host), + m_port(port), + m_io_context(std::make_unique()), + m_socket(nullptr), + m_socketEstablished(false) { - try + if (false == CreateSocket()) { - // Create io_context - m_io_context = std::make_unique(); - - // Create socket - m_socket = std::make_unique(*m_io_context); - m_socket->open(boost::asio::ip::udp::v4()); - - // Resolve the endpoint - boost::asio::ip::udp::resolver resolver(*m_io_context); - m_endpoint = *resolver.resolve(boost::asio::ip::udp::v4(), m_host, std::to_string(m_port)).begin(); - } - catch (const boost::system::system_error& e) - { - Logger::error("UDPWriter: Failed to initialize connection: {}", e.what()); - Close(); + Logger::error("UDPWriter: Failed to create socket for {}:{} during construction", m_host, m_port); } } UDPWriter::~UDPWriter() { Close(); } -void UDPWriter::Write(const std::string& message) try +bool UDPWriter::CreateSocket() try { - if (m_socket == nullptr || m_socket->is_open() == false) + if (m_socketEstablished) { - Logger::error("UDPWriter: Socket not initialized or closed"); - return; + this->Close(); } boost::system::error_code ec; - size_t sent = m_socket->send_to(boost::asio::buffer(message.data(), message.size()), m_endpoint, 0, ec); - + m_socket = std::make_unique(*m_io_context); + m_socket->open(boost::asio::ip::udp::v4(), ec); if (ec) { - Logger::error("UDPWriter: Failed to send message: {}", ec.message()); + Logger::error("UDPWriter: Failed to create socket - {}", ec.message()); + return false; } - else if (sent != message.size()) + + m_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(m_host), m_port); + m_socketEstablished = true; + Logger::info("UDPWriter: Socket created for {}:{}", m_host, m_port); + return true; +} +catch (const boost::system::system_error& ex) +{ + Logger::error("UDP Writer: Boost exception: {}", ex.what()); + return false; +} + +bool UDPWriter::TryToSend(const std::string& message) try +{ + boost::system::error_code ec; + for (int i = 0; i < 3; i++) { - Logger::error("UDPWriter: Sent only {} bytes out of {} bytes", sent, message.size()); + size_t sent = m_socket->send_to(boost::asio::buffer(message.data(), message.size()), m_endpoint, 0, ec); + if (ec || sent < message.size()) + { + Logger::error("UDP Writer: Failed to send message - {}, sent {} bytes out of {}", ec.message(), sent, message.size()); + continue; + } + return true; } + return false; } -catch (const std::exception& e) +catch (const boost::system::system_error& ex) { - Logger::error("UDPWriter: Exception during write: {}", e.what()); + Logger::error("UDP Writer: Boost exception: {}", ex.what()); + return false; +} + +void UDPWriter::Write(const std::string& message) +{ + if (false == this->m_socketEstablished && false == this->CreateSocket()) + { + Logger::error("UDPWriter: Failed to write message, socket not established {}:{}", m_host, m_port); + return; + } + + if (TryToSend(message) == false) + { + Logger::error("UDP Writer: Failed to send message: {}", message); + this->Close(); + } } void UDPWriter::Close() try { - if (m_socket && m_socket->is_open()) + this->m_socketEstablished = false; + if(m_socket && m_socket->is_open()) { boost::system::error_code ec; m_socket->close(ec); if (ec) { - Logger::error("UDPWriter: Error when closing socket: {}", ec.message()); + Logger::error("UDP Writer: Error closing existing socket - {}", ec.message()); } } - - // Reset the unique_ptr to deallocate resources - m_socket.reset(); - m_io_context.reset(); } -catch (const std::exception& e) +catch (const boost::system::system_error& ex) { - Logger::error("UDPWriter: Exception during close: {}", e.what()); + Logger::error("UDP Writer: Boost exception: {}", ex.what()); } \ No newline at end of file diff --git a/libs/writer/writer_types/src/uds_writer.cpp b/libs/writer/writer_types/src/uds_writer.cpp index 17942f9..8126f06 100644 --- a/libs/writer/writer_types/src/uds_writer.cpp +++ b/libs/writer/writer_types/src/uds_writer.cpp @@ -2,108 +2,97 @@ #include -#include - -namespace local = boost::asio::local; - UDSWriter::UDSWriter(const std::string& socketPath) : m_socketPath(socketPath), m_ioContext(std::make_unique()), m_socket(nullptr), - m_isOpen(false) + m_socketEstablished(false) { - connect(); + if (false == CreateSocket()) + { + Logger::error("UDS Writer: Failed to create socket for {} during construction", m_socketPath); + } } UDSWriter::~UDSWriter() { Close(); } -bool UDSWriter::connect() +bool UDSWriter::CreateSocket() try { - if (m_isOpen) + if (m_socketEstablished) { - return true; // Already connected + this->Close(); } - - try + + boost::system::error_code ec; + m_socket = std::make_unique(*m_ioContext); + m_socket->open(boost::asio::local::datagram_protocol(), ec); + if (ec) { - // Create a new socket if needed - if (!m_socket) - { - m_socket = std::make_unique(*m_ioContext); - } + Logger::error("UDS Writer: Failed to create socket - {}", ec.message()); + return false; + } - // Open the socket and prepare the endpoint - boost::system::error_code ec; - m_socket->open(local::datagram_protocol(), ec); - - // Set up the server endpoint - m_endpoint = local::datagram_protocol::endpoint(m_socketPath); + m_endpoint = boost::asio::local::datagram_protocol::endpoint(m_socketPath); + m_socketEstablished = true; + Logger::info("UDS Writer: Socket created for {}", m_socketPath); + return true; +} +catch (const boost::system::system_error& ex) +{ + Logger::error("UDS Writer: Boost exception: {}", ex.what()); + return false; +} - if (ec) - { - Logger::error("UDS Writer: Failed to connect to {} - {}", m_socketPath, ec.message()); - return false; +bool UDSWriter::TryToSend(const std::string& message) try +{ + boost::system::error_code ec; + for (int i = 0; i < 3; i++) + { + size_t sent = m_socket->send_to(boost::asio::buffer(message), m_endpoint, 0, ec); + if (ec || sent < message.size()) + { + Logger::error("UDS Writer: Failed to send message - {}, sent {} bytes out of {}", ec.message(), sent, message.size()); + continue; } - - m_isOpen = true; return true; } - catch (const std::exception& e) - { - Logger::error("UDS Writer: Exception while connecting - {}", e.what()); - m_isOpen = false; - return false; - } + return false; +} +catch (const boost::system::system_error& ex) +{ + Logger::error("UDS Writer: Boost exception: {}", ex.what()); + return false; } void UDSWriter::Write(const std::string& message) { - if (!m_isOpen && !connect()) + if (false == this->m_socketEstablished && false == this->CreateSocket()) { - Logger::error("UDS Writer: Cannot write - not connected to {}", m_socketPath); + Logger::error("UDS Writer: Failed to write message, socket not established {}", m_socketPath); return; } - try - { - boost::system::error_code ec; - size_t sent = m_socket->send_to(boost::asio::buffer(message), m_endpoint, 0, ec); - - if (ec) - { - Logger::error("UDS Writer: Failed to send message - {}", ec.message()); - m_isOpen = false; // Mark as disconnected on error - } - if (sent < message.size()) - { - Logger::error("UDS Writer: Sent only {} bytes out of {} bytes", sent, message.size()); - } - } - catch (const std::exception& e) + if (false == this->TryToSend(message)) { - Logger::error("UDS Writer: Exception while sending message - {}", e.what()); - m_isOpen = false; // Mark as disconnected on exception + Logger::error("UDS Writer: Failed to send message: {}", message); + this->Close(); } } -void UDSWriter::Close() +void UDSWriter::Close() try { - if (m_socket && m_isOpen) + this->m_socketEstablished = false; + if (m_socket != nullptr && m_socket->is_open()) { - try - { - boost::system::error_code ec; - m_socket->close(ec); - - if (ec) - { - Logger::error("UDS Writer: Error closing socket - {}", ec.message()); - } - } - catch (const std::exception& e) + boost::system::error_code ec; + m_socket->close(ec); + if (ec) { - Logger::error("UDS Writer: Exception while closing socket - {}", e.what()); + Logger::error("UDS Writer: Error closing existing socket - {}", ec.message()); } } - m_isOpen = false; +} +catch (const boost::system::system_error& ex) +{ + Logger::error("UDS Writer: Boost exception: {}", ex.what()); } diff --git a/libs/writer/writer_types/test/test_udp_writer.cpp b/libs/writer/writer_types/test/test_udp_writer.cpp index 4d0ef91..f34102c 100644 --- a/libs/writer/writer_types/test/test_udp_writer.cpp +++ b/libs/writer/writer_types/test/test_udp_writer.cpp @@ -13,28 +13,43 @@ class UDPWriterTest : public testing::Test protected: void SetUp() override { - // Set the server to run - server_running = true; - // Clear any existing messages from previous tests - clear_messages(); - - // Start the UDP server in a separate thread - server_thread = std::thread( - [] - { - // This calls our server function directly - listen_for_udp_messages(); - }); - - // Give the server time to start - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + clear_udp_messages(); + + // By default, start the server for most tests + StartServer(); } void TearDown() override + { + StopServer(); + } + + + void StartServer() + { + if (!server_thread.joinable()) // Only start if not already running + { + // Set the server to run + udp_server_running = true; + + // Start the UDS server in a separate thread + server_thread = std::thread( + [] + { + // This calls our server function directly + listen_for_udp_messages(); + }); + + // Give the server time to start + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } + + void StopServer() { // Signal the server to stop - server_running = false; + udp_server_running = false; // Terminate the server thread if (server_thread.joinable()) @@ -61,7 +76,7 @@ TEST_F(UDPWriterTest, SendMessage) std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Get current messages and verify the message was received - const auto messages = get_messages(); + const auto messages = get_udp_messages(); ASSERT_FALSE(messages.empty()); // Check that our message is in the vector @@ -87,7 +102,7 @@ TEST_F(UDPWriterTest, CloseAndReopen) std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Verify first message - auto messages = get_messages(); + auto messages = get_udp_messages(); ASSERT_FALSE(messages.empty()); ASSERT_EQ(messages.back(), message1); @@ -103,7 +118,7 @@ TEST_F(UDPWriterTest, CloseAndReopen) std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Verify second message - messages = get_messages(); + messages = get_udp_messages(); ASSERT_GE(messages.size(), 2); ASSERT_EQ(messages.back(), message2); } @@ -125,7 +140,7 @@ TEST_F(UDPWriterTest, SendMultipleMessages) std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Get received messages and verify - const auto received_messages = get_messages(); + const auto received_messages = get_udp_messages(); // Verify we received at least the number of messages we sent ASSERT_EQ(received_messages.size(), test_messages.size()); @@ -133,4 +148,45 @@ TEST_F(UDPWriterTest, SendMultipleMessages) ASSERT_EQ(test_messages.at(0), received_messages.at(0)); ASSERT_EQ(test_messages.at(1), received_messages.at(1)); ASSERT_EQ(test_messages.at(2), received_messages.at(2)); +} + +TEST_F(UDPWriterTest, ClientReconnectsWhenServerStartsLater) +{ + // First, stop the server that was started in SetUp + StopServer(); + + // Clear any existing messages + clear_udp_messages(); + + // Create a client before the server is available + UDPWriter writer("127.0.0.1", 12345); + + // Try to send a message while server is not running + // This should fail silently (UDSWriter logs errors but doesn't throw) + for (int i = 0; i < 3; ++i) + { + const std::string test_message_before = "Message sent before server starts"; + writer.Write(test_message_before); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Verify no messages were received (server wasn't running) + auto messages = get_udp_messages(); + EXPECT_TRUE(messages.empty()) << "Expected no messages when server is not running"; + + // Now start the server + StartServer(); + + // Try to send a message after server is started + // The UDSWriter should automatically reconnect on the next Write call + const std::string test_message_after = "Message sent after server starts"; + writer.Write(test_message_after); + + // Give time for the message to be processed + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // Verify the message was received + messages = get_udp_messages(); + EXPECT_TRUE(messages.size() == 1); + EXPECT_TRUE(messages.at(0) == test_message_after); } \ No newline at end of file diff --git a/libs/writer/writer_types/test/test_uds_writer.cpp b/libs/writer/writer_types/test/test_uds_writer.cpp index e8588f3..bd6276f 100644 --- a/libs/writer/writer_types/test/test_uds_writer.cpp +++ b/libs/writer/writer_types/test/test_uds_writer.cpp @@ -1,4 +1,3 @@ - #include #include @@ -13,25 +12,39 @@ class UDSWriterTest : public testing::Test protected: void SetUp() override { - // Set the server to run - uds_server_running = true; - // Clear any existing messages from previous tests clear_uds_messages(); + + // By default, start the server for most tests + StartServer(); + } - // Start the UDS server in a separate thread - server_thread = std::thread( - [] - { - // This calls our server function directly - listen_for_uds_messages(); - }); + void TearDown() override + { + StopServer(); + } - // Give the server time to start - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + void StartServer() + { + if (!server_thread.joinable()) // Only start if not already running + { + // Set the server to run + uds_server_running = true; + + // Start the UDS server in a separate thread + server_thread = std::thread( + [] + { + // This calls our server function directly + listen_for_uds_messages(); + }); + + // Give the server time to start + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } } - void TearDown() override + void StopServer() { // Signal the server to stop uds_server_running = false; @@ -136,4 +149,45 @@ TEST_F(UDSWriterTest, SendMultipleMessages) { ASSERT_EQ(test_messages.at(i), received_messages.at(i)); } +} + +TEST_F(UDSWriterTest, ClientReconnectsWhenServerStartsLater) +{ + // First, stop the server that was started in SetUp + StopServer(); + + // Clear any existing messages + clear_uds_messages(); + + // Create a client before the server is available + UDSWriter writer("/tmp/test_uds_socket"); + + // Try to send a message while server is not running + // This should fail silently (UDSWriter logs errors but doesn't throw) + for (int i = 0; i < 3; ++i) + { + const std::string test_message_before = "Message sent before server starts"; + writer.Write(test_message_before); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // Verify no messages were received (server wasn't running) + auto messages = get_uds_messages(); + EXPECT_TRUE(messages.empty()) << "Expected no messages when server is not running"; + + // Now start the server + StartServer(); + + // Try to send a message after server is started + // The UDSWriter should automatically reconnect on the next Write call + const std::string test_message_after = "Message sent after server starts"; + writer.Write(test_message_after); + + // Give time for the message to be processed + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // Verify the message was received + messages = get_uds_messages(); + EXPECT_TRUE(messages.size() == 1); + EXPECT_TRUE(messages.at(0) == test_message_after); } \ No newline at end of file diff --git a/libs/writer/writer_types/test_utils/udp_server/udp_server.cpp b/libs/writer/writer_types/test_utils/udp_server/udp_server.cpp index 549bf80..fb6f12b 100644 --- a/libs/writer/writer_types/test_utils/udp_server/udp_server.cpp +++ b/libs/writer/writer_types/test_utils/udp_server/udp_server.cpp @@ -15,16 +15,16 @@ std::vector messages = {}; std::mutex messages_mutex; // Flag to control server shutdown -std::atomic server_running(true); +std::atomic udp_server_running(true); // Expose functions to interact with the messages vector -std::vector get_messages() +std::vector get_udp_messages() { std::lock_guard lock(messages_mutex); return messages; } -void clear_messages() +void clear_udp_messages() { std::lock_guard lock(messages_mutex); messages.clear(); @@ -68,7 +68,7 @@ try // Configure socket with a small timeout so we can check the run flag socket.non_blocking(true); - while (server_running) + while (udp_server_running) { try { @@ -86,7 +86,7 @@ try add_message(message); // Get the current count of messages - auto current_messages = get_messages(); + auto current_messages = get_udp_messages(); std::cout << "Received from " << sender_endpoint << ": " << message << std::endl; std::cout << "Total messages stored: " << current_messages.size() << std::endl; diff --git a/libs/writer/writer_types/test_utils/udp_server/udp_server.h b/libs/writer/writer_types/test_utils/udp_server/udp_server.h index 76ffac8..8934f38 100644 --- a/libs/writer/writer_types/test_utils/udp_server/udp_server.h +++ b/libs/writer/writer_types/test_utils/udp_server/udp_server.h @@ -6,12 +6,12 @@ #include // Functions to interact with the UDP server's message storage -std::vector get_messages(); -void clear_messages(); +std::vector get_udp_messages(); +void clear_udp_messages(); void add_message(const std::string& message); // Function to run the server in a thread - can be used by both the server executable and tests void listen_for_udp_messages(); // Flag to control server shutdown - used to gracefully stop the server -extern std::atomic server_running; +extern std::atomic udp_server_running; diff --git a/spectator/test_registry.cpp b/spectator/test_registry.cpp index 77a7d65..b7b56f6 100644 --- a/spectator/test_registry.cpp +++ b/spectator/test_registry.cpp @@ -166,7 +166,7 @@ TEST(RegistryTest, GaugeWithIdWithTtlSeconds) } TEST(RegistryTest, GaugeWithTtlSeconds) { - Config config(WriterConfig(WriterTypes::Memory)); + Config config{WriterConfig(WriterTypes::Memory)}; auto r = Registry(config); auto memoryWriter = static_cast(WriterTestHelper::GetImpl());