Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libs/writer/writer_types/include/udp_writer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <base_writer.h>

#include <memory>
#include <string>
#include <boost/asio.hpp>

Expand All @@ -18,4 +20,8 @@ class UDPWriter final : public BaseWriter
std::unique_ptr<boost::asio::io_context> m_io_context;
std::unique_ptr<boost::asio::ip::udp::socket> m_socket;
boost::asio::ip::udp::endpoint m_endpoint;
bool m_socketEstablished;

bool CreateSocket();
bool TryToSend(const std::string& message);
};
8 changes: 4 additions & 4 deletions libs/writer/writer_types/include/uds_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class UDSWriter final : public BaseWriter
std::unique_ptr<boost::asio::io_context> m_ioContext;
std::unique_ptr<boost::asio::local::datagram_protocol::socket> m_socket;
boost::asio::local::datagram_protocol::endpoint m_endpoint;
bool m_isOpen;

// Helper method to initialize the connection
bool connect();
bool m_socketEstablished;
bool CreateSocket();
bool TryToSend(const std::string& message);
};
98 changes: 62 additions & 36 deletions libs/writer/writer_types/src/udp_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,98 @@

#include <logger.h>

UDPWriter::UDPWriter(const std::string& host, int port) : m_host(host), m_port(port)
UDPWriter::UDPWriter(const std::string& host, int port) :
m_host(host),
m_port(port),
m_io_context(std::make_unique<boost::asio::io_context>()),
m_socket(nullptr),
m_socketEstablished(false)
{
try
if (false == CreateSocket())
{
// Create io_context
m_io_context = std::make_unique<boost::asio::io_context>();

// Create socket
m_socket = std::make_unique<boost::asio::ip::udp::socket>(*m_io_context);
m_socket->open(boost::asio::ip::udp::v4());

// Resolve the endpoint
boost::asio::ip::udp::resolver resolver(*m_io_context);
m_endpoint = *resolver.resolve(boost::asio::ip::udp::v4(), m_host, std::to_string(m_port)).begin();
}
catch (const boost::system::system_error& e)
{
Logger::error("UDPWriter: Failed to initialize connection: {}", e.what());
Close();
Logger::error("UDPWriter: Failed to create socket for {}:{} during construction", m_host, m_port);
}
}

UDPWriter::~UDPWriter() { Close(); }

void UDPWriter::Write(const std::string& message) try
bool UDPWriter::CreateSocket() try
{
if (m_socket == nullptr || m_socket->is_open() == false)
if (m_socketEstablished)
{
Logger::error("UDPWriter: Socket not initialized or closed");
return;
this->Close();
}

boost::system::error_code ec;
size_t sent = m_socket->send_to(boost::asio::buffer(message.data(), message.size()), m_endpoint, 0, ec);

m_socket = std::make_unique<boost::asio::ip::udp::socket>(*m_io_context);
m_socket->open(boost::asio::ip::udp::v4(), ec);
if (ec)
{
Logger::error("UDPWriter: Failed to send message: {}", ec.message());
Logger::error("UDPWriter: Failed to create socket - {}", ec.message());
return false;
}
else if (sent != message.size())

m_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(m_host), m_port);
m_socketEstablished = true;
Logger::info("UDPWriter: Socket created for {}:{}", m_host, m_port);
return true;
}
catch (const boost::system::system_error& ex)
{
Logger::error("UDP Writer: Boost exception: {}", ex.what());
return false;
}

bool UDPWriter::TryToSend(const std::string& message) try
{
boost::system::error_code ec;
for (int i = 0; i < 3; i++)
{
Logger::error("UDPWriter: Sent only {} bytes out of {} bytes", sent, message.size());
size_t sent = m_socket->send_to(boost::asio::buffer(message.data(), message.size()), m_endpoint, 0, ec);
if (ec || sent < message.size())
{
Logger::error("UDP Writer: Failed to send message - {}, sent {} bytes out of {}", ec.message(), sent, message.size());
continue;
}
return true;
}
return false;
}
catch (const std::exception& e)
catch (const boost::system::system_error& ex)
{
Logger::error("UDPWriter: Exception during write: {}", e.what());
Logger::error("UDP Writer: Boost exception: {}", ex.what());
return false;
}

void UDPWriter::Write(const std::string& message)
{
if (false == this->m_socketEstablished && false == this->CreateSocket())
{
Logger::error("UDPWriter: Failed to write message, socket not established {}:{}", m_host, m_port);
return;
}

if (TryToSend(message) == false)
{
Logger::error("UDP Writer: Failed to send message: {}", message);
this->Close();
}
}

