Skip to content

Commit 97d1bb9

Browse files
author
accelerated
committed
Added queue full notify callback
1 parent ed81ce4 commit 97d1bb9

File tree

1 file changed

+32
-23
lines changed

1 file changed

+32
-23
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ class CPPKAFKA_API BufferedProducer {
9292
Async ///< Empty the buffer and don't wait for acks.
9393
};
9494
enum class QueueFullNotification {
95-
None, ///< Don't notify
96-
EdgeTriggered, ///< Notify once. Application must call queue_full_trigger_reset() to enable again.
97-
EachOccurence ///< Notify on each occurence.
95+
None, ///< Don't notify (default).
96+
OncePerMessage, ///< Notify once per message.
97+
EachOccurence ///< Notify on each occurence.
9898
};
9999
/**
100100
* Concrete builder
@@ -144,6 +144,14 @@ class CPPKAFKA_API BufferedProducer {
144144
* then FlushFailureCallback should not be set.
145145
*/
146146
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
147+
148+
/**
149+
* Callback to indicate a queue full error was received when producing.
150+
*
151+
* The MessageBuilder instance represents the message which triggered the error. This callback will be called
152+
* according to the set_queue_full_notification() setting.
153+
*/
154+
using QueueFullCallback = std::function<void(const MessageBuilder&)>;
147155

148156
/**
149157
* \brief Constructs a buffered producer using the provided configuration
@@ -377,13 +385,6 @@ class CPPKAFKA_API BufferedProducer {
377385
* Get the queue full notification type.
378386
*/
379387
QueueFullNotification get_queue_full_notification() const;
380-
381-
/**
382-
* Reset the queue full notification trigger.
383-
*
384-
* This function has no effect unless QueueFullNotification == EdgeTriggered.
385-
*/
386-
void queue_full_trigger_reset();
387388

388389
/**
389390
* \brief Sets the message produce failure callback
@@ -449,6 +450,18 @@ class CPPKAFKA_API BufferedProducer {
449450
*/
450451
void set_flush_termination_callback(FlushTerminationCallback callback);
451452

453+
/**
454+
* \brief Sets the local queue full error callback
455+
*
456+
* This callback will be called when local message production fails during a produce() operation according to the
457+
* set_queue_full_notification() setting.
458+
*
459+
* \param callback
460+
*
461+
* \warning Do not call any method on the BufferedProducer while inside this callback
462+
*/
463+
void set_queue_full_callback(QueueFullCallback callback);
464+
452465
struct TestParameters {
453466
bool force_delivery_error_;
454467
bool force_produce_error_;
@@ -523,6 +536,7 @@ class CPPKAFKA_API BufferedProducer {
523536
ProduceTerminationCallback produce_termination_callback_;
524537
FlushFailureCallback flush_failure_callback_;
525538
FlushTerminationCallback flush_termination_callback_;
539+
QueueFullCallback queue_full_callback_;
526540
ssize_t max_buffer_size_{-1};
527541
FlushMethod flush_method_{FlushMethod::Sync};
528542
std::atomic<size_t> pending_acks_{0};
@@ -532,7 +546,6 @@ class CPPKAFKA_API BufferedProducer {
532546
int max_number_retries_{0};
533547
bool has_internal_data_{false};
534548
QueueFullNotification queue_full_notification_{QueueFullNotification::None};
535-
bool queue_full_trigger_{true};
536549
#ifdef KAFKA_TEST_INSTANCE
537550
TestParameters* test_params_;
538551
#endif
@@ -837,11 +850,6 @@ BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
837850
return queue_full_notification_;
838851
}
839852

840-
template <typename BufferType, typename Allocator>
841-
void BufferedProducer<BufferType, Allocator>::queue_full_trigger_reset() {
842-
queue_full_trigger_ = true;
843-
}
844-
845853
template <typename BufferType, typename Allocator>
846854
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
847855
produce_failure_callback_ = std::move(callback);
@@ -867,13 +875,16 @@ void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(Flu
867875
flush_termination_callback_ = std::move(callback);
868876
}
869877

878+
template <typename BufferType, typename Allocator>
879+
void BufferedProducer<BufferType, Allocator>::set_queue_full_callback(QueueFullCallback callback) {
880+
queue_full_callback_ = std::move(callback);
881+
}
882+
870883
template <typename BufferType, typename Allocator>
871884
template <typename BuilderType>
872885
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
873886
using builder_type = typename std::decay<BuilderType>::type;
874-
bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false :
875-
(queue_full_notification_ == QueueFullNotification::EdgeTriggered) ?
876-
queue_full_trigger_ : true;
887+
bool queue_full_notify = queue_full_notification_ != QueueFullNotification::None;
877888
while (true) {
878889
try {
879890
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
@@ -889,10 +900,8 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
889900
producer_.poll();
890901
// Notify application so it can slow-down production
891902
if (queue_full_notify) {
892-
queue_full_notify = queue_full_trigger_ = false; //clear trigger and local state
893-
CallbackInvoker<Configuration::ErrorCallback>
894-
("error", get_producer().get_configuration().get_error_callback(), &get_producer())
895-
(get_producer(), static_cast<int>(ex.get_error().get_error()), ex.what());
903+
queue_full_notify = queue_full_notification_ == QueueFullNotification::EachOccurence;
904+
CallbackInvoker<QueueFullCallback>("queue full", queue_full_callback_, &producer_)(builder);
896905
}
897906
}
898907
else {

0 commit comments

Comments
 (0)