Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ def assign(self, cluster, members):
Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: Subscription}): decoded metadata
members ([JoinGroupResponseMember]): member_id and metadata
for each member in the group, including group_instance_id
when available.
when available (v5+). metadata is a decoded instance of
ConsumerProtocolSubscription.
Returns:
dict: {member_id: ConsumerProtocolAssignment}
Expand Down
14 changes: 7 additions & 7 deletions kafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
version = 0

@classmethod
def assign(cls, cluster, group_subscriptions):
def assign(cls, cluster, members):
consumers_per_topic = collections.defaultdict(list)
for member_id, subscription in group_subscriptions.items():
for topic in subscription.topics:
consumers_per_topic[topic].append((subscription.group_instance_id, member_id))
for member in members:
for topic in member.metadata.topics:
consumers_per_topic[topic].append((member.group_instance_id, member.member_id))

# construct {member_id: {topic: [partition, ...]}}
assignment = collections.defaultdict(dict)
Expand Down Expand Up @@ -65,10 +65,10 @@ def assign(cls, cluster, group_subscriptions):
assignment[member_id][topic] = partitions[start:start+length]

protocol_assignment = {}
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolAssignment(
for member in members:
protocol_assignment[member.member_id] = ConsumerProtocolAssignment(
cls.version,
sorted(assignment[member_id].items()),
sorted(assignment[member.member_id].items()),
b'')
return protocol_assignment

Expand Down
17 changes: 9 additions & 8 deletions kafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
version = 0

@classmethod
def assign(cls, cluster, group_subscriptions):
def assign(cls, cluster, members):
all_topics = set()
for subscription in group_subscriptions.values():
all_topics.update(subscription.topics)
for member in members:
all_topics.update(member.metadata.topics)

all_topic_partitions = []
for topic in all_topics:
Expand All @@ -67,10 +67,11 @@ def assign(cls, cluster, group_subscriptions):
assignment = collections.defaultdict(lambda: collections.defaultdict(list))

# Sort static and dynamic members separately to maintain stable static assignments
ungrouped = [(subscription.group_instance_id, member_id) for member_id, subscription in group_subscriptions.items()]
ungrouped = [(member.group_instance_id, member.member_id) for member in members]
grouped = {k: list(g) for k, g in itertools.groupby(ungrouped, key=lambda ids: ids[0] is not None)}
member_list = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic
member_iter = itertools.cycle(member_list)
member_topics = {member.member_id: member.metadata.topics for member in members}

for partition in all_topic_partitions:
_group_instance_id, member_id = next(member_iter)
Expand All @@ -79,15 +80,15 @@ def assign(cls, cluster, group_subscriptions):
# member subscribed topics, we should be safe assuming that
# each topic in all_topic_partitions is in at least one member
# subscription; otherwise this could yield an infinite loop
while partition.topic not in group_subscriptions[member_id].topics:
while partition.topic not in member_topics[member_id]:
member_id = next(member_iter)
assignment[member_id][partition.topic].append(partition.partition)

protocol_assignment = {}
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolAssignment(
for member in members:
protocol_assignment[member.member_id] = ConsumerProtocolAssignment(
cls.version,
sorted(assignment[member_id].items()),
sorted(assignment[member.member_id].items()),
b'')
return protocol_assignment

Expand Down
20 changes: 10 additions & 10 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,26 +576,26 @@ def assign(cls, cluster, members):

Arguments:
cluster (ClusterMetadata): cluster metadata
members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group.
members ([JoinGroupResponseMember]): decoded metadata for each member in the group.

Returns:
dict: {member_id: ConsumerProtocolAssignment}
"""
members_metadata = {}
for consumer, member_metadata in members.items():
members_metadata[consumer] = cls.parse_member_metadata(member_metadata)

members_metadata = {
member.member_id: cls.parse_member_metadata(member.metadata)
for member in members
}
executor = StickyAssignmentExecutor(cluster, members_metadata)
executor.perform_initial_assignment()
executor.balance()

cls._latest_partition_movements = executor.partition_movements

assignment = {}
for member_id in members:
assignment[member_id] = ConsumerProtocolAssignment(
cls.version, sorted(executor.get_final_assignment(member_id)), b''
)
assignment = {
member.member_id: ConsumerProtocolAssignment(
cls.version, sorted(executor.get_final_assignment(member.member_id)), b'')
for member in members
}
return assignment

@classmethod
Expand Down
14 changes: 4 additions & 10 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from kafka.protocol.new.consumer.metadata import (
ConsumerProtocolType, ConsumerProtocolSubscription, ConsumerProtocolAssignment,
)
from kafka.coordinator.subscription import Subscription
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import AnonMeasurable
Expand Down Expand Up @@ -330,15 +329,10 @@ def time_to_next_poll(self):
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
member_subscriptions = {}
all_subscribed_topics = set()
for member in members:
subscription = Subscription(
ConsumerProtocolSubscription.decode(member.metadata),
member.group_instance_id
)
member_subscriptions[member.member_id] = subscription
all_subscribed_topics.update(subscription.topics)
member.metadata = ConsumerProtocolSubscription.decode(member.metadata)
all_subscribed_topics.update(member.metadata.topics)

# the leader will begin watching for changes to any of the topics
# the group is interested in, which ensures that all metadata changes
Expand All @@ -356,9 +350,9 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):

log.debug("Performing assignment for group %s using strategy %s"
" with subscriptions %s", self.group_id, assignor.name,
member_subscriptions)
members)

assignments = assignor.assign(self._cluster, member_subscriptions)
assignments = assignor.assign(self._cluster, members)

log.debug("Finished assignment for group %s: %s", self.group_id, assignments)

Expand Down
Loading
Loading