Skip to content

Commit ccc6738

Browse files
authored
Merge pull request #148 from accelerated/flush_failure
Added flush/produce termination callbacks
2 parents ab002fe + 8dd5428 commit ccc6738

File tree

1 file changed

+75
-6
lines changed

1 file changed

+75
-6
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,39 @@ class CPPKAFKA_API BufferedProducer {
104104
* Callback to indicate a message failed to be produced by the broker.
105105
*
106106
* The returned bool indicates whether the BufferedProducer should try to produce
107-
* the message again after each failure.
107+
* the message again after each failure, subject to the maximum number of retries set. If this callback
108+
* is not set or returns false or if the number of retries reaches zero, the ProduceTerminationCallback
109+
* will be called.
108110
*/
109111
using ProduceFailureCallback = std::function<bool(const Message&)>;
110112

113+
/**
114+
* Callback to indicate a message failed to be produced by the broker and was dropped.
115+
*
116+
* The application can use this callback to track delivery failure of messages similar to the
117+
* FlushTerminationCallback. If the application is only interested in message dropped events,
118+
* then ProduceFailureCallback should not be set.
119+
*/
120+
using ProduceTerminationCallback = std::function<void(const Message&)>;
121+
111122
/**
112123
* Callback to indicate a message failed to be flushed
124+
*
125+
* If this callback returns true, the message will be re-enqueued and flushed again later subject
126+
* to the maximum number of retries set. If this callback is not set or returns false or if the number of retries
127+
* reaches zero, the FlushTerminationCallback will be called.
113128
*/
114129
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;
130+
131+
/**
132+
* Callback to indicate a message was dropped after multiple flush attempts or when the retry count
133+
* reaches zero.
134+
*
135+
* The application can use this callback to track delivery failure of messages similar to the
136+
* ProduceTerminationCallback. If the application is only interested in message dropped events,
137+
* then FlushFailureCallback should not be set.
138+
*/
139+
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
115140

116141
/**
117142
* \brief Constructs a buffered producer using the provided configuration
@@ -343,13 +368,24 @@ class CPPKAFKA_API BufferedProducer {
343368
*
344369
* \param callback The callback to be set
345370
*
346-
* \remark It is *highly* recommended to set this callback as your message may be produced
347-
* indefinitely if there's a remote error.
348-
*
349371
* \warning Do not call any method on the BufferedProducer while inside this callback.
350372
*/
351373
void set_produce_failure_callback(ProduceFailureCallback callback);
352374

375+
/**
376+
* \brief Sets the message produce termination callback
377+
*
378+
* This will be called when the delivery report callback is executed for a message having
379+
* an error and after all retries have expired and the message is dropped.
380+
*
381+
* \param callback The callback to be set
382+
*
383+
* \remark If the application only tracks dropped messages, the set_produce_failure_callback() should not be set.
384+
*
385+
* \warning Do not call any method on the BufferedProducer while inside this callback.
386+
*/
387+
void set_produce_termination_callback(ProduceTerminationCallback callback);
388+
353389
/**
354390
* \brief Sets the successful delivery callback
355391
*
@@ -360,19 +396,33 @@ class CPPKAFKA_API BufferedProducer {
360396
void set_produce_success_callback(ProduceSuccessCallback callback);
361397

362398
/**
363-
* \brief Sets the local message produce failure callback
399+
* \brief Sets the local flush failure callback
364400
*
365401
* This callback will be called when local message production fails during a flush() operation.
366402
* Failure errors are typically payload too large, unknown topic or unknown partition.
367403
* Note that if the callback returns false, the message will be dropped from the buffer,
368-
* otherwise it will be re-enqueued for later retry.
404+
* otherwise it will be re-enqueued for later retry subject to the message retry count.
369405
*
370406
* \param callback
371407
*
372408
* \warning Do not call any method on the BufferedProducer while inside this callback
373409
*/
374410
void set_flush_failure_callback(FlushFailureCallback callback);
375411

412+
/**
413+
* \brief Sets the local flush termination callback
414+
*
415+
* This callback will be called when local message production fails during a flush() operation after
416+
* all previous flush attempts have failed. The message will be dropped after this callback.
417+
*
418+
* \param callback
419+
*
420+
* \remark If the application only tracks dropped messages, the set_flush_failure_callback() should not be set.
421+
*
422+
* \warning Do not call any method on the BufferedProducer while inside this callback
423+
*/
424+
void set_flush_termination_callback(FlushTerminationCallback callback);
425+
376426
struct TestParameters {
377427
bool force_delivery_error_;
378428
bool force_produce_error_;
@@ -444,7 +494,9 @@ class CPPKAFKA_API BufferedProducer {
444494
mutable std::mutex mutex_;
445495
ProduceSuccessCallback produce_success_callback_;
446496
ProduceFailureCallback produce_failure_callback_;
497+
ProduceTerminationCallback produce_termination_callback_;
447498
FlushFailureCallback flush_failure_callback_;
499+
FlushTerminationCallback flush_termination_callback_;
448500
ssize_t max_buffer_size_{-1};
449501
FlushMethod flush_method_{FlushMethod::Sync};
450502
std::atomic<size_t> pending_acks_{0};
@@ -745,6 +797,11 @@ void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(Produ
745797
produce_failure_callback_ = std::move(callback);
746798
}
747799

800+
template <typename BufferType, typename Allocator>
801+
void BufferedProducer<BufferType, Allocator>::set_produce_termination_callback(ProduceTerminationCallback callback) {
802+
produce_termination_callback_ = std::move(callback);
803+
}
804+
748805
template <typename BufferType, typename Allocator>
749806
void BufferedProducer<BufferType, Allocator>::set_produce_success_callback(ProduceSuccessCallback callback) {
750807
produce_success_callback_ = std::move(callback);
@@ -755,6 +812,11 @@ void BufferedProducer<BufferType, Allocator>::set_flush_failure_callback(FlushFa
755812
flush_failure_callback_ = std::move(callback);
756813
}
757814

815+
template <typename BufferType, typename Allocator>
816+
void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(FlushTerminationCallback callback) {
817+
flush_termination_callback_ = std::move(callback);
818+
}
819+
758820
template <typename BufferType, typename Allocator>
759821
template <typename BuilderType>
760822
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
@@ -802,6 +864,9 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
802864
}
803865
}
804866
++total_messages_dropped_;
867+
// Call the flush termination callback
868+
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
869+
(builder, ex.get_error());
805870
if (throw_on_error) {
806871
throw;
807872
}
@@ -839,10 +904,14 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
839904
}
840905
else {
841906
++total_messages_dropped_;
907+
CallbackInvoker<ProduceTerminationCallback>
908+
("produce termination", produce_termination_callback_, &producer_)(message);
842909
}
843910
}
844911
else {
845912
++total_messages_dropped_;
913+
CallbackInvoker<ProduceTerminationCallback>
914+
("produce termination", produce_termination_callback_, &producer_)(message);
846915
}
847916
}
848917
else {

0 commit comments

Comments
 (0)