diff --git a/libs/writer/writer_wrapper/test_writer.cpp b/libs/writer/writer_wrapper/test_writer.cpp index e93d9e9..3bb7eee 100644 --- a/libs/writer/writer_wrapper/test_writer.cpp +++ b/libs/writer/writer_wrapper/test_writer.cpp @@ -126,3 +126,66 @@ TEST_F(WriterWrapperUDSWriterTest, MultithreadedWrite) EXPECT_EQ(actualIncrements, expectedIncrements); } + +// This is a unique test that attempts to create messages of exactly 10 bytes in size +// and writes to a buffer of size 10 bytes from multiple threads. The NDrive team discovered +// a deadlock scenario in this specific case where the buffer size matched the message size +// and multiple threads were writing simultaneously. This test is designed to reproduce +// that scenario to ensure it has been resolved. +TEST_F(WriterWrapperUDSWriterTest, TenThreadsBufferSize10Messages) +{ + Logger::info("Starting 10 threads with buffer size 10 test..."); + + // Create a UDS writer with buffer size of 10 + const std::string unixUrl = "/tmp/test_uds_socket"; + WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 21); + + // Number of threads and counters to create + constexpr auto numThreads = 10; + constexpr auto countersPerThread = 1; + constexpr auto incrementsPerCounter = 10; + + // Function for worker threads - creates counter names of size 10 + auto worker = [&](int threadId) + { + // Create counters with names that result in messages of size 10 + // Format: "ctr<6-digit-padded-id>" to ensure consistent message size + for (int i = 0; i < countersPerThread; i++) + { + std::string counterName = fmt::format("ctr{:06d}", threadId * countersPerThread + i); + MeterId meterId(counterName); + Counter counter(meterId); + + // Increment each counter multiple times + for (int j = 0; j < incrementsPerCounter; j++) + { + counter.Increment(); + } + } + }; + + // Start worker threads + std::vector threads; + for (int i = 0; i < numThreads; i++) + { + threads.emplace_back(worker, i); + } + + // Wait for all threads to complete + for (auto& t : threads) + { + t.join(); + } + + // Give some time for messages to be sent + std::this_thread::sleep_for(std::chrono::milliseconds(900)); + + // Check messages + auto msgs = get_uds_messages(); + EXPECT_FALSE(msgs.empty()); + + for (const auto& msg : msgs) + { + EXPECT_EQ(msg.size(), 21); + } +} diff --git a/libs/writer/writer_wrapper/writer.cpp b/libs/writer/writer_wrapper/writer.cpp index ac7fe1f..aac4887 100644 --- a/libs/writer/writer_wrapper/writer.cpp +++ b/libs/writer/writer_wrapper/writer.cpp @@ -89,7 +89,7 @@ void Writer::ThreadSend() { std::unique_lock lock(instance.writeMutex); instance.cv_sender.wait( - lock, [&instance] { return instance.buffer.size() > instance.bufferSize || instance.shutdown.load(); }); + lock, [&instance] { return instance.buffer.size() >= instance.bufferSize || instance.shutdown.load(); }); if (instance.shutdown.load() == true) { return; @@ -119,7 +119,7 @@ void Writer::BufferedWrite(const std::string& message) instance.buffer.append(message); instance.buffer.push_back(NEW_LINE); } - instance.buffer.size() > instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one(); + instance.buffer.size() >= instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one(); } void Writer::NonBufferedWrite(const std::string& message) diff --git a/spectator/registry.cpp b/spectator/registry.cpp index 52dd226..81d394e 100644 --- a/spectator/registry.cpp +++ b/spectator/registry.cpp @@ -41,7 +41,7 @@ Registry::Registry(const Config& config) : m_config(config) if (config.GetWriterType() == WriterType::Memory) { Logger::info("Registry initializing Memory Writer"); - Writer::Initialize(config.GetWriterType()); + Writer::Initialize(config.GetWriterType(), "", 0, this->m_config.GetWriterBufferSize()); } else if (config.GetWriterType() == WriterType::UDP) {