From b7d7e89a06deb0b657d981ddb07085f0ac798324 Mon Sep 17 00:00:00 2001 From: RT Date: Tue, 11 Sep 2018 17:39:05 +0530 Subject: [PATCH] change partition on leader not found --- kafka/producer/base.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 956cef6c5..a87214854 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -3,6 +3,7 @@ import atexit import logging import time +import random try: from queue import Empty, Full, Queue # pylint: disable=import-error @@ -125,9 +126,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, stop_event.set() break + topic_partitions = client.topic_partitions + # Adjust the timeout to match the remaining period count -= 1 timeout = send_at - time.time() + partitions_for_topic = len(topic_partitions[topic_partition.topic]) + partition = topic_partition.partition + while topic_partitions[topic_partition.topic][topic_partition.partition] != -1: + log.warn('APPLIFT : Leader not found for %s:%d, reassigning data to other', topic_partition.topic, partition) + partition = random.randint(0, partitions_for_topic) + topic_partition = TopicPartition(topic_partition.topic, partition) msgset[topic_partition].append((msg, key)) # Send collected requests upstream