@@ -482,9 +482,10 @@ class CPPKAFKA_API BufferedProducer {
482482#endif
483483
484484private:
485- enum class MessagePriority { Low, High };
486485 enum class SenderType { Sync, Async };
487-
486+ enum class QueueKind { Retry, Regular };
487+ enum class FlushAction { DontFlush, DoFlush };
488+
488489 template <typename T>
489490 struct CounterGuard {
490491 CounterGuard (std::atomic<T>& counter) : counter_(counter) { ++counter_; }
@@ -519,18 +520,21 @@ class CPPKAFKA_API BufferedProducer {
519520 return nullptr ;
520521 }
521522 template <typename BuilderType>
522- void do_add_message (BuilderType&& builder, MessagePriority priority, bool do_flush );
523+ void do_add_message (BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action );
523524 template <typename BuilderType>
524525 void produce_message (BuilderType&& builder);
525526 Configuration prepare_configuration (Configuration config);
526527 void on_delivery_report (const Message& message);
527528 template <typename BuilderType>
528529 void async_produce (BuilderType&& message, bool throw_on_error);
529-
530+ static void swap_queues (QueueType & queue1, QueueType & queue2, std::mutex & mutex);
531+
530532 // Members
531533 Producer producer_;
532534 QueueType messages_;
535+ QueueType retry_messages_;
533536 mutable std::mutex mutex_;
537+ mutable std::mutex retry_mutex_;
534538 ProduceSuccessCallback produce_success_callback_;
535539 ProduceFailureCallback produce_failure_callback_;
536540 ProduceTerminationCallback produce_termination_callback_;
@@ -565,7 +569,8 @@ template <typename BufferType, typename Allocator>
565569BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
566570 const Allocator& alloc)
567571: producer_(prepare_configuration(std::move(config))),
568- messages_ (alloc) {
572+ messages_ (alloc),
573+ retry_messages_(alloc) {
569574 producer_.set_payload_policy (get_default_payload_policy<BufferType>());
570575#ifdef KAFKA_TEST_INSTANCE
571576 test_params_ = nullptr ;
@@ -580,7 +585,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
580585template <typename BufferType, typename Allocator>
581586void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
582587 add_tracker (SenderType::Async, builder);
583- do_add_message (move (builder), MessagePriority::Low, true );
588+ do_add_message (move (builder), QueueKind::Regular, FlushAction::DoFlush );
584589}
585590
586591template <typename BufferType, typename Allocator>
@@ -624,30 +629,36 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
624629template <typename BufferType, typename Allocator>
625630void BufferedProducer<BufferType, Allocator>::async_flush() {
626631 CounterGuard<size_t > counter_guard (flushes_in_progress_);
627- QueueType flush_queue; // flush from temporary queue
632+ auto queue_flusher = [ this ](QueueType& queue, std::mutex & mutex)-> void
628633 {
629- std::lock_guard<std::mutex> lock (mutex_);
630- std::swap (messages_, flush_queue);
631- }
632- while (!flush_queue.empty ()) {
633- async_produce (std::move (flush_queue.front ()), false );
634- flush_queue.pop_front ();
635- }
634+ QueueType flush_queue; // flush from temporary queue
635+ swap_queues (queue, flush_queue, mutex);
636+
637+ while (!flush_queue.empty ()) {
638+ async_produce (std::move (flush_queue.front ()), false );
639+ flush_queue.pop_front ();
640+ }
641+ };
642+ queue_flusher (retry_messages_, retry_mutex_);
643+ queue_flusher (messages_, mutex_);
636644}
637645
638646template <typename BufferType, typename Allocator>
639647void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
640648 if (preserve_order) {
641649 CounterGuard<size_t > counter_guard (flushes_in_progress_);
642- QueueType flush_queue; // flush from temporary queue
650+ auto queue_flusher = [ this ](QueueType& queue, std::mutex & mutex)-> void
643651 {
644- std::lock_guard<std::mutex> lock (mutex_);
645- std::swap (messages_, flush_queue);
646- }
647- while (!flush_queue.empty ()) {
648- sync_produce (flush_queue.front ());
649- flush_queue.pop_front ();
650- }
652+ QueueType flush_queue; // flush from temporary queue
653+ swap_queues (queue, flush_queue, mutex);
654+
655+ while (!flush_queue.empty ()) {
656+ sync_produce (flush_queue.front ());
657+ flush_queue.pop_front ();
658+ }
659+ };
660+ queue_flusher (retry_messages_, retry_mutex_);
661+ queue_flusher (messages_, mutex_);
651662 }
652663 else {
653664 async_flush ();
@@ -661,25 +672,42 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
661672 if (preserve_order) {
662673 CounterGuard<size_t > counter_guard (flushes_in_progress_);
663674 QueueType flush_queue; // flush from temporary queue
675+ swap_queues (messages_, flush_queue, mutex_);
676+ QueueType retry_flush_queue; // flush from temporary retry queue
677+ swap_queues (retry_messages_, retry_flush_queue, retry_mutex_);
678+
679+ auto queue_flusher = [this ](QueueType& queue)->bool
664680 {
665- std::lock_guard<std::mutex> lock (mutex_);
666- std::swap (messages_, flush_queue);
667- }
681+ if (!queue.empty ()) {
682+ sync_produce (queue.front ());
683+ queue.pop_front ();
684+ return true ;
685+ }
686+ return false ;
687+ };
668688 auto remaining = timeout;
669689 auto start_time = std::chrono::high_resolution_clock::now ();
670690 do {
671- sync_produce (flush_queue.front ());
672- flush_queue.pop_front ();
691+ if (!queue_flusher (retry_flush_queue) && !queue_flusher (flush_queue)) {
692+ break ;
693+ }
673694 // calculate remaining time
674695 remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
675696 (std::chrono::high_resolution_clock::now () - start_time);
676- } while (!flush_queue. empty () && ( remaining.count () > 0 ) );
697+ } while (remaining.count () > 0 );
677698
678699 // Re-enqueue remaining messages in original order
679- if (!flush_queue.empty ()) {
680- std::lock_guard<std::mutex> lock (mutex_);
681- messages_.insert (messages_.begin (), std::make_move_iterator (flush_queue.begin ()), std::make_move_iterator (flush_queue.end ()));
682- }
700+ auto re_enqueuer = [this ](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void
701+ {
702+ if (!src_queue.empty ()) {
703+ std::lock_guard<std::mutex> lock (mutex);
704+ dst_queue.insert (dst_queue.begin (),
705+ std::make_move_iterator (src_queue.begin ()),
706+ std::make_move_iterator (src_queue.end ()));
707+ }
708+ };
709+ re_enqueuer (retry_flush_queue, retry_messages_, retry_mutex_);
710+ re_enqueuer (flush_queue, messages_, mutex_);
683711 }
684712 else {
685713 async_flush ();
@@ -732,14 +760,15 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
732760
733761template <typename BufferType, typename Allocator>
734762void BufferedProducer<BufferType, Allocator>::clear() {
735- std::lock_guard<std::mutex> lock (mutex_);
736763 QueueType tmp;
737- std::swap (tmp, messages_);
764+ swap_queues (messages_, tmp, mutex_);
765+ QueueType retry_tmp;
766+ swap_queues (retry_messages_, retry_tmp, retry_mutex_);
738767}
739768
740769template <typename BufferType, typename Allocator>
741770size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
742- return messages_.size ();
771+ return messages_.size () + retry_messages_. size () ;
743772}
744773
745774template <typename BufferType, typename Allocator>
@@ -769,18 +798,20 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
769798template <typename BufferType, typename Allocator>
770799template <typename BuilderType>
771800void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
772- MessagePriority priority,
773- bool do_flush) {
774- {
801+ QueueKind queue_kind,
802+ FlushAction flush_action) {
803+ if (queue_kind == QueueKind::Retry) {
804+ std::lock_guard<std::mutex> lock (retry_mutex_);
805+ retry_messages_.emplace_back (std::forward<BuilderType>(builder));
806+ }
807+ else {
775808 std::lock_guard<std::mutex> lock (mutex_);
776- if (priority == MessagePriority::High) {
777- messages_.emplace_front (std::forward<BuilderType>(builder));
778- }
779- else {
780- messages_.emplace_back (std::forward<BuilderType>(builder));
781- }
809+ messages_.emplace_back (std::forward<BuilderType>(builder));
782810 }
783- if (do_flush && (max_buffer_size_ >= 0 ) && (max_buffer_size_ <= (ssize_t )messages_.size ())) {
811+
812+ // Flush the queues only if a regular message is added. Retry messages may be added
813+ // from rdkafka callbacks, and flush/async_flush is a user-level call
814+ if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0 ) && (max_buffer_size_ <= get_buffer_size ())) {
784815 if (flush_method_ == FlushMethod::Sync) {
785816 flush ();
786817 }
@@ -928,7 +959,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
928959 TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal ());
929960 if (tracker && tracker->num_retries_ > 0 ) {
930961 --tracker->num_retries_ ;
931- do_add_message (std::forward<BuilderType>(builder), MessagePriority::High, false );
962+ do_add_message (std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush );
932963 return ;
933964 }
934965 }
@@ -967,7 +998,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
967998 --tracker->num_retries_ ;
968999 if (tracker->sender_ == SenderType::Async) {
9691000 // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
970- do_add_message (Builder (message), MessagePriority::High, false );
1001+ do_add_message (Builder (message), QueueKind::Retry, FlushAction::DontFlush );
9711002 }
9721003 should_retry = true ;
9731004 }
@@ -999,6 +1030,13 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
9991030 }
10001031}
10011032
1033+ template <typename BufferType, typename Allocator>
1034+ void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex)
1035+ {
1036+ std::lock_guard<std::mutex> lock (mutex);
1037+ std::swap (queue1, queue2);
1038+ }
1039+
10021040} // cppkafka
10031041
10041042#endif // CPPKAFKA_BUFFERED_PRODUCER_H
0 commit comments