Skip to content

Commit 4ba6b38

Browse files
authored
Merge pull request #149 from accelerated/queue_full
Added queue full notification
2 parents 5204655 + 4a6b677 commit 4ba6b38

File tree

1 file changed

+65
-2
lines changed

1 file changed

+65
-2
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,15 @@ template <typename BufferType,
8787
typename Allocator = std::allocator<ConcreteMessageBuilder<BufferType>>>
8888
class CPPKAFKA_API BufferedProducer {
8989
public:
90-
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
91-
Async }; ///< Empty the buffer and don't wait for acks
90+
enum class FlushMethod {
91+
Sync, ///< Empty the buffer and wait for acks from the broker.
92+
Async ///< Empty the buffer and don't wait for acks.
93+
};
94+
enum class QueueFullNotification {
95+
None, ///< Don't notify (default).
96+
OncePerMessage, ///< Notify once per message.
97+
EachOccurence ///< Notify on each occurence.
98+
};
9299
/**
93100
* Concrete builder
94101
*/
@@ -137,6 +144,14 @@ class CPPKAFKA_API BufferedProducer {
137144
* then FlushFailureCallback should not be set.
138145
*/
139146
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
147+
148+
/**
149+
* Callback to indicate a RD_KAFKA_RESP_ERR__QUEUE_FULL was received when producing.
150+
*
151+
* The MessageBuilder instance represents the message which triggered the error. This callback will be called
152+
* according to the set_queue_full_notification() setting.
153+
*/
154+
using QueueFullCallback = std::function<void(const MessageBuilder&)>;
140155

141156
/**
142157
* \brief Constructs a buffered producer using the provided configuration
@@ -358,6 +373,18 @@ class CPPKAFKA_API BufferedProducer {
358373
* Simple helper to construct a builder object
359374
*/
360375
Builder make_builder(std::string topic);
376+
377+
/**
378+
* Set the type of notification when RD_KAFKA_RESP_ERR__QUEUE_FULL is received.
379+
*
380+
* This will call the error callback for this producer. By default this is set to QueueFullNotification::None.
381+
*/
382+
void set_queue_full_notification(QueueFullNotification notification);
383+
384+
/**
385+
* Get the queue full notification type.
386+
*/
387+
QueueFullNotification get_queue_full_notification() const;
361388

362389
/**
363390
* \brief Sets the message produce failure callback
@@ -423,6 +450,18 @@ class CPPKAFKA_API BufferedProducer {
423450
*/
424451
void set_flush_termination_callback(FlushTerminationCallback callback);
425452

453+
/**
454+
* \brief Sets the local queue full error callback
455+
*
456+
* This callback will be called when local message production fails during a produce() operation according to the
457+
* set_queue_full_notification() setting.
458+
*
459+
* \param callback
460+
*
461+
* \warning Do not call any method on the BufferedProducer while inside this callback
462+
*/
463+
void set_queue_full_callback(QueueFullCallback callback);
464+
426465
struct TestParameters {
427466
bool force_delivery_error_;
428467
bool force_produce_error_;
@@ -497,6 +536,7 @@ class CPPKAFKA_API BufferedProducer {
497536
ProduceTerminationCallback produce_termination_callback_;
498537
FlushFailureCallback flush_failure_callback_;
499538
FlushTerminationCallback flush_termination_callback_;
539+
QueueFullCallback queue_full_callback_;
500540
ssize_t max_buffer_size_{-1};
501541
FlushMethod flush_method_{FlushMethod::Sync};
502542
std::atomic<size_t> pending_acks_{0};
@@ -505,6 +545,7 @@ class CPPKAFKA_API BufferedProducer {
505545
std::atomic<size_t> total_messages_dropped_{0};
506546
int max_number_retries_{0};
507547
bool has_internal_data_{false};
548+
QueueFullNotification queue_full_notification_{QueueFullNotification::None};
508549
#ifdef KAFKA_TEST_INSTANCE
509550
TestParameters* test_params_;
510551
#endif
@@ -798,6 +839,17 @@ BufferedProducer<BufferType, Allocator>::make_builder(std::string topic) {
798839
return Builder(std::move(topic));
799840
}
800841

842+
template <typename BufferType, typename Allocator>
843+
void BufferedProducer<BufferType, Allocator>::set_queue_full_notification(QueueFullNotification notification) {
844+
queue_full_notification_ = notification;
845+
}
846+
847+
template <typename BufferType, typename Allocator>
848+
typename BufferedProducer<BufferType, Allocator>::QueueFullNotification
849+
BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
850+
return queue_full_notification_;
851+
}
852+
801853
template <typename BufferType, typename Allocator>
802854
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
803855
produce_failure_callback_ = std::move(callback);
@@ -823,10 +875,16 @@ void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(Flu
823875
flush_termination_callback_ = std::move(callback);
824876
}
825877

878+
template <typename BufferType, typename Allocator>
879+
void BufferedProducer<BufferType, Allocator>::set_queue_full_callback(QueueFullCallback callback) {
880+
queue_full_callback_ = std::move(callback);
881+
}
882+
826883
template <typename BufferType, typename Allocator>
827884
template <typename BuilderType>
828885
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
829886
using builder_type = typename std::decay<BuilderType>::type;
887+
bool queue_full_notify = queue_full_notification_ != QueueFullNotification::None;
830888
while (true) {
831889
try {
832890
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
@@ -840,6 +898,11 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
840898
if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
841899
// If the output queue is full, then just poll
842900
producer_.poll();
901+
// Notify application so it can slow-down production
902+
if (queue_full_notify) {
903+
queue_full_notify = queue_full_notification_ == QueueFullNotification::EachOccurence;
904+
CallbackInvoker<QueueFullCallback>("queue full", queue_full_callback_, &producer_)(builder);
905+
}
843906
}
844907
else {
845908
throw;

0 commit comments

Comments
 (0)