void UDPWriter::Close() try
{
if (m_socket && m_socket->is_open())
this->m_socketEstablished = false;
if(m_socket && m_socket->is_open())
{
boost::system::error_code ec;
m_socket->close(ec);
if (ec)
{
Logger::error("UDPWriter: Error when closing socket: {}", ec.message());
Logger::error("UDP Writer: Error closing existing socket - {}", ec.message());
}
}

// Reset the unique_ptr to deallocate resources
m_socket.reset();
m_io_context.reset();
}
catch (const std::exception& e)
catch (const boost::system::system_error& ex)
{
Logger::error("UDPWriter: Exception during close: {}", e.what());
Logger::error("UDP Writer: Boost exception: {}", ex.what());
}
127 changes: 58 additions & 69 deletions libs/writer/writer_types/src/uds_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,108 +2,97 @@

#include <logger.h>

#include <boost/asio.hpp>

namespace local = boost::asio::local;

UDSWriter::UDSWriter(const std::string& socketPath)
: m_socketPath(socketPath),
m_ioContext(std::make_unique<boost::asio::io_context>()),
m_socket(nullptr),
m_isOpen(false)
m_socketEstablished(false)
{
connect();
if (false == CreateSocket())
{
Logger::error("UDS Writer: Failed to create socket for {} during construction", m_socketPath);
}
}

UDSWriter::~UDSWriter() { Close(); }

bool UDSWriter::connect()
bool UDSWriter::CreateSocket() try
{
if (m_isOpen)
if (m_socketEstablished)
{
return true; // Already connected
this->Close();
}

try

boost::system::error_code ec;
m_socket = std::make_unique<boost::asio::local::datagram_protocol::socket>(*m_ioContext);
m_socket->open(boost::asio::local::datagram_protocol(), ec);
if (ec)
{
// Create a new socket if needed
if (!m_socket)
{
m_socket = std::make_unique<local::datagram_protocol::socket>(*m_ioContext);
}
Logger::error("UDS Writer: Failed to create socket - {}", ec.message());
return false;
}

// Open the socket and prepare the endpoint
boost::system::error_code ec;
m_socket->open(local::datagram_protocol(), ec);

// Set up the server endpoint
m_endpoint = local::datagram_protocol::endpoint(m_socketPath);
m_endpoint = boost::asio::local::datagram_protocol::endpoint(m_socketPath);
m_socketEstablished = true;
Logger::info("UDS Writer: Socket created for {}", m_socketPath);
return true;
}
catch (const boost::system::system_error& ex)
{
Logger::error("UDS Writer: Boost exception: {}", ex.what());
return false;
}

if (ec)
{
Logger::error("UDS Writer: Failed to connect to {} - {}", m_socketPath, ec.message());
return false;
bool UDSWriter::TryToSend(const std::string& message) try
{
boost::system::error_code ec;
for (int i = 0; i < 3; i++)
{
size_t sent = m_socket->send_to(boost::asio::buffer(message), m_endpoint, 0, ec);
if (ec || sent < message.size())
{
Logger::error("UDS Writer: Failed to send message - {}, sent {} bytes out of {}", ec.message(), sent, message.size());
continue;
}

m_isOpen = true;
return true;
}
catch (const std::exception& e)
{
Logger::error("UDS Writer: Exception while connecting - {}", e.what());
m_isOpen = false;
return false;
}
return false;
}
catch (const boost::system::system_error& ex)
{
Logger::error("UDS Writer: Boost exception: {}", ex.what());
return false;
}

void UDSWriter::Write(const std::string& message)
{
if (!m_isOpen && !connect())
if (false == this->m_socketEstablished && false == this->CreateSocket())
{
Logger::error("UDS Writer: Cannot write - not connected to {}", m_socketPath);
Logger::error("UDS Writer: Failed to write message, socket not established {}", m_socketPath);
return;
}

try
{
boost::system::error_code ec;
size_t sent = m_socket->send_to(boost::asio::buffer(message), m_endpoint, 0, ec);

if (ec)
{
Logger::error("UDS Writer: Failed to send message - {}", ec.message());
m_isOpen = false; // Mark as disconnected on error
}
if (sent < message.size())
{
Logger::error("UDS Writer: Sent only {} bytes out of {} bytes", sent, message.size());
}
}
catch (const std::exception& e)
if (false == this->TryToSend(message))
{
Logger::error("UDS Writer: Exception while sending message - {}", e.what());
m_isOpen = false; // Mark as disconnected on exception
Logger::error("UDS Writer: Failed to send message: {}", message);
this->Close();
}
}

void UDSWriter::Close()
void UDSWriter::Close() try
{
if (m_socket && m_isOpen)
this->m_socketEstablished = false;
if (m_socket != nullptr && m_socket->is_open())
{
try
{
boost::system::error_code ec;
m_socket->close(ec);

if (ec)
{
Logger::error("UDS Writer: Error closing socket - {}", ec.message());
}
}
catch (const std::exception& e)
boost::system::error_code ec;
m_socket->close(ec);
if (ec)
{
Logger::error("UDS Writer: Exception while closing socket - {}", e.what());
Logger::error("UDS Writer: Error closing existing socket - {}", ec.message());
}
}
m_isOpen = false;
}
catch (const boost::system::system_error& ex)
{
Logger::error("UDS Writer: Boost exception: {}", ex.what());
}
Loading
Loading