From fde0fd1372760777a3be6b978643b7b40d831532 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 12:03:00 -0700 Subject: [PATCH 1/2] Assignor.assign takes JoinGroupResponseMembers list --- kafka/coordinator/assignors/abstract.py | 5 +- kafka/coordinator/assignors/range.py | 14 +-- kafka/coordinator/assignors/roundrobin.py | 17 ++-- .../assignors/sticky/sticky_assignor.py | 20 ++-- kafka/coordinator/consumer.py | 14 +-- test/test_assignors.py | 97 +++++++++++-------- test/test_coordinator.py | 24 ++--- 7 files changed, 101 insertions(+), 90 deletions(-) diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index a14f11ee4..09f5d26f3 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -22,9 +22,10 @@ def assign(self, cluster, members): Arguments: cluster (ClusterMetadata): metadata for use in assignment - members (dict of {member_id: Subscription}): decoded metadata + members ([JoinGroupResponseMember]): member_id and metadata for each member in the group, including group_instance_id - when available. + when available (v5+). metadata is a decoded instance of + ConsumerProtocolSubscription. Returns: dict: {member_id: ConsumerProtocolAssignment} diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py index 7b1cad21e..133cfa876 100644 --- a/kafka/coordinator/assignors/range.py +++ b/kafka/coordinator/assignors/range.py @@ -32,11 +32,11 @@ class RangePartitionAssignor(AbstractPartitionAssignor): version = 0 @classmethod - def assign(cls, cluster, group_subscriptions): + def assign(cls, cluster, members): consumers_per_topic = collections.defaultdict(list) - for member_id, subscription in group_subscriptions.items(): - for topic in subscription.topics: - consumers_per_topic[topic].append((subscription.group_instance_id, member_id)) + for member in members: + for topic in member.metadata.topics: + consumers_per_topic[topic].append((member.group_instance_id, member.member_id)) # construct {member_id: {topic: [partition, ...]}} assignment = collections.defaultdict(dict) @@ -65,10 +65,10 @@ def assign(cls, cluster, group_subscriptions): assignment[member_id][topic] = partitions[start:start+length] protocol_assignment = {} - for member_id in group_subscriptions: - protocol_assignment[member_id] = ConsumerProtocolAssignment( + for member in members: + protocol_assignment[member.member_id] = ConsumerProtocolAssignment( cls.version, - sorted(assignment[member_id].items()), + sorted(assignment[member.member_id].items()), b'') return protocol_assignment diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index 279001cf2..2fe62e1c0 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -48,10 +48,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor): version = 0 @classmethod - def assign(cls, cluster, group_subscriptions): + def assign(cls, cluster, members): all_topics = set() - for subscription in group_subscriptions.values(): - all_topics.update(subscription.topics) + for member in members: + all_topics.update(member.metadata.topics) all_topic_partitions = [] for topic in all_topics: @@ -67,10 +67,11 @@ def assign(cls, cluster, group_subscriptions): assignment = collections.defaultdict(lambda: collections.defaultdict(list)) # Sort static and dynamic members separately to maintain stable static assignments - ungrouped = [(subscription.group_instance_id, member_id) for member_id, subscription in group_subscriptions.items()] + ungrouped = [(member.group_instance_id, member.member_id) for member in members] grouped = {k: list(g) for k, g in itertools.groupby(ungrouped, key=lambda ids: ids[0] is not None)} member_list = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic member_iter = itertools.cycle(member_list) + member_topics = {member.member_id: member.metadata.topics for member in members} for partition in all_topic_partitions: _group_instance_id, member_id = next(member_iter) @@ -79,15 +80,15 @@ def assign(cls, cluster, group_subscriptions): # member subscribed topics, we should be safe assuming that # each topic in all_topic_partitions is in at least one member # subscription; otherwise this could yield an infinite loop - while partition.topic not in group_subscriptions[member_id].topics: + while partition.topic not in member_topics[member_id]: member_id = next(member_iter) assignment[member_id][partition.topic].append(partition.partition) protocol_assignment = {} - for member_id in group_subscriptions: - protocol_assignment[member_id] = ConsumerProtocolAssignment( + for member in members: + protocol_assignment[member.member_id] = ConsumerProtocolAssignment( cls.version, - sorted(assignment[member_id].items()), + sorted(assignment[member.member_id].items()), b'') return protocol_assignment diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 2fd2af964..d270dcd94 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -576,26 +576,26 @@ def assign(cls, cluster, members): Arguments: cluster (ClusterMetadata): cluster metadata - members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group. + members ([JoinGroupResponseMember]): decoded metadata for each member in the group. Returns: dict: {member_id: ConsumerProtocolAssignment} """ - members_metadata = {} - for consumer, member_metadata in members.items(): - members_metadata[consumer] = cls.parse_member_metadata(member_metadata) - + members_metadata = { + member.member_id: cls.parse_member_metadata(member.metadata) + for member in members + } executor = StickyAssignmentExecutor(cluster, members_metadata) executor.perform_initial_assignment() executor.balance() cls._latest_partition_movements = executor.partition_movements - assignment = {} - for member_id in members: - assignment[member_id] = ConsumerProtocolAssignment( - cls.version, sorted(executor.get_final_assignment(member_id)), b'' - ) + assignment = { + member.member_id: ConsumerProtocolAssignment( + cls.version, sorted(executor.get_final_assignment(member.member_id)), b'') + for member in members + } return assignment @classmethod diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index be3975d24..45252982d 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -11,7 +11,6 @@ from kafka.protocol.new.consumer.metadata import ( ConsumerProtocolType, ConsumerProtocolSubscription, ConsumerProtocolAssignment, ) -from kafka.coordinator.subscription import Subscription import kafka.errors as Errors from kafka.future import Future from kafka.metrics import AnonMeasurable @@ -330,15 +329,10 @@ def time_to_next_poll(self): def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) - member_subscriptions = {} all_subscribed_topics = set() for member in members: - subscription = Subscription( - ConsumerProtocolSubscription.decode(member.metadata), - member.group_instance_id - ) - member_subscriptions[member.member_id] = subscription - all_subscribed_topics.update(subscription.topics) + member.metadata = ConsumerProtocolSubscription.decode(member.metadata) + all_subscribed_topics.update(member.metadata.topics) # the leader will begin watching for changes to any of the topics # the group is interested in, which ensures that all metadata changes @@ -356,9 +350,9 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): log.debug("Performing assignment for group %s using strategy %s" " with subscriptions %s", self.group_id, assignor.name, - member_subscriptions) + members) - assignments = assignor.assign(self._cluster, member_subscriptions) + assignments = assignor.assign(self._cluster, members) log.debug("Finished assignment for group %s: %s", self.group_id, assignments) diff --git a/test/test_assignors.py b/test/test_assignors.py index 189885a84..808d69ff9 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -10,7 +10,7 @@ from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor -from kafka.coordinator.subscription import Subscription +from kafka.protocol.new.consumer import JoinGroupResponse @pytest.fixture(autouse=True) @@ -30,16 +30,26 @@ def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lamb return cluster +def make_join_group_response_members(group_subscriptions): + return [ + JoinGroupResponse.JoinGroupResponseMember( + member_id=member_id, + metadata=subscription, + ) + for member_id, subscription in group_subscriptions.items() + ] + + def test_assignor_roundrobin(mocker): assignor = RoundRobinPartitionAssignor - group_subscriptions = { - 'C0': Subscription(assignor.metadata({'t0', 't1'}), None), - 'C1': Subscription(assignor.metadata({'t0', 't1'}), None), - } + members = make_join_group_response_members({ + 'C0': assignor.metadata({'t0', 't1'}), + 'C1': assignor.metadata({'t0', 't1'}), + }) cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) - ret = assignor.assign(cluster, group_subscriptions) + ret = assignor.assign(cluster, members) expected = { 'C0': ConsumerProtocolAssignment( assignor.version, [('t0', [0, 2]), ('t1', [1])], b''), @@ -55,13 +65,13 @@ def test_assignor_roundrobin(mocker): def test_assignor_range(mocker): assignor = RangePartitionAssignor - group_subscriptions = { - 'C0': Subscription(assignor.metadata({'t0', 't1'}), None), - 'C1': Subscription(assignor.metadata({'t0', 't1'}), None), - } + members = make_join_group_response_members({ + 'C0': assignor.metadata({'t0', 't1'}), + 'C1': assignor.metadata({'t0', 't1'}), + }) cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) - ret = assignor.assign(cluster, group_subscriptions) + ret = assignor.assign(cluster, members) expected = { 'C0': ConsumerProtocolAssignment( assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''), @@ -74,6 +84,16 @@ def test_assignor_range(mocker): assert ret[member].encode() == expected[member].encode() +def make_member_metadata(subscriptions): + return [ + JoinGroupResponse.JoinGroupResponseMember( + member_id=member_id, + metadata=StickyPartitionAssignor._metadata(topics, []), + ) + for member_id, topics in subscriptions.items() + ] + + def test_sticky_assignor1(mocker): """ Given: there are three consumers C0, C1, C2, @@ -112,7 +132,7 @@ def test_sticky_assignor1(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C0': ConsumerProtocolAssignment( StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b'' @@ -155,7 +175,7 @@ def test_sticky_assignor2(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C0': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''), 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), @@ -168,7 +188,7 @@ def test_sticky_assignor2(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), @@ -329,7 +349,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): topics, assignment[member].partitions() if member in assignment else [] ) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) subscriptions = { @@ -339,7 +359,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert len(assignment['C2'].assigned_partitions[0][1]) == 3 @@ -368,7 +388,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), @@ -383,7 +403,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''), 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), @@ -414,7 +434,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -437,7 +457,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): member_metadata[member] = StickyPartitionAssignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -462,7 +482,7 @@ def test_sticky_same_subscriptions(mocker): member_metadata = {} for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -495,7 +515,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): del subscriptions[member] del member_metadata[member] - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -518,7 +538,7 @@ def test_new_subscription(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -541,7 +561,7 @@ def test_move_existing_assignments(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member]) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) @@ -571,7 +591,7 @@ def test_stickiness(mocker): for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -627,7 +647,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) cluster = create_cluster(mocker, topics={}, topics_partitions={}) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), } @@ -646,7 +666,7 @@ def test_conflicting_previous_assignments(mocker): # assume both C1 and C2 have partition 1 assigned to them in generation 1 member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) @@ -677,7 +697,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -691,7 +711,7 @@ def test_assignment_with_multiple_generations1(mocker): 'C3': StickyPartitionAssignor._metadata({'t'}, []), } - assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment1 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 @@ -702,7 +722,7 @@ def test_assignment_with_multiple_generations1(mocker): 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()), } - assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment2 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2) assert len(assignment2['C1'].assigned_partitions[0][1]) == 3 assert len(assignment2['C2'].assigned_partitions[0][1]) == 3 @@ -715,7 +735,7 @@ def test_assignment_with_multiple_generations1(mocker): 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } - assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment3 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3) assert len(assignment3['C2'].assigned_partitions[0][1]) == 3 assert len(assignment3['C3'].assigned_partitions[0][1]) == 3 @@ -731,7 +751,7 @@ def test_assignment_with_multiple_generations2(mocker): 'C3': StickyPartitionAssignor._metadata({'t'}, []), } - assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment1 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 @@ -741,7 +761,7 @@ def test_assignment_with_multiple_generations2(mocker): 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1), } - assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment2 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C2': {'t'}}, assignment2) assert len(assignment2['C2'].assigned_partitions[0][1]) == 6 assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) @@ -753,7 +773,7 @@ def test_assignment_with_multiple_generations2(mocker): 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } - assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment3 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() assert set(assignment3['C1'].assigned_partitions[0][1]) == set(assignment1['C1'].assigned_partitions[0][1]) @@ -779,18 +799,11 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb for member in member_assignments: member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member]) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() -def make_member_metadata(subscriptions): - member_metadata = {} - for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) - return member_metadata - - def assert_assignment(result_assignment, expected_assignment): assert result_assignment == expected_assignment assert set(result_assignment) == set(expected_assignment) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 54d300c34..b325d1f3b 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -13,7 +13,6 @@ from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.base import Generation, MemberState, HeartbeatThread from kafka.coordinator.consumer import ConsumerCoordinator -from kafka.coordinator.subscription import Subscription import kafka.errors as Errors from kafka.future import Future from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS @@ -192,10 +191,16 @@ def test_subscription_listener_failure(mocker, coordinator): def test_perform_assignment(mocker, coordinator): coordinator._subscription.subscribe(topics=['foo1']) - group_subscriptions = { - 'member-foo': Subscription(ConsumerProtocolSubscription(0, ['foo1'], b''), None), - 'member-bar': Subscription(ConsumerProtocolSubscription(0, ['foo1'], b''), None), - } + members = [ + JoinGroupResponse.JoinGroupResponseMember( + member_id='member-foo', + metadata=ConsumerProtocolSubscription(0, ['foo1'], b'').encode(), + ), + JoinGroupResponse.JoinGroupResponseMember( + member_id='member-bar', + metadata=ConsumerProtocolSubscription(0, ['foo1'], b'').encode(), + ), + ] assignments = { 'member-foo': ConsumerProtocolAssignment( 0, [('foo1', [0])], b''), @@ -207,15 +212,12 @@ def test_perform_assignment(mocker, coordinator): RoundRobinPartitionAssignor.assign.return_value = assignments ret = coordinator._perform_assignment( - 'member-foo', 'roundrobin', - [JoinGroupResponse.JoinGroupResponseMember( - member_id=member_id, - metadata=subscription.encode(), - ) for member_id, subscription in group_subscriptions.items()]) + 'member-foo', 'roundrobin', members, + ) assert RoundRobinPartitionAssignor.assign.call_count == 1 RoundRobinPartitionAssignor.assign.assert_called_with( - coordinator._client.cluster, group_subscriptions) + coordinator._client.cluster, members) assert ret == assignments From ec0dcd265c539f9aa742322f53f86c3a2bdef809 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Mar 2026 08:52:11 -0700 Subject: [PATCH 2/2] docstring update --- kafka/coordinator/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index f87e31618..d8ae4bc5d 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -216,9 +216,9 @@ def _perform_assignment(self, leader_id, protocol, members): leader_id (str): The id of the leader (which is this member) protocol (str): the chosen group protocol (assignment strategy) members (list): [JoinGroupResponseMember] from JoinGroupResponse. - metadata_bytes are associated with the chosen group protocol, + metadata is associated with the chosen group protocol, and the Coordinator subclass is responsible for decoding - metadata_bytes based on that protocol. + metadata based on that protocol. Returns: dict: {member_id: assignment}; assignment must either be bytes