Skip to content

Commit 8dd5428

Browse files
author
accelerated
committed
Added similar logic for ProduceTerminationCallback
1 parent 0b9b7ba commit 8dd5428

File tree

1 file changed

+41
-6
lines changed

1 file changed

+41
-6
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,26 @@ class CPPKAFKA_API BufferedProducer {
104104
* Callback to indicate a message failed to be produced by the broker.
105105
*
106106
* The returned bool indicates whether the BufferedProducer should try to produce
107-
* the message again after each failure.
107+
* the message again after each failure, subject to the maximum number of retries set. If this callback
108+
* is not set or returns false or if the number of retries reaches zero, the ProduceTerminationCallback
109+
* will be called.
108110
*/
109111
using ProduceFailureCallback = std::function<bool(const Message&)>;
110112

113+
/**
114+
* Callback to indicate a message failed to be produced by the broker and was dropped.
115+
*
116+
* The application can use this callback to track delivery failure of messages similar to the
117+
* FlushTerminationCallback. If the application is only interested in message dropped events,
118+
* then ProduceFailureCallback should not be set.
119+
*/
120+
using ProduceTerminationCallback = std::function<void(const Message&)>;
121+
111122
/**
112123
* Callback to indicate a message failed to be flushed
113124
*
114125
* If this callback returns true, the message will be re-enqueued and flushed again later subject
115-
* to the maximum number of retries set. If this callback returns false or if the number or retries
126+
* to the maximum number of retries set. If this callback is not set or returns false or if the number of retries
116127
* reaches zero, the FlushTerminationCallback will be called.
117128
*/
118129
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;
@@ -122,7 +133,8 @@ class CPPKAFKA_API BufferedProducer {
122133
* reaches zero.
123134
*
124135
* The application can use this callback to track delivery failure of messages similar to the
125-
* ProduceFailureCallback.
136+
* ProduceTerminationCallback. If the application is only interested in message dropped events,
137+
* then FlushFailureCallback should not be set.
126138
*/
127139
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
128140

@@ -356,13 +368,24 @@ class CPPKAFKA_API BufferedProducer {
356368
*
357369
* \param callback The callback to be set
358370
*
359-
* \remark It is *highly* recommended to set this callback as your message may be produced
360-
* indefinitely if there's a remote error.
361-
*
362371
* \warning Do not call any method on the BufferedProducer while inside this callback.
363372
*/
364373
void set_produce_failure_callback(ProduceFailureCallback callback);
365374

375+
/**
376+
* \brief Sets the message produce termination callback
377+
*
378+
* This will be called when the delivery report callback is executed for a message having
379+
* an error and after all retries have expired and the message is dropped.
380+
*
381+
* \param callback The callback to be set
382+
*
383+
* \remark If the application only tracks dropped messages, the set_produce_failure_callback() should not be set.
384+
*
385+
* \warning Do not call any method on the BufferedProducer while inside this callback.
386+
*/
387+
void set_produce_termination_callback(ProduceTerminationCallback callback);
388+
366389
/**
367390
* \brief Sets the successful delivery callback
368391
*
@@ -394,6 +417,8 @@ class CPPKAFKA_API BufferedProducer {
394417
*
395418
* \param callback
396419
*
420+
* \remark If the application only tracks dropped messages, the set_flush_failure_callback() should not be set.
421+
*
397422
* \warning Do not call any method on the BufferedProducer while inside this callback
398423
*/
399424
void set_flush_termination_callback(FlushTerminationCallback callback);
@@ -469,6 +494,7 @@ class CPPKAFKA_API BufferedProducer {
469494
mutable std::mutex mutex_;
470495
ProduceSuccessCallback produce_success_callback_;
471496
ProduceFailureCallback produce_failure_callback_;
497+
ProduceTerminationCallback produce_termination_callback_;
472498
FlushFailureCallback flush_failure_callback_;
473499
FlushTerminationCallback flush_termination_callback_;
474500
ssize_t max_buffer_size_{-1};
@@ -771,6 +797,11 @@ void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(Produ
771797
produce_failure_callback_ = std::move(callback);
772798
}
773799

800+
template <typename BufferType, typename Allocator>
801+
void BufferedProducer<BufferType, Allocator>::set_produce_termination_callback(ProduceTerminationCallback callback) {
802+
produce_termination_callback_ = std::move(callback);
803+
}
804+
774805
template <typename BufferType, typename Allocator>
775806
void BufferedProducer<BufferType, Allocator>::set_produce_success_callback(ProduceSuccessCallback callback) {
776807
produce_success_callback_ = std::move(callback);
@@ -873,10 +904,14 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
873904
}
874905
else {
875906
++total_messages_dropped_;
907+
CallbackInvoker<ProduceTerminationCallback>
908+
("produce termination", produce_termination_callback_, &producer_)(message);
876909
}
877910
}
878911
else {
879912
++total_messages_dropped_;
913+
CallbackInvoker<ProduceTerminationCallback>
914+
("produce termination", produce_termination_callback_, &producer_)(message);
880915
}
881916
}
882917
else {

0 commit comments

Comments
 (0)