Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions libs/writer/writer_wrapper/test_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,66 @@ TEST_F(WriterWrapperUDSWriterTest, MultithreadedWrite)

EXPECT_EQ(actualIncrements, expectedIncrements);
}

// This is a unique test that attempts to create messages of exactly 10 bytes in size
// and writes to a buffer of size 10 bytes from multiple threads. The NDrive team discovered
// a deadlock scenario in this specific case where the buffer size matched the message size
// and multiple threads were writing simultaneously. This test is designed to reproduce
// that scenario to ensure it has been resolved.
TEST_F(WriterWrapperUDSWriterTest, TenThreadsBufferSize10Messages)
{
Logger::info("Starting 10 threads with buffer size 10 test...");

// Create a UDS writer with buffer size of 10
const std::string unixUrl = "/tmp/test_uds_socket";
WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 21);

// Number of threads and counters to create
constexpr auto numThreads = 10;
constexpr auto countersPerThread = 1;
constexpr auto incrementsPerCounter = 10;

// Function for worker threads - creates counter names of size 10
auto worker = [&](int threadId)
{
// Create counters with names that result in messages of size 10
// Format: "ctr<6-digit-padded-id>" to ensure consistent message size
for (int i = 0; i < countersPerThread; i++)
{
std::string counterName = fmt::format("ctr{:06d}", threadId * countersPerThread + i);
MeterId meterId(counterName);
Counter counter(meterId);

// Increment each counter multiple times
for (int j = 0; j < incrementsPerCounter; j++)
{
counter.Increment();
}
}
};

// Start worker threads
std::vector<std::thread> threads;
for (int i = 0; i < numThreads; i++)
{
threads.emplace_back(worker, i);
}

// Wait for all threads to complete
for (auto& t : threads)
{
t.join();
}

// Give some time for messages to be sent
std::this_thread::sleep_for(std::chrono::milliseconds(900));

// Check messages
auto msgs = get_uds_messages();
EXPECT_FALSE(msgs.empty());

for (const auto& msg : msgs)
{
EXPECT_EQ(msg.size(), 21);
}
}
4 changes: 2 additions & 2 deletions libs/writer/writer_wrapper/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void Writer::ThreadSend()
{
std::unique_lock<std::mutex> lock(instance.writeMutex);
instance.cv_sender.wait(
lock, [&instance] { return instance.buffer.size() > instance.bufferSize || instance.shutdown.load(); });
lock, [&instance] { return instance.buffer.size() >= instance.bufferSize || instance.shutdown.load(); });
if (instance.shutdown.load() == true)
{
return;
Expand Down Expand Up @@ -119,7 +119,7 @@ void Writer::BufferedWrite(const std::string& message)
instance.buffer.append(message);
instance.buffer.push_back(NEW_LINE);
}
instance.buffer.size() > instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one();
instance.buffer.size() >= instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one();
}

void Writer::NonBufferedWrite(const std::string& message)
Expand Down
2 changes: 1 addition & 1 deletion spectator/registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Registry::Registry(const Config& config) : m_config(config)
if (config.GetWriterType() == WriterType::Memory)
{
Logger::info("Registry initializing Memory Writer");
Writer::Initialize(config.GetWriterType());
Writer::Initialize(config.GetWriterType(), "", 0, this->m_config.GetWriterBufferSize());
}
else if (config.GetWriterType() == WriterType::UDP)
{
Expand Down
Loading