diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 21acc4a32..4d1e8e125 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -68,7 +68,7 @@ jobs:
         python -m pip install --upgrade pip
         pip install -r requirements-dev.txt
     - name: Pylint
-      run: pylint --recursive=y --errors-only --exit-zero kafka test
+      run: pylint --recursive=y --errors-only kafka test
     - name: Setup java
       uses: actions/setup-java@v5
       with:
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 6bdac5613..6b57091d5 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -290,7 +290,7 @@ def _find_coordinator_id_request(self, group_id):
             request = FindCoordinatorRequest[version](group_id)
         elif version <= 2:
             request = FindCoordinatorRequest[version](group_id, 0)
-        return request
+        return request  # pylint: disable=E0606

     def _find_coordinator_id_process_response(self, response):
         """Process a FindCoordinatorResponse.
@@ -506,7 +506,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
             )
         # TODO convert structs to a more pythonic interface
         # TODO raise exceptions if errors
-        return self._send_request_to_controller(request)
+        return self._send_request_to_controller(request)  # pylint: disable=E0606

     def delete_topics(self, topics, timeout_ms=None):
         """Delete topics from the cluster.
@@ -680,7 +680,7 @@ def describe_acls(self, acl_filter):
                 permission_type=acl_filter.permission_type
             )

-        response = self.send_request(request)
+        response = self.send_request(request)  # pylint: disable=E0606
         error_type = Errors.for_code(response.error_code)
         if error_type is not Errors.NoError:
             # optionally we could retry if error_type.retriable
@@ -793,7 +793,7 @@ def create_acls(self, acls):
             request = CreateAclsRequest[version](
                 creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
             )
-        response = self.send_request(request)
+        response = self.send_request(request)  # pylint: disable=E0606
         return self._convert_create_acls_response_to_acls(acls, response)

     @staticmethod
@@ -907,7 +907,7 @@ def delete_acls(self, acl_filters):
             request = DeleteAclsRequest[version](
                 filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
             )
-        response = self.send_request(request)
+        response = self.send_request(request)  # pylint: disable=E0606
         return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)

     @staticmethod
@@ -1269,14 +1269,15 @@ def _describe_consumer_groups_process_response(self, response):
                         # TODO: Fix GroupInformation defaults
                         described_group_information_list.append([])
                     group_description = GroupInformation._make(described_group_information_list)
-        error_code = group_description.error_code
-        error_type = Errors.for_code(error_code)
-        # Java has the note: KAFKA-6789, we can retry based on the error code
-        if error_type is not Errors.NoError:
-            raise error_type(
-                "DescribeGroupsResponse failed with response '{}'."
-                .format(response))
-        return group_description
+                    error_code = group_description.error_code
+                    error_type = Errors.for_code(error_code)
+                    # Java has the note: KAFKA-6789, we can retry based on the error code
+                    if error_type is not Errors.NoError:
+                        raise error_type(
+                            "DescribeGroupsResponse failed with response '{}'."
+                            .format(response))
+                    return group_description
+        assert False, "DescribeGroupsResponse parsing failed"

     def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
         """Describe a set of consumer groups.
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index a3b9cd5d8..35fcba9cb 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -245,7 +245,7 @@ def _maybe_uncompress(self):
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
                     uncompressed = zstd_decode(data.tobytes())
-                self._buffer = bytearray(uncompressed)
+                self._buffer = bytearray(uncompressed)  # pylint: disable=E0606
                 self._pos = 0
         self._decompressed = True

@@ -658,7 +658,7 @@ def _maybe_compress(self):
                 compressed = lz4_encode(data)
             elif self._compression_type == self.CODEC_ZSTD:
                 compressed = zstd_encode(data)
-            compressed_size = len(compressed)
+            compressed_size = len(compressed)  # pylint: disable=E0606
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send
                 # uncompressed
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index f085978f0..4c9bf03dd 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -122,6 +122,9 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        else:
+            raise UnsupportedCodecError(
+                "Unrecognized compression type")
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))
@@ -206,7 +209,7 @@ def _decompress(self, key_offset):
                 uncompressed = lz4_decode_old_kafka(data.tobytes())
             else:
                 uncompressed = lz4_decode(data.tobytes())
-        return uncompressed
+        return uncompressed  # pylint: disable=E0606

     def _read_header(self, pos):
         if self._magic == 0:
@@ -483,7 +486,7 @@ def _maybe_compress(self):
             else:
                 compressed = lz4_encode(data)
             size = self.size_in_bytes(
-                0, timestamp=0, key=None, value=compressed)
+                0, timestamp=0, key=None, value=compressed)  # pylint: disable=E0606
             # We will try to reuse the same buffer if we have enough space
             if size > len(self._buffer):
                 self._buffer = bytearray(size)
diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py
index 0b660e624..1723a3bd5 100644
--- a/test/integration/fixtures.py
+++ b/test/integration/fixtures.py
@@ -299,7 +299,7 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,

         if self.external:
             self.child = ExternalService(self.host, self.port)
-            (self._client,) = self.get_clients(1, client_id='_internal_client')
+            self._client = next(self.get_clients(1, client_id='_internal_client'))
             self.running = True
         else:
             self._client = None
@@ -447,7 +447,7 @@ def start(self):
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')

-        (self._client,) = self.get_clients(1, client_id='_internal_client')
+        self._client = next(self.get_clients(1, client_id='_internal_client'))
         self.out("Done!")
         self.running = True