2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -68,7 +68,7 @@ jobs:
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Pylint
- run: pylint --recursive=y --errors-only --exit-zero kafka test
+ run: pylint --recursive=y --errors-only kafka test
- name: Setup java
uses: actions/setup-java@v5
with:
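Note on the workflow change: `--exit-zero` forces pylint to return status 0 regardless of what it finds, so the `--errors-only` run was purely informational; with the flag removed, any error-class finding (such as the E0606 possibly-used-before-assignment messages addressed in the hunks below) fails the job. A hedged sketch of reproducing the check locally, assuming pylint is installed and the command is run from the repository root:

    import subprocess
    import sys

    # Same invocation as the CI step. Without --exit-zero, error-class
    # findings (e.g. E0606) produce a non-zero return code, which is
    # what now fails the GitHub Actions job.
    result = subprocess.run(
        ["pylint", "--recursive=y", "--errors-only", "kafka", "test"])
    sys.exit(result.returncode)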
27 changes: 14 additions & 13 deletions kafka/admin/client.py
@@ -290,7 +290,7 @@ def _find_coordinator_id_request(self, group_id):
request = FindCoordinatorRequest[version](group_id)
elif version <= 2:
request = FindCoordinatorRequest[version](group_id, 0)
- return request
+ return request  # pylint: disable=E0606

def _find_coordinator_id_process_response(self, response):
"""Process a FindCoordinatorResponse.
@@ -506,7 +506,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
)
# TODO convert structs to a more pythonic interface
# TODO raise exceptions if errors
- return self._send_request_to_controller(request)
+ return self._send_request_to_controller(request)  # pylint: disable=E0606

def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster.
@@ -680,7 +680,7 @@ def describe_acls(self, acl_filter):
permission_type=acl_filter.permission_type

)
- response = self.send_request(request)
+ response = self.send_request(request)  # pylint: disable=E0606
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# optionally we could retry if error_type.retriable
@@ -793,7 +793,7 @@ def create_acls(self, acls):
request = CreateAclsRequest[version](
creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
)
- response = self.send_request(request)
+ response = self.send_request(request)  # pylint: disable=E0606
return self._convert_create_acls_response_to_acls(acls, response)

@staticmethod
@@ -907,7 +907,7 @@ def delete_acls(self, acl_filters):
request = DeleteAclsRequest[version](
filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
)
- response = self.send_request(request)
+ response = self.send_request(request)  # pylint: disable=E0606
return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)

@staticmethod
@@ -1269,14 +1269,15 @@ def _describe_consumer_groups_process_response(self, response):
# TODO: Fix GroupInformation defaults
described_group_information_list.append([])
group_description = GroupInformation._make(described_group_information_list)
- error_code = group_description.error_code
- error_type = Errors.for_code(error_code)
- # Java has the note: KAFKA-6789, we can retry based on the error code
- if error_type is not Errors.NoError:
- raise error_type(
- "DescribeGroupsResponse failed with response '{}'."
- .format(response))
- return group_description
+ error_code = group_description.error_code
+ error_type = Errors.for_code(error_code)
+ # Java has the note: KAFKA-6789, we can retry based on the error code
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "DescribeGroupsResponse failed with response '{}'."
+ .format(response))
+ return group_description
+ assert False, "DescribeGroupsResponse parsing failed"

def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.
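The inline `# pylint: disable=E0606` comments above all cover the same shape of code: a request or response object is bound only inside version-guarded branches, and pylint's possibly-used-before-assignment check cannot prove that one of the branches always runs. A minimal, self-contained sketch of the pattern, with hypothetical tuples standing in for the real protocol request classes:

    def build_find_coordinator_request(version, group_id):
        # `version` is resolved upstream and is always 0, 1 or 2 here,
        # but pylint cannot see that invariant, so the read of `request`
        # below is reported as E0606 (possibly-used-before-assignment).
        if version <= 0:
            request = ("find_coordinator_v0", group_id)
        elif version <= 2:
            request = ("find_coordinator_v%d" % version, group_id, 0)
        return request  # pylint: disable=E0606

The `_describe_consumer_groups_process_response` hunk appears to take the other route: rather than suppressing, the error handling and return are moved so that every path either returns a parsed group description or reaches the new `assert False` fallback.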
4 changes: 2 additions & 2 deletions kafka/record/default_records.py
@@ -245,7 +245,7 @@ def _maybe_uncompress(self):
uncompressed = lz4_decode(data.tobytes())
if compression_type == self.CODEC_ZSTD:
uncompressed = zstd_decode(data.tobytes())
- self._buffer = bytearray(uncompressed)
+ self._buffer = bytearray(uncompressed)  # pylint: disable=E0606
self._pos = 0
self._decompressed = True

@@ -658,7 +658,7 @@ def _maybe_compress(self):
compressed = lz4_encode(data)
elif self._compression_type == self.CODEC_ZSTD:
compressed = zstd_encode(data)
- compressed_size = len(compressed)
+ compressed_size = len(compressed)  # pylint: disable=E0606
if len(data) <= compressed_size:
# We did not get any benefit from compression, lets send
# uncompressed
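In `_maybe_uncompress` and `_maybe_compress` the situation is similar, with one twist: `_assert_has_codec` raises for any unsupported codec before the branches run, but pylint does not follow that guarantee across the function call, so the assignments to `uncompressed`/`compressed` still look conditional to it. A simplified sketch, with stand-in codec constants and decoders replacing the real ones:

    CODEC_GZIP, CODEC_SNAPPY = 0x01, 0x02

    def _assert_has_codec(compression_type):
        # Raises for anything this build cannot handle.
        if compression_type not in (CODEC_GZIP, CODEC_SNAPPY):
            raise ValueError("unsupported codec: %r" % compression_type)

    def maybe_uncompress(compression_type, data):
        _assert_has_codec(compression_type)
        if compression_type == CODEC_GZIP:
            uncompressed = data  # stand-in for gzip_decode(data)
        if compression_type == CODEC_SNAPPY:
            uncompressed = data  # stand-in for snappy_decode(data)
        # The guard above guarantees one branch ran, but pylint cannot
        # know that, hence the inline disable used in the real module.
        return uncompressed  # pylint: disable=E0606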
7 changes: 5 additions & 2 deletions kafka/record/legacy_records.py
@@ -122,6 +122,9 @@ def _assert_has_codec(self, compression_type):
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
+ else:
+ raise UnsupportedCodecError(
+ "Unrecognized compression type")
if not checker():
raise UnsupportedCodecError(
"Libraries for {} compression codec not found".format(name))
@@ -206,7 +209,7 @@ def _decompress(self, key_offset):
uncompressed = lz4_decode_old_kafka(data.tobytes())
else:
uncompressed = lz4_decode(data.tobytes())
- return uncompressed
+ return uncompressed  # pylint: disable=E0606

def _read_header(self, pos):
if self._magic == 0:
@@ -483,7 +486,7 @@ def _maybe_compress(self):
else:
compressed = lz4_encode(data)
size = self.size_in_bytes(
- 0, timestamp=0, key=None, value=compressed)
+ 0, timestamp=0, key=None, value=compressed)  # pylint: disable=E0606
# We will try to reuse the same buffer if we have enough space
if size > len(self._buffer):
self._buffer = bytearray(size)
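`legacy_records._assert_has_codec` shows the alternative fix: an explicit `else:` that raises makes every path either bind `checker, name` or leave the function, so the later read needs no disable and an unrecognized codec now fails loudly. The two approaches side by side, as a hypothetical minimal example (not library code):

    def with_inline_disable(kind):
        # Relies on an out-of-band invariant that kind is "a" or "b".
        if kind == "a":
            value = 1
        elif kind == "b":
            value = 2
        return value  # pylint: disable=E0606

    def with_exhaustive_else(kind):
        # Every path binds `value` or raises, so no suppression is needed.
        if kind == "a":
            value = 1
        elif kind == "b":
            value = 2
        else:
            raise ValueError("unrecognized kind: %r" % kind)
        return value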
4 changes: 2 additions & 2 deletions test/integration/fixtures.py
@@ -299,7 +299,7 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,

if self.external:
self.child = ExternalService(self.host, self.port)
- (self._client,) = self.get_clients(1, client_id='_internal_client')
+ self._client = next(self.get_clients(1, client_id='_internal_client'))
self.running = True
else:
self._client = None
@@ -447,7 +447,7 @@ def start(self):
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')

- (self._client,) = self.get_clients(1, client_id='_internal_client')
+ self._client = next(self.get_clients(1, client_id='_internal_client'))

self.out("Done!")
self.running = True
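The fixture change swaps single-element tuple unpacking for `next()`, presumably because `get_clients` yields clients lazily: unpacking requires the iterable to produce exactly one item, while `next()` simply takes the first. A small sketch, assuming `get_clients` is a generator (the signature and client object here are illustrative):

    def get_clients(cnt, client_id=None):
        # Illustrative stand-in: yields `cnt` client objects lazily.
        for i in range(cnt):
            yield {"client_id": "%s-%d" % (client_id, i)}

    # Old form: requires the generator to yield exactly one item.
    (client,) = get_clients(1, client_id="_internal_client")

    # New form: takes the first yielded client without asserting exhaustion.
    client = next(get_clients(1, client_id="_internal_client"))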