@@ -87,8 +87,15 @@ template <typename BufferType,
8787 typename Allocator = std::allocator<ConcreteMessageBuilder<BufferType>>>
8888class CPPKAFKA_API BufferedProducer {
8989public:
90- enum class FlushMethod { Sync, // /< Empty the buffer and wait for acks from the broker
91- Async }; // /< Empty the buffer and don't wait for acks
90+ enum class FlushMethod {
91+ Sync, // /< Empty the buffer and wait for acks from the broker.
92+ Async // /< Empty the buffer and don't wait for acks.
93+ };
94+ enum class QueueFullNotification {
95+ None, // /< Don't notify
96+ EdgeTriggered, // /< Notify once. Application must call queue_full_trigger_reset() to enable again.
97+ EachOccurence // /< Notify on each occurence.
98+ };
9299 /* *
93100 * Concrete builder
94101 */
@@ -358,6 +365,25 @@ class CPPKAFKA_API BufferedProducer {
358365 * Simple helper to construct a builder object
359366 */
360367 Builder make_builder (std::string topic);
368+
369+ /* *
370+ * Set the type of notification when RD_KAFKA_RESP_ERR__QUEUE_FULL is received.
371+ *
372+ * This will call the error callback for this producer. By default this is set to QueueFullNotification::None.
373+ */
374+ void set_queue_full_notification (QueueFullNotification notification);
375+
376+ /* *
377+ * Get the queue full notification type.
378+ */
379+ QueueFullNotification get_queue_full_notification () const ;
380+
381+ /* *
382+ * Reset the queue full notification trigger.
383+ *
384+ * This function has no effect unless QueueFullNotification == EdgeTriggered.
385+ */
386+ void queue_full_trigger_reset ();
361387
362388 /* *
363389 * \brief Sets the message produce failure callback
@@ -505,6 +531,8 @@ class CPPKAFKA_API BufferedProducer {
505531 std::atomic<size_t > total_messages_dropped_{0 };
506532 int max_number_retries_{0 };
507533 bool has_internal_data_{false };
534+ QueueFullNotification queue_full_notification_{QueueFullNotification::None};
535+ bool queue_full_trigger_{true };
508536#ifdef KAFKA_TEST_INSTANCE
509537 TestParameters* test_params_;
510538#endif
@@ -798,6 +826,22 @@ BufferedProducer<BufferType, Allocator>::make_builder(std::string topic) {
798826 return Builder (std::move (topic));
799827}
800828
829+ template <typename BufferType, typename Allocator>
830+ void BufferedProducer<BufferType, Allocator>::set_queue_full_notification(QueueFullNotification notification) {
831+ queue_full_notification_ = notification;
832+ }
833+
834+ template <typename BufferType, typename Allocator>
835+ typename BufferedProducer<BufferType, Allocator>::QueueFullNotification
836+ BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
837+ return queue_full_notification_;
838+ }
839+
840+ template <typename BufferType, typename Allocator>
841+ void BufferedProducer<BufferType, Allocator>::queue_full_trigger_reset() {
842+ queue_full_trigger_ = true ;
843+ }
844+
801845template <typename BufferType, typename Allocator>
802846void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
803847 produce_failure_callback_ = std::move (callback);
@@ -827,6 +871,9 @@ template <typename BufferType, typename Allocator>
827871template <typename BuilderType>
828872void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
829873 using builder_type = typename std::decay<BuilderType>::type;
874+ bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false :
875+ (queue_full_notification_ == QueueFullNotification::EdgeTriggered) ?
876+ queue_full_trigger_ : true ;
830877 while (true ) {
831878 try {
832879 MessageInternalGuard<builder_type> internal_guard (const_cast <builder_type&>(builder));
@@ -840,6 +887,13 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
840887 if (ex.get_error () == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
841888 // If the output queue is full, then just poll
842889 producer_.poll ();
890+ // Notify application so it can slow-down production
891+ if (queue_full_notify) {
892+ queue_full_notify = queue_full_trigger_ = false ; // clear trigger and local state
893+ CallbackInvoker<Configuration::ErrorCallback>
894+ (" error" , get_producer ().get_configuration ().get_error_callback (), &get_producer ())
895+ (get_producer (), static_cast <int >(ex.get_error ().get_error ()), ex.what ());
896+ }
843897 }
844898 else {
845899 throw ;
0 commit comments