Skip to content

Commit f14a4b9

Browse files
authored
Merge pull request #150 from accelerated/flush_bug
Bug with message leak in BufferedProducer::flush(timeout)
2 parents ccc6738 + 8b431c5 commit f14a4b9

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,12 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
633633
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
634634
(std::chrono::high_resolution_clock::now() - start_time);
635635
} while (!flush_queue.empty() && (remaining.count() > 0));
636+
637+
// Re-enqueue remaining messages in original order
638+
if (!flush_queue.empty()) {
639+
std::lock_guard<std::mutex> lock(mutex_);
640+
messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
641+
}
636642
}
637643
else {
638644
async_flush();

0 commit comments

Comments
 (0)