diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 5001afd4..7550ebbb 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -15,7 +15,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: "3.8" + python-version: "3.9" - name: Prepare C files to include run: | python -m pip install --upgrade pip build @@ -52,7 +52,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: "3.8" + python-version: "3.9" - name: Set up QEMU if: ${{ matrix.arch == 'aarch64' }} uses: docker/setup-qemu-action@v1 @@ -82,10 +82,8 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] include: - - python: "3.8" - aiokafka_whl: dist/aiokafka-*-cp38-cp38-win_amd64.whl - python: "3.9" aiokafka_whl: dist/aiokafka-*-cp39-cp39-win_amd64.whl - python: "3.10" @@ -94,6 +92,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp311-cp311-win_amd64.whl - python: "3.12" aiokafka_whl: dist/aiokafka-*-cp312-cp312-win_amd64.whl + - python: "3.13" + aiokafka_whl: dist/aiokafka-*-cp313-cp313-win_amd64.whl steps: - uses: actions/checkout@v2 @@ -127,10 +127,8 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] include: - - python: "3.8" - aiokafka_whl: dist/aiokafka-*-cp38-cp38-macosx_*_x86_64.whl - python: "3.9" aiokafka_whl: dist/aiokafka-*-cp39-cp39-macosx_*_x86_64.whl - python: "3.10" @@ -139,6 +137,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp311-cp311-macosx_*_x86_64.whl - python: "3.12" aiokafka_whl: dist/aiokafka-*-cp312-cp312-macosx_*_x86_64.whl + - python: "3.13" + aiokafka_whl: dist/aiokafka-*-cp313-cp313-macosx_*_x86_64.whl steps: - uses: actions/checkout@v2 @@ -170,10 +170,8 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] include: - - python: "3.8" - aiokafka_whl: dist/aiokafka-*-cp38-cp38-macosx_*_arm64.whl - python: "3.9" aiokafka_whl: dist/aiokafka-*-cp39-cp39-macosx_*_arm64.whl - python: "3.10" @@ -182,6 +180,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp311-cp311-macosx_*_arm64.whl - python: "3.12" aiokafka_whl: dist/aiokafka-*-cp312-cp312-macosx_*_arm64.whl + - python: "3.13" + aiokafka_whl: dist/aiokafka-*-cp313-cp313-macosx_*_arm64.whl steps: - uses: actions/checkout@v2 @@ -213,10 +213,8 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] include: - - python: "3.8" - aiokafka_whl: dist/aiokafka-*-cp38-cp38-manylinux*_x86_64.whl - python: "3.9" aiokafka_whl: dist/aiokafka-*-cp39-cp39-manylinux*_x86_64.whl - python: "3.10" @@ -225,6 +223,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp311-cp311-manylinux*_x86_64.whl - python: "3.12" aiokafka_whl: dist/aiokafka-*-cp312-cp312-manylinux*_x86_64.whl + - python: "3.13" + aiokafka_whl: dist/aiokafka-*-cp313-cp313-manylinux*_x86_64.whl steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ed36a018..589c79bd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -73,7 +73,7 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v2 @@ -141,7 +141,7 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: 
actions/checkout@v2 @@ -210,14 +210,11 @@ jobs: strategy: matrix: include: - - python: "3.12" + - python: "3.13" kafka: "2.8.1" scala: "2.13" # Older python versions against latest broker - - python: "3.8" - kafka: "2.8.1" - scala: "2.13" - python: "3.9" kafka: "2.8.1" scala: "2.13" @@ -227,39 +224,42 @@ jobs: - python: "3.11" kafka: "2.8.1" scala: "2.13" + - python: "3.12" + kafka: "2.8.1" + scala: "2.13" # Older brokers against latest python version - - python: "3.12" + - python: "3.13" kafka: "0.9.0.1" scala: "2.11" - - python: "3.12" + - python: "3.13" kafka: "0.10.2.1" scala: "2.11" - - python: "3.12" + - python: "3.13" kafka: "0.11.0.3" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "1.1.1" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.1.1" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.2.2" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.3.1" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.4.1" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.5.1" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.6.3" scala: "2.12" - - python: "3.12" + - python: "3.13" kafka: "2.7.2" scala: "2.13" fail-fast: false diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index eb535306..b7bc2bed 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -1,8 +1,9 @@ import asyncio import logging from collections import defaultdict +from collections.abc import Sequence from ssl import SSLContext -from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union +from typing import Any, Optional, Union import async_timeout @@ -88,7 +89,7 @@ def __init__( self, *, loop=None, - bootstrap_servers: Union[str, List[str]] = "localhost", + bootstrap_servers: Union[str, list[str]] = "localhost", client_id: str = "aiokafka-" + __version__, request_timeout_ms: int = 40000, connections_max_idle_ms: int = 540000, @@ -159,7 +160,7 @@ async def start(self): log.debug("AIOKafkaAdminClient started") self._started = True - def _matching_api_version(self, operation: Sequence[Type[Request]]) -> int: + def _matching_api_version(self, operation: Sequence[type[Request]]) -> int: """Find the latest version of the protocol operation supported by both this library and the broker. @@ -225,7 +226,7 @@ def _convert_new_topic_request(new_topic): async def create_topics( self, - new_topics: List[NewTopic], + new_topics: list[NewTopic], timeout_ms: Optional[int] = None, validate_only: bool = False, ) -> Response: @@ -267,7 +268,7 @@ async def create_topics( async def delete_topics( self, - topics: List[str], + topics: list[str], timeout_ms: Optional[int] = None, ) -> Response: """Delete topics from the cluster. 
@@ -284,7 +285,7 @@ async def delete_topics( async def _get_cluster_metadata( self, - topics: Optional[List[str]] = None, + topics: Optional[list[str]] = None, ) -> Response: """ Retrieve cluster metadata @@ -295,20 +296,20 @@ async def _get_cluster_metadata( request = req_cls(topics=topics) return await self._send_request(request) - async def list_topics(self) -> List[str]: + async def list_topics(self) -> list[str]: metadata = await self._get_cluster_metadata(topics=None) obj = metadata.to_object() return [t["topic"] for t in obj["topics"]] async def describe_topics( self, - topics: Optional[List[str]] = None, - ) -> List[Any]: + topics: Optional[list[str]] = None, + ) -> list[Any]: metadata = await self._get_cluster_metadata(topics=topics) obj = metadata.to_object() return obj["topics"] - async def describe_cluster(self) -> Dict[str, Any]: + async def describe_cluster(self) -> dict[str, Any]: metadata = await self._get_cluster_metadata() obj = metadata.to_object() obj.pop("topics") # We have 'describe_topics' for this @@ -316,9 +317,9 @@ async def describe_cluster(self) -> Dict[str, Any]: async def describe_configs( self, - config_resources: List[ConfigResource], + config_resources: list[ConfigResource], include_synonyms: bool = False, - ) -> List[Response]: + ) -> list[Response]: """Fetch configuration parameters for one or more Kafka resources. :param config_resources: An list of ConfigResource objects. @@ -360,8 +361,8 @@ async def describe_configs( return await asyncio.gather(*futures) async def alter_configs( - self, config_resources: List[ConfigResource] - ) -> List[Response]: + self, config_resources: list[ConfigResource] + ) -> list[Response]: """Alter configuration parameters of one or more Kafka resources. :param config_resources: A list of ConfigResource objects. :return: Appropriate version of AlterConfigsResponse class. @@ -398,9 +399,9 @@ def _convert_alter_config_resource_request(config_resource): @classmethod def _convert_config_resources( cls, - config_resources: List[ConfigResource], + config_resources: list[ConfigResource], op_type: str = "describe", - ) -> Tuple[Dict[int, Any], List[Any]]: + ) -> tuple[dict[int, Any], list[Any]]: broker_resources = defaultdict(list) topic_resources = [] if op_type == "describe": @@ -416,7 +417,7 @@ def _convert_config_resources( return broker_resources, topic_resources @staticmethod - def _convert_topic_partitions(topic_partitions: Dict[str, NewPartitions]): + def _convert_topic_partitions(topic_partitions: dict[str, NewPartitions]): return [ (topic_name, (new_part.total_count, new_part.new_assignments)) for topic_name, new_part in topic_partitions.items() @@ -424,7 +425,7 @@ def _convert_topic_partitions(topic_partitions: Dict[str, NewPartitions]): async def create_partitions( self, - topic_partitions: Dict[str, NewPartitions], + topic_partitions: dict[str, NewPartitions], timeout_ms: Optional[int] = None, validate_only: bool = False, ) -> Response: @@ -455,10 +456,10 @@ async def create_partitions( async def describe_consumer_groups( self, - group_ids: List[str], + group_ids: list[str], group_coordinator_id: Optional[int] = None, include_authorized_operations: bool = False, - ) -> List[Response]: + ) -> list[Response]: """Describe a set of consumer groups. Any errors are immediately raised. 
@@ -508,8 +509,8 @@ async def describe_consumer_groups( async def list_consumer_groups( self, - broker_ids: Optional[List[int]] = None, - ) -> List[Tuple[Any, ...]]: + broker_ids: Optional[list[int]] = None, + ) -> list[tuple[Any, ...]]: """List all consumer groups known to the cluster. This returns a list of Consumer Group tuples. The tuples are @@ -578,8 +579,8 @@ async def list_consumer_group_offsets( self, group_id: str, group_coordinator_id: Optional[int] = None, - partitions: Optional[List[TopicPartition]] = None, - ) -> Dict[TopicPartition, OffsetAndMetadata]: + partitions: Optional[list[TopicPartition]] = None, + ) -> dict[TopicPartition, OffsetAndMetadata]: """Fetch Consumer Offsets for a single consumer group. Note: @@ -636,9 +637,9 @@ async def delete_records( self, - records_to_delete: Dict[TopicPartition, RecordsToDelete], + records_to_delete: dict[TopicPartition, RecordsToDelete], timeout_ms: Optional[int] = None, - ) -> Dict[TopicPartition, int]: + ) -> dict[TopicPartition, int]: """Delete records from partitions. :param records_to_delete: A map of RecordsToDelete for each TopicPartition @@ -681,7 +682,7 @@ @staticmethod def _convert_records_to_delete( - records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]], + records_to_delete: dict[str, list[tuple[int, RecordsToDelete]]], ): return [ (topic, [(partition, rec.before_offset) for partition, rec in records]) diff --git a/aiokafka/client.py b/aiokafka/client.py index 2ec75ec9..91a3e0ae 100644 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -174,7 +174,7 @@ def _get_conn_lock(self): return self._get_conn_lock_value def __repr__(self): - return "<AIOKafkaClient client_id=%s>" % self._client_id + return f"<AIOKafkaClient client_id={self._client_id}>" @property def api_version(self): @@ -483,9 +483,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal async def ready(self, node_id, *, group=ConnectionGroup.DEFAULT): conn = await self._get_conn(node_id, group=group) - if conn is None: - return False - return True + return conn is not None async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT): """Send a request to a specific node. 
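
The admin-client hunks above are representative of the whole PR: typing.List/Dict/Tuple/Type become the builtin generics of PEP 585, and abstract types like Sequence move to collections.abc. Builtin containers are only subscriptable in annotations from Python 3.9 on, which is why the CI matrices drop 3.8 first. A minimal sketch of the pattern, using a hypothetical helper that is not part of aiokafka:

from collections.abc import Sequence  # abstract types now import from here
from typing import Optional

def partition_leaders(
    assignments: dict[str, list[int]],          # was: Dict[str, List[int]]
    preferred: Optional[Sequence[int]] = None,  # was: typing.Sequence
) -> list[tuple[str, int]]:                     # was: List[Tuple[str, int]]
    # Hypothetical helper: pair each topic with its first listed partition,
    # or with a preferred partition if one was supplied.
    out: list[tuple[str, int]] = []
    for topic, partitions in assignments.items():
        if partitions:
            pick = preferred[0] if preferred else partitions[0]
            out.append((topic, pick))
    return out

print(partition_leaders({"my-topic": [0, 1, 2]}))  # [('my-topic', 0)]

Note that Optional and Union are kept from typing throughout the diff: the X | None union syntax needs Python 3.10, and 3.9 remains supported.
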
diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index ccee5636..85496ea9 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -4,7 +4,7 @@ import threading import time from concurrent.futures import Future -from typing import Optional, Set +from typing import Optional from aiokafka import errors as Errors from aiokafka.conn import collect_hosts @@ -74,7 +74,7 @@ def _generate_bootstrap_brokers(self): brokers = {} for i, (host, port, _) in enumerate(bootstrap_hosts): - node_id = "bootstrap-%s" % i + node_id = f"bootstrap-{i}" brokers[node_id] = BrokerMetadata(node_id, host, port, None) return brokers @@ -104,7 +104,7 @@ def broker_metadata(self, broker_id): or self._coordinator_brokers.get(broker_id) ) - def partitions_for_topic(self, topic: str) -> Optional[Set[int]]: + def partitions_for_topic(self, topic: str) -> Optional[set[int]]: """Return set of all partitions for topic (whether available or not) Arguments: diff --git a/aiokafka/conn.py b/aiokafka/conn.py index d3e5ac98..859f8f24 100644 --- a/aiokafka/conn.py +++ b/aiokafka/conn.py @@ -410,7 +410,7 @@ def _on_read_task_error(cls, self_ref, read_task): try: read_task.result() - except Exception as exc: # noqa: BLE001 + except Exception as exc: if not isinstance(exc, (OSError, EOFError, ConnectionError)): log.exception("Unexpected exception in AIOKafkaConnection") diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 0a97bc59..cc509f50 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -4,7 +4,6 @@ import sys import traceback import warnings -from typing import Dict, List from aiokafka import __version__ from aiokafka.abc import ConsumerRebalanceListener @@ -1162,7 +1161,7 @@ async def getone(self, *partitions) -> ConsumerRecord: async def getmany( self, *partitions, timeout_ms=0, max_records=None - ) -> Dict[TopicPartition, List[ConsumerRecord]]: + ) -> dict[TopicPartition, list[ConsumerRecord]]: """Get messages from assigned topics / partitions. Prefetched messages are returned in batches by topic-partition. 
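
The getmany() annotation above changes spelling only; the runtime contract is still a mapping from TopicPartition to a batch of ConsumerRecord. A usage sketch against the public consumer API, assuming a broker at localhost:9092 and a topic named "my-topic" (both placeholders):

import asyncio

from aiokafka import AIOKafkaConsumer

async def drain_once() -> None:
    consumer = AIOKafkaConsumer("my-topic", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # getmany() now advertises dict[TopicPartition, list[ConsumerRecord]];
        # each key is one topic-partition, each value its prefetched batch.
        batches = await consumer.getmany(timeout_ms=1000, max_records=100)
        for tp, records in batches.items():
            print(tp.topic, tp.partition, [r.offset for r in records])
    finally:
        await consumer.stop()

asyncio.run(drain_once())
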
diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index df59c925..b7a419a2 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -1093,12 +1093,12 @@ async def next_record(self, partitions): for tp in list(self._records.keys()): if partitions and tp not in partitions: - # Cleanup results for unassigned partitons + # Cleanup results for unassigned partitions if not self._subscriptions.is_assigned(tp): del self._records[tp] continue res_or_error = self._records[tp] - if type(res_or_error) == FetchResult: + if type(res_or_error) is FetchResult: message = res_or_error.getone() if message is None: # We already processed all messages, request new ones @@ -1129,12 +1129,12 @@ async def fetched_records(self, partitions, timeout=0, max_records=None): drained = {} for tp in list(self._records.keys()): if partitions and tp not in partitions: - # Cleanup results for unassigned partitons + # Cleanup results for unassigned partitions if not self._subscriptions.is_assigned(tp): del self._records[tp] continue res_or_error = self._records[tp] - if type(res_or_error) == FetchResult: + if type(res_or_error) is FetchResult: records = res_or_error.getall(max_records) if not res_or_error.has_more(): # We processed all messages - request new ones diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 36d79c10..06555675 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -434,7 +434,7 @@ async def _perform_assignment(self, response: Response): assignment_strategy = response.group_protocol members = response.members assignor = self._lookup_assignor(assignment_strategy) - assert assignor, "Invalid assignment protocol: %s" % assignment_strategy + assert assignor, f"Invalid assignment protocol: {assignment_strategy}" member_metadata = {} all_subscribed_topics = set() for member in members: @@ -477,16 +477,13 @@ async def _perform_assignment(self, response: Response): # so we force a snapshot update. 
self._metadata_snapshot = self._get_metadata_snapshot() - group_assignment = {} - for member_id, assignment in assignments.items(): - group_assignment[member_id] = assignment - return group_assignment + return assignments async def _on_join_complete( self, generation, member_id, protocol, member_assignment_bytes ): assignor = self._lookup_assignor(protocol) - assert assignor, "invalid assignment protocol: %s" % protocol + assert assignor, f"invalid assignment protocol: {protocol}" assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -1210,15 +1207,15 @@ async def fetch_committed_offsets(self, partitions): async def _do_fetch_commit_offsets(self, partitions): log.debug("Fetching committed offsets for partitions: %s", partitions) # construct the request - topic_partitions = collections.defaultdict(list) + partitions_by_topic = collections.defaultdict(list) for tp in partitions: - topic_partitions[tp.topic].append(tp.partition) + partitions_by_topic[tp.topic].append(tp.partition) - request = OffsetFetchRequest(self.group_id, list(topic_partitions.items())) + request = OffsetFetchRequest(self.group_id, list(partitions_by_topic.items())) response = await self._send_req(request) offsets = {} - for topic, partitions in response.topics: - for partition, offset, metadata, error_code in partitions: + for topic, topic_partitions in response.topics: + for partition, offset, metadata, error_code in topic_partitions: tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: @@ -1474,7 +1471,7 @@ async def _on_join_leader(self, response): """ try: group_assignment = await self._coordinator._perform_assignment(response) - except Exception as e: # noqa: BLE001 + except Exception as e: raise Errors.KafkaError(repr(e)) from e assignment_req = [] diff --git a/aiokafka/consumer/subscription_state.py b/aiokafka/consumer/subscription_state.py index 6ad50691..7cba7046 100644 --- a/aiokafka/consumer/subscription_state.py +++ b/aiokafka/consumer/subscription_state.py @@ -5,8 +5,9 @@ import logging import time from asyncio import Event, shield +from collections.abc import Iterable from enum import Enum -from typing import Dict, Iterable, Pattern, Set +from re import Pattern from aiokafka.abc import ConsumerRebalanceListener from aiokafka.errors import IllegalStateError @@ -70,7 +71,7 @@ def topics(self): return self._subscription.topics return set() - def assigned_partitions(self) -> Set[TopicPartition]: + def assigned_partitions(self) -> set[TopicPartition]: if self._subscription is None: return set() if self._subscription.assignment is None: @@ -135,7 +136,7 @@ def _notify_assignment_waiters(self): # Consumer callable API: - def subscribe(self, topics: Set[str], listener=None): + def subscribe(self, topics: set[str], listener=None): """Subscribe to a list (or tuple) of topics Caller: Consumer. @@ -194,7 +195,7 @@ def unsubscribe(self): # Coordinator callable API: - def subscribe_from_pattern(self, topics: Set[str]): + def subscribe_from_pattern(self, topics: set[str]): """Change subscription on cluster metadata update if a new topic created or one is removed. @@ -204,7 +205,7 @@ def subscribe_from_pattern(self, topics: Set[str]): assert self._subscription_type == SubscriptionType.AUTO_PATTERN self._change_subscription(Subscription(topics)) - def assign_from_subscribed(self, assignment: Set[TopicPartition]): + def assign_from_subscribed(self, assignment: set[TopicPartition]): """Set assignment if automatic assignment is used. 
Caller: Coordinator @@ -275,7 +276,7 @@ def abort_waiters(self, exc): def pause(self, tp: TopicPartition) -> None: self._assigned_state(tp).pause() - def paused_partitions(self) -> Set[TopicPartition]: + def paused_partitions(self) -> set[TopicPartition]: res = set() for tp in self.assigned_partitions(): if self._assigned_state(tp).paused: @@ -365,7 +366,7 @@ def __init__(self, user_assignment: Iterable[TopicPartition], loop=None): super().__init__(topics, loop=loop) self._assignment = Assignment(user_assignment) - def _assign(self, topic_partitions: Set[TopicPartition]): # pragma: no cover + def _assign(self, topic_partitions: set[TopicPartition]): # pragma: no cover raise AssertionError("Should not be called") @property @@ -415,7 +416,7 @@ def _unassign(self): def state_value(self, tp: TopicPartition) -> TopicPartitionState: return self._tp_state.get(tp) - def all_consumed_offsets(self) -> Dict[TopicPartition, OffsetAndMetadata]: + def all_consumed_offsets(self) -> dict[TopicPartition, OffsetAndMetadata]: """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}""" all_consumed = {} for tp in self._topic_partitions: diff --git a/aiokafka/coordinator/assignors/abstract.py b/aiokafka/coordinator/assignors/abstract.py index 0946afb9..3032fede 100644 --- a/aiokafka/coordinator/assignors/abstract.py +++ b/aiokafka/coordinator/assignors/abstract.py @@ -1,6 +1,6 @@ import abc import logging -from typing import Dict, Iterable, Mapping +from collections.abc import Iterable, Mapping from aiokafka.cluster import ClusterMetadata from aiokafka.coordinator.protocol import ( @@ -27,7 +27,7 @@ def assign( cls, cluster: ClusterMetadata, members: Mapping[str, ConsumerProtocolMemberMetadata], - ) -> Dict[str, ConsumerProtocolMemberAssignment]: + ) -> dict[str, ConsumerProtocolMemberAssignment]: """Perform group assignment given cluster metadata and member subscriptions Arguments: diff --git a/aiokafka/coordinator/assignors/range.py b/aiokafka/coordinator/assignors/range.py index 7d398f9e..a51429d8 100644 --- a/aiokafka/coordinator/assignors/range.py +++ b/aiokafka/coordinator/assignors/range.py @@ -1,6 +1,6 @@ import collections import logging -from typing import Dict, Iterable, List, Mapping +from collections.abc import Iterable, Mapping from aiokafka.cluster import ClusterMetadata from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor @@ -38,14 +38,14 @@ def assign( cls, cluster: ClusterMetadata, members: Mapping[str, ConsumerProtocolMemberMetadata], - ) -> Dict[str, ConsumerProtocolMemberAssignment]: - consumers_per_topic: Dict[str, List[str]] = collections.defaultdict(list) + ) -> dict[str, ConsumerProtocolMemberAssignment]: + consumers_per_topic: dict[str, list[str]] = collections.defaultdict(list) for member, metadata in members.items(): for topic in metadata.subscription: consumers_per_topic[topic].append(member) # construct {member_id: {topic: [partition, ...]}} - assignment: Dict[str, Dict[str, List[int]]] = collections.defaultdict(dict) + assignment: dict[str, dict[str, list[int]]] = collections.defaultdict(dict) for topic, consumers_for_topic in consumers_per_topic.items(): partitions = cluster.partitions_for_topic(topic) @@ -66,7 +66,7 @@ def assign( length += 1 assignment[member][topic] = partitions_list[start : start + length] - protocol_assignment: Dict[str, ConsumerProtocolMemberAssignment] = {} + protocol_assignment: dict[str, ConsumerProtocolMemberAssignment] = {} for member_id in members: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( 
cls.version, sorted(assignment[member_id].items()), b"" diff --git a/aiokafka/coordinator/assignors/roundrobin.py b/aiokafka/coordinator/assignors/roundrobin.py index 0399b199..f456e59d 100644 --- a/aiokafka/coordinator/assignors/roundrobin.py +++ b/aiokafka/coordinator/assignors/roundrobin.py @@ -1,7 +1,7 @@ import collections import itertools import logging -from typing import Dict, Iterable, List, Mapping +from collections.abc import Iterable, Mapping from aiokafka.cluster import ClusterMetadata from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor @@ -55,12 +55,12 @@ def assign( cls, cluster: ClusterMetadata, members: Mapping[str, ConsumerProtocolMemberMetadata], - ) -> Dict[str, ConsumerProtocolMemberAssignment]: + ) -> dict[str, ConsumerProtocolMemberAssignment]: all_topics = set() for metadata in members.values(): all_topics.update(metadata.subscription) - all_topic_partitions: List[TopicPartition] = [] + all_topic_partitions: list[TopicPartition] = [] for topic in all_topics: partitions = cluster.partitions_for_topic(topic) if partitions is None: @@ -72,7 +72,7 @@ def assign( all_topic_partitions.sort() # construct {member_id: {topic: [partition, ...]}} - assignment: Dict[str, Dict[str, List[int]]] = collections.defaultdict( + assignment: dict[str, dict[str, list[int]]] = collections.defaultdict( lambda: collections.defaultdict(list) ) diff --git a/aiokafka/coordinator/assignors/sticky/partition_movements.py b/aiokafka/coordinator/assignors/sticky/partition_movements.py index e9531113..7b864890 100644 --- a/aiokafka/coordinator/assignors/sticky/partition_movements.py +++ b/aiokafka/coordinator/assignors/sticky/partition_movements.py @@ -1,7 +1,8 @@ import logging from collections import defaultdict +from collections.abc import Sequence from copy import deepcopy -from typing import Any, Dict, List, NamedTuple, Sequence, Set, Tuple +from typing import Any, NamedTuple from aiokafka.structs import TopicPartition @@ -48,8 +49,8 @@ class PartitionMovements: """ def __init__(self) -> None: - self.partition_movements_by_topic: Dict[str, Dict[ConsumerPair, Set[TopicPartition]]] = defaultdict(lambda: defaultdict(set)) # fmt: skip # noqa: E501 - self.partition_movements: Dict[TopicPartition, ConsumerPair] = {} + self.partition_movements_by_topic: dict[str, dict[ConsumerPair, set[TopicPartition]]] = defaultdict(lambda: defaultdict(set)) # fmt: skip # noqa: E501 + self.partition_movements: dict[TopicPartition, ConsumerPair] = {} def move_partition( self, partition: TopicPartition, old_consumer: str, new_consumer: str @@ -124,12 +125,12 @@ def _add_partition_movement_record( self.partition_movements[partition] = pair self.partition_movements_by_topic[partition.topic][pair].add(partition) - def _has_cycles(self, consumer_pairs: Set[ConsumerPair]) -> bool: - cycles: Set[Tuple[str, ...]] = set() + def _has_cycles(self, consumer_pairs: set[ConsumerPair]) -> bool: + cycles: set[tuple[str, ...]] = set() for pair in consumer_pairs: reduced_pairs = deepcopy(consumer_pairs) reduced_pairs.remove(pair) - path: List[str] = [pair.src_member_id] + path: list[str] = [pair.src_member_id] if self._is_linked( pair.dst_member_id, pair.src_member_id, reduced_pairs, path ) and not self._is_subcycle(path, cycles): @@ -147,7 +148,7 @@ def _has_cycles(self, consumer_pairs: Set[ConsumerPair]) -> bool: ) @staticmethod - def _is_subcycle(cycle: List[str], cycles: Set[Tuple[str, ...]]) -> bool: + def _is_subcycle(cycle: list[str], cycles: set[tuple[str, ...]]) -> bool: super_cycle = deepcopy(cycle) 
super_cycle = super_cycle[:-1] super_cycle.extend(cycle) @@ -157,7 +158,7 @@ def _is_subcycle(cycle: List[str], cycles: Set[Tuple[str, ...]]) -> bool: return False def _is_linked( - self, src: str, dst: str, pairs: Set[ConsumerPair], current_path: List[str] + self, src: str, dst: str, pairs: set[ConsumerPair], current_path: list[str] ) -> bool: if src == dst: return False diff --git a/aiokafka/coordinator/assignors/sticky/sorted_set.py b/aiokafka/coordinator/assignors/sticky/sorted_set.py index 8ffc53bf..bcb1c7b7 100644 --- a/aiokafka/coordinator/assignors/sticky/sorted_set.py +++ b/aiokafka/coordinator/assignors/sticky/sorted_set.py @@ -1,12 +1,9 @@ +from collections.abc import Collection, Iterable, Iterator from typing import ( Any, Callable, - Collection, Generic, - Iterable, - Iterator, Optional, - Set, TypeVar, final, ) @@ -22,7 +19,7 @@ def __init__( key: Optional[Callable[[T], Any]] = None, ) -> None: self._key: Callable[[T], Any] = key if key is not None else lambda x: x - self._set: Set[T] = set(iterable) if iterable is not None else set() + self._set: set[T] = set(iterable) if iterable is not None else set() self._cached_last: Optional[T] = None self._cached_first: Optional[T] = None diff --git a/aiokafka/coordinator/assignors/sticky/sticky_assignor.py b/aiokafka/coordinator/assignors/sticky/sticky_assignor.py index c0462ce1..1e2ef828 100644 --- a/aiokafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/aiokafka/coordinator/assignors/sticky/sticky_assignor.py @@ -1,20 +1,19 @@ import contextlib import logging from collections import defaultdict -from copy import deepcopy -from typing import ( - Any, +from collections.abc import ( Collection, - Dict, Iterable, - List, Mapping, MutableSequence, - NamedTuple, - Optional, Sequence, Sized, - Tuple, +) +from copy import deepcopy +from typing import ( + Any, + NamedTuple, + Optional, ) from aiokafka.cluster import ClusterMetadata @@ -43,7 +42,7 @@ class ConsumerSubscription(NamedTuple): partitions: Sequence[TopicPartition] -def has_identical_list_elements(list_: Sequence[List[Any]]) -> bool: +def has_identical_list_elements(list_: Sequence[list[Any]]) -> bool: """Checks if all lists in the collection have the same members Arguments: @@ -57,13 +56,13 @@ def has_identical_list_elements(list_: Sequence[List[Any]]) -> bool: return all(list_[i] == list_[i - 1] for i in range(1, len(list_))) -def subscriptions_comparator_key(element: Tuple[str, Sized]) -> Tuple[int, str]: +def subscriptions_comparator_key(element: tuple[str, Sized]) -> tuple[int, str]: return len(element[1]), element[0] def partitions_comparator_key( - element: Tuple[TopicPartition, Sized], -) -> Tuple[int, str, int]: + element: tuple[TopicPartition, Sized], +) -> tuple[int, str, int]: return len(element[1]), element[0].topic, element[0].partition @@ -73,8 +72,8 @@ def remove_if_present(collection: MutableSequence[Any], element: Any) -> None: class StickyAssignorMemberMetadataV1(NamedTuple): - subscription: List[str] - partitions: List[TopicPartition] + subscription: list[str] + partitions: list[TopicPartition] generation: int @@ -86,9 +85,9 @@ class StickyAssignorUserDataV1(Struct): class PreviousAssignment(NamedTuple): topic: str - partitions: List[int] + partitions: list[int] - previous_assignment: List[PreviousAssignment] + previous_assignment: list[PreviousAssignment] generation: int SCHEMA = Schema( @@ -104,32 +103,32 @@ class StickyAssignmentExecutor: def __init__( self, cluster: ClusterMetadata, - members: Dict[str, StickyAssignorMemberMetadataV1], + members: 
dict[str, StickyAssignorMemberMetadataV1], ) -> None: self.members = members # a mapping between consumers and their assigned partitions that is updated # during assignment procedure - self.current_assignment: Dict[str, List[TopicPartition]] = defaultdict(list) + self.current_assignment: dict[str, list[TopicPartition]] = defaultdict(list) # an assignment from a previous generation - self.previous_assignment: Dict[TopicPartition, ConsumerGenerationPair] = {} + self.previous_assignment: dict[TopicPartition, ConsumerGenerationPair] = {} # a mapping between partitions and their assigned consumers - self.current_partition_consumer: Dict[TopicPartition, str] = {} + self.current_partition_consumer: dict[TopicPartition, str] = {} # a flag indicating that there were no previous assignments performed ever self.is_fresh_assignment = False # a mapping of all topic partitions to all consumers that can be assigned to # them - self.partition_to_all_potential_consumers: Dict[TopicPartition, List[str]] = {} + self.partition_to_all_potential_consumers: dict[TopicPartition, list[str]] = {} # a mapping of all consumers to all potential topic partitions that can be # assigned to them - self.consumer_to_all_potential_partitions: Dict[str, List[TopicPartition]] = {} + self.consumer_to_all_potential_partitions: dict[str, list[TopicPartition]] = {} # an ascending sorted set of consumers based on how many topic partitions are # already assigned to them self.sorted_current_subscriptions: SortedSet[ConsumerSubscription] = SortedSet() # an ascending sorted list of topic partitions based on how many consumers can # potentially use them - self.sorted_partitions: List[TopicPartition] = [] + self.sorted_partitions: list[TopicPartition] = [] # all partitions that need to be assigned - self.unassigned_partitions: List[TopicPartition] = [] + self.unassigned_partitions: list[TopicPartition] = [] # a flag indicating that a certain partition cannot remain assigned to its # current consumer because the consumer is no longer subscribed to its topic self.revocation_required = False @@ -167,7 +166,7 @@ def balance(self) -> None: # narrow down the reassignment scope to only those consumers that are subject to # reassignment - fixed_assignments: Dict[str, List[TopicPartition]] = {} + fixed_assignments: dict[str, list[TopicPartition]] = {} for consumer in self.consumer_to_all_potential_partitions: if not self._can_consumer_participate_in_reassignment(consumer): self._remove_consumer_from_current_subscriptions_and_maintain_order( @@ -205,8 +204,8 @@ def balance(self) -> None: self.current_assignment[consumer] = partitions self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) - def get_final_assignment(self, member_id: str) -> Collection[Tuple[str, List[int]]]: - assignment: Dict[str, List[int]] = defaultdict(list) + def get_final_assignment(self, member_id: str) -> Collection[tuple[str, list[int]]]: + assignment: dict[str, list[int]] = defaultdict(list) for topic_partition in self.current_assignment[member_id]: assignment[topic_partition.topic].append(topic_partition.partition) assignment = {k: sorted(v) for k, v in assignment.items()} @@ -242,7 +241,7 @@ def _initialize(self, cluster: ClusterMetadata) -> None: self.current_assignment[consumer_id] = [] def _init_current_assignments( - self, members: Dict[str, StickyAssignorMemberMetadataV1] + self, members: dict[str, StickyAssignorMemberMetadataV1] ) -> None: # we need to process subscriptions' user data with each consumer's reported # generation in mind higher 
generations overwrite lower generations in case of @@ -250,8 +249,8 @@ def _init_current_assignments( # different generations # for each partition we create a map of its consumers by generation - sorted_partition_consumers_by_generation: Dict[ - TopicPartition, Dict[int, str] + sorted_partition_consumers_by_generation: dict[ + TopicPartition, dict[int, str] ] = {} for consumer, member_metadata in members.items(): for partition in member_metadata.partitions: @@ -380,7 +379,7 @@ def _populate_sorted_partitions(self) -> None: def _populate_partitions_to_reassign(self) -> None: self.unassigned_partitions = deepcopy(self.sorted_partitions) - assignments_to_remove: List[str] = [] + assignments_to_remove: list[str] = [] for consumer_id, partitions in self.current_assignment.items(): if consumer_id not in self.members: # if a consumer that existed before (and had some partition assignments) @@ -536,7 +535,7 @@ def _can_consumer_participate_in_reassignment(self, consumer: str) -> bool: return False def _perform_reassignments( - self, reassignable_partitions: List[TopicPartition] + self, reassignable_partitions: list[TopicPartition] ) -> bool: reassignment_performed = False @@ -637,7 +636,7 @@ def _move_partition(self, partition: TopicPartition, new_consumer: str) -> None: self._add_consumer_to_current_subscriptions_and_maintain_order(old_consumer) @staticmethod - def _get_balance_score(assignment: Dict[str, List[TopicPartition]]) -> int: + def _get_balance_score(assignment: dict[str, list[TopicPartition]]) -> int: """Calculates a balance score of a give assignment as the sum of assigned partitions size difference of all consumer pairs. A perfectly balanced assignment (with all consumers getting the same number of @@ -651,7 +650,7 @@ def _get_balance_score(assignment: Dict[str, List[TopicPartition]]) -> int: the balance score of the assignment """ score = 0 - consumer_to_assignment: Dict[str, int] = {} + consumer_to_assignment: dict[str, int] = {} for consumer_id, partitions in assignment.items(): consumer_to_assignment[consumer_id] = len(partitions) @@ -746,7 +745,7 @@ class StickyPartitionAssignor(AbstractPartitionAssignor): name = "sticky" version = 0 - member_assignment: Optional[List[TopicPartition]] = None + member_assignment: Optional[list[TopicPartition]] = None generation: int = DEFAULT_GENERATION_ID _latest_partition_movements: Optional[PartitionMovements] = None @@ -756,7 +755,7 @@ def assign( cls, cluster: ClusterMetadata, members: Mapping[str, ConsumerProtocolMemberMetadata], - ) -> Dict[str, ConsumerProtocolMemberAssignment]: + ) -> dict[str, ConsumerProtocolMemberAssignment]: """Performs group assignment given cluster metadata and member subscriptions Arguments: @@ -767,7 +766,7 @@ def assign( Returns: dict: {member_id: MemberAssignment} """ - members_metadata: Dict[str, StickyAssignorMemberMetadataV1] = {} + members_metadata: dict[str, StickyAssignorMemberMetadataV1] = {} for consumer, member_metadata in members.items(): members_metadata[consumer] = cls.parse_member_metadata(member_metadata) @@ -777,7 +776,7 @@ def assign( cls._latest_partition_movements = executor.partition_movements - assignment: Dict[str, ConsumerProtocolMemberAssignment] = {} + assignment: dict[str, ConsumerProtocolMemberAssignment] = {} for member_id in members: assignment[member_id] = ConsumerProtocolMemberAssignment( cls.version, @@ -822,7 +821,7 @@ def parse_member_metadata( subscription=metadata.subscription, ) - member_partitions: List[TopicPartition] = [] + member_partitions: list[TopicPartition] = [] for 
( topic, partitions, @@ -844,7 +843,7 @@ def metadata(cls, topics: Iterable[str]) -> ConsumerProtocolMemberMetadata: def _metadata( cls, topics: Iterable[str], - member_assignment_partitions: Optional[List[TopicPartition]], + member_assignment_partitions: Optional[list[TopicPartition]], generation: int = -1, ) -> ConsumerProtocolMemberMetadata: if member_assignment_partitions is None: diff --git a/aiokafka/coordinator/protocol.py b/aiokafka/coordinator/protocol.py index afc15e56..5abd9508 100644 --- a/aiokafka/coordinator/protocol.py +++ b/aiokafka/coordinator/protocol.py @@ -1,4 +1,4 @@ -from typing import List, NamedTuple +from typing import NamedTuple from aiokafka.protocol.struct import Struct from aiokafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String @@ -7,7 +7,7 @@ class ConsumerProtocolMemberMetadata(Struct): version: int - subscription: List[str] + subscription: list[str] user_data: bytes SCHEMA = Schema( @@ -20,10 +20,10 @@ class ConsumerProtocolMemberMetadata(Struct): class ConsumerProtocolMemberAssignment(Struct): class Assignment(NamedTuple): topic: str - partitions: List[int] + partitions: list[int] version: int - assignment: List[Assignment] + assignment: list[Assignment] user_data: bytes SCHEMA = Schema( @@ -32,7 +32,7 @@ class Assignment(NamedTuple): ("user_data", Bytes), ) - def partitions(self) -> List[TopicPartition]: + def partitions(self) -> list[TopicPartition]: return [ TopicPartition(topic, partition) for topic, partitions in self.assignment diff --git a/aiokafka/errors.py b/aiokafka/errors.py index b4f5c48f..07c87d90 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -1,4 +1,5 @@ -from typing import Any, Iterable, Type, TypeVar +from collections.abc import Iterable +from typing import Any, TypeVar __all__ = [ # aiokafka custom errors @@ -872,5 +873,5 @@ def _iter_subclasses(cls: _T) -> Iterable[_T]: kafka_errors = {x.errno: x for x in _iter_subclasses(BrokerResponseError)} -def for_code(error_code: int) -> Type[BrokerResponseError]: +def for_code(error_code: int) -> type[BrokerResponseError]: return kafka_errors.get(error_code, UnknownError) diff --git a/aiokafka/metrics/metric_name.py b/aiokafka/metrics/metric_name.py index d67f78de..be1f2f88 100644 --- a/aiokafka/metrics/metric_name.py +++ b/aiokafka/metrics/metric_name.py @@ -92,7 +92,7 @@ def __eq__(self, other): if other is None: return False return ( - type(self) == type(other) + type(self) is type(other) and self.group == other.group and self.name == other.name and self.tags == other.tags diff --git a/aiokafka/metrics/quota.py b/aiokafka/metrics/quota.py index d3117937..9e86fa47 100644 --- a/aiokafka/metrics/quota.py +++ b/aiokafka/metrics/quota.py @@ -34,7 +34,7 @@ def __eq__(self, other): if self is other: return True return ( - type(self) == type(other) + type(self) is type(other) and self.bound == other.bound and self.is_upper_bound() == other.is_upper_bound() ) diff --git a/aiokafka/metrics/stats/histogram.py b/aiokafka/metrics/stats/histogram.py index bb3b2f61..46aeb2c7 100644 --- a/aiokafka/metrics/stats/histogram.py +++ b/aiokafka/metrics/stats/histogram.py @@ -37,7 +37,7 @@ def __str__(self): for i, value in enumerate(self._hist[:-1]) ] values.append("{}:{}".format(float("inf"), self._hist[-1])) - return "{%s}" % ",".join(values) + return "{{{}}}".format(",".join(values)) class ConstantBinScheme: def __init__(self, bins, min_val, max_val): diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index fb53a9df..ab956e81 100644 --- 
a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -277,7 +277,7 @@ def __init__( AIOKafkaProducer._PRODUCER_CLIENT_ID_SEQUENCE += 1 if client_id is None: client_id = ( - "aiokafka-producer-%s" % AIOKafkaProducer._PRODUCER_CLIENT_ID_SEQUENCE + f"aiokafka-producer-{AIOKafkaProducer._PRODUCER_CLIENT_ID_SEQUENCE}" ) self._key_serializer = key_serializer diff --git a/aiokafka/producer/sender.py b/aiokafka/producer/sender.py index 519b5d34..f743b208 100644 --- a/aiokafka/producer/sender.py +++ b/aiokafka/producer/sender.py @@ -877,6 +877,4 @@ def _can_retry(self, error, batch): # as long as we set proper sequence, pid and epoch. if self._sender._txn_manager is None and batch.expired(): return False - if error.retriable: - return True - return False + return error.retriable diff --git a/aiokafka/protocol/admin.py b/aiokafka/protocol/admin.py index 2f374286..e834d132 100644 --- a/aiokafka/protocol/admin.py +++ b/aiokafka/protocol/admin.py @@ -1,4 +1,5 @@ -from typing import Dict, Iterable, Optional, Tuple +from collections.abc import Iterable +from typing import Optional from .api import Request, Response from .types import ( @@ -1389,9 +1390,9 @@ class DeleteRecordsRequest_v2(Request): def __init__( self, - topics: Iterable[Tuple[str, Iterable[Tuple[int, int]]]], + topics: Iterable[tuple[str, Iterable[tuple[int, int]]]], timeout_ms: int, - tags: Optional[Dict[int, bytes]] = None, + tags: Optional[dict[int, bytes]] = None, ) -> None: super().__init__( [ diff --git a/aiokafka/protocol/api.py b/aiokafka/protocol/api.py index 1e6ee3b6..aeede6d9 100644 --- a/aiokafka/protocol/api.py +++ b/aiokafka/protocol/api.py @@ -2,7 +2,7 @@ import abc from io import BytesIO -from typing import Any, ClassVar, Dict, Optional, Type, Union +from typing import Any, ClassVar, Optional, Union from .struct import Struct from .types import Array, Int16, Int32, Schema, String, TaggedFields @@ -39,7 +39,7 @@ def __init__( request: Request, correlation_id: int = 0, client_id: str = "aiokafka", - tags: Optional[Dict[int, bytes]] = None, + tags: Optional[dict[int, bytes]] = None, ): super().__init__( request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {} @@ -74,7 +74,7 @@ def API_VERSION(self) -> int: @property @abc.abstractmethod - def RESPONSE_TYPE(self) -> Type[Response]: + def RESPONSE_TYPE(self) -> type[Response]: """The Response class associated with the api request""" @property @@ -86,7 +86,7 @@ def expect_response(self) -> bool: """Override this method if an api request does not always generate a response""" return True - def to_object(self) -> Dict[str, Any]: + def to_object(self) -> dict[str, Any]: return _to_object(self.SCHEMA, self) def build_request_header( @@ -124,12 +124,12 @@ def API_VERSION(self) -> int: def SCHEMA(self) -> Schema: """An instance of Schema() representing the response structure""" - def to_object(self) -> Dict[str, Any]: + def to_object(self) -> dict[str, Any]: return _to_object(self.SCHEMA, self) -def _to_object(schema: Schema, data: Union[Struct, Dict[int, Any]]) -> Dict[str, Any]: - obj: Dict[str, Any] = {} +def _to_object(schema: Schema, data: Union[Struct, dict[int, Any]]) -> dict[str, Any]: + obj: dict[str, Any] = {} for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)): if isinstance(data, Struct): val = data.get_item(name) diff --git a/aiokafka/protocol/fetch.py b/aiokafka/protocol/fetch.py index 8f93754a..90b9e8a5 100644 --- a/aiokafka/protocol/fetch.py +++ b/aiokafka/protocol/fetch.py @@ -1,4 +1,4 @@ -from typing import List, 
Optional, Tuple +from typing import Optional from .api import Request, Response from .types import Array, Bytes, Int8, Int16, Int32, Int64, Schema, String @@ -25,7 +25,7 @@ class FetchResponse_v0(Response): ) ) - topics: Optional[List[Tuple[str, List[Tuple[int, int, int, bytes]]]]] + topics: Optional[list[tuple[str, list[tuple[int, int, int, bytes]]]]] class FetchResponse_v1(Response): diff --git a/aiokafka/protocol/message.py b/aiokafka/protocol/message.py index 77103af4..fbdc2cf6 100644 --- a/aiokafka/protocol/message.py +++ b/aiokafka/protocol/message.py @@ -1,7 +1,8 @@ import io import time from binascii import crc32 -from typing import Iterable, List, Literal, Optional, Tuple, Union, cast, overload +from collections.abc import Iterable +from typing import Literal, Optional, Union, cast, overload from typing_extensions import Self @@ -196,16 +197,14 @@ def validate_crc(self) -> bool: if self._validated_crc is None: raw_msg = self.encode(recalc_crc=False) self._validated_crc = crc32(raw_msg[4:]) - if self.crc == self._validated_crc: - return True - return False + return self.crc == self._validated_crc def is_compressed(self) -> bool: return self.attributes & self.CODEC_MASK != 0 def decompress( self, - ) -> List[Union[Tuple[int, int, "Message"], Tuple[None, None, "PartialMessage"]]]: + ) -> list[Union[tuple[int, int, "Message"], tuple[None, None, "PartialMessage"]]]: assert self.value is not None codec = self.attributes & self.CODEC_MASK assert codec in ( @@ -253,7 +252,7 @@ class MessageSet: @classmethod def encode( cls, - items: Union[io.BytesIO, Iterable[Tuple[int, bytes]]], + items: Union[io.BytesIO, Iterable[tuple[int, bytes]]], prepend_size: bool = True, ) -> bytes: # RecordAccumulator encodes messagesets internally @@ -265,7 +264,7 @@ def encode( size += 4 return items.read(size) - encoded_values: List[bytes] = [] + encoded_values: list[bytes] = [] for offset, message in items: encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) @@ -278,7 +277,7 @@ def encode( @classmethod def decode( cls, data: Union[io.BytesIO, bytes], bytes_to_read: Optional[int] = None - ) -> List[Union[Tuple[int, int, Message], Tuple[None, None, PartialMessage]]]: + ) -> list[Union[tuple[int, int, Message], tuple[None, None, PartialMessage]]]: """Compressed messages should pass in bytes_to_read (via message size) otherwise, we decode from data as Int32 """ @@ -292,8 +291,8 @@ def decode( # So create an internal buffer to avoid over-reading raw = io.BytesIO(data.read(bytes_to_read)) - items: List[ - Union[Tuple[int, int, Message], Tuple[None, None, PartialMessage]] + items: list[ + Union[tuple[int, int, Message], tuple[None, None, PartialMessage]] ] = [] try: while bytes_to_read: @@ -316,7 +315,7 @@ def repr( cls, messages: Union[ io.BytesIO, - List[Union[Tuple[int, int, Message], Tuple[None, None, PartialMessage]]], + list[Union[tuple[int, int, Message], tuple[None, None, PartialMessage]]], ], ) -> str: if isinstance(messages, io.BytesIO): diff --git a/aiokafka/protocol/produce.py b/aiokafka/protocol/produce.py index 6b69fc31..849f8e2a 100644 --- a/aiokafka/protocol/produce.py +++ b/aiokafka/protocol/produce.py @@ -170,9 +170,7 @@ class ProduceRequestBase(Request): required_acks: int def expect_response(self) -> bool: - if self.required_acks == 0: - return False - return True + return self.required_acks != 0 class ProduceRequest_v0(ProduceRequestBase): diff --git a/aiokafka/protocol/struct.py b/aiokafka/protocol/struct.py index fc1461bf..4786dccf 100644 --- 
a/aiokafka/protocol/struct.py +++ b/aiokafka/protocol/struct.py @@ -1,5 +1,5 @@ from io import BytesIO -from typing import Any, ClassVar, List, Union +from typing import Any, ClassVar, Union from typing_extensions import Self @@ -36,11 +36,11 @@ def decode(cls, data: Union[BytesIO, bytes]) -> Self: def get_item(self, name: str) -> Any: if name not in self.SCHEMA.names: - raise KeyError("%s is not in the schema" % name) + raise KeyError(f"{name} is not in the schema") return self.__dict__[name] def __repr__(self) -> str: - key_vals: List[str] = [] + key_vals: list[str] = [] for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields): key_vals.append(f"{name}={field.repr(self.__dict__[name])}") return self.__class__.__name__ + "(" + ", ".join(key_vals) + ")" diff --git a/aiokafka/protocol/types.py b/aiokafka/protocol/types.py index 944783c0..bd6389b6 100644 --- a/aiokafka/protocol/types.py +++ b/aiokafka/protocol/types.py @@ -1,15 +1,11 @@ import struct +from collections.abc import Sequence from io import BytesIO from struct import error from typing import ( Any, Callable, - Dict, - List, Optional, - Sequence, - Tuple, - Type, TypeVar, Union, cast, @@ -22,7 +18,7 @@ T = TypeVar("T") -ValueT: TypeAlias = Union[Type[AbstractType[Any]], "String", "Array", "Schema"] +ValueT: TypeAlias = Union[type[AbstractType[Any]], "String", "Array", "Schema"] def _pack(f: Callable[[T], bytes], value: T) -> bytes: @@ -35,7 +31,7 @@ def _pack(f: Callable[[T], bytes], value: T) -> bytes: ) from e -def _unpack(f: Callable[[Buffer], Tuple[T, ...]], data: Buffer) -> T: +def _unpack(f: Callable[[Buffer], tuple[T, ...]], data: Buffer) -> T: try: (value,) = f(data) except error as e: @@ -188,10 +184,10 @@ def decode(cls, data: BytesIO) -> bool: class Schema: - names: Tuple[str, ...] - fields: Tuple[ValueT, ...] + names: tuple[str, ...] + fields: tuple[ValueT, ...] - def __init__(self, *fields: Tuple[str, ValueT]): + def __init__(self, *fields: tuple[str, ValueT]): if fields: self.names, self.fields = zip(*fields) else: @@ -204,14 +200,14 @@ def encode(self, item: Sequence[Any]) -> bytes: def decode( self, data: BytesIO - ) -> Tuple[Union[Any, str, None, List[Union[Any, Tuple[Any, ...]]]], ...]: + ) -> tuple[Union[Any, str, None, list[Union[Any, tuple[Any, ...]]]], ...]: return tuple(field.decode(data) for field in self.fields) def __len__(self) -> int: return len(self.fields) def repr(self, value: Any) -> str: - key_vals: List[str] = [] + key_vals: list[str] = [] try: for i in range(len(self)): try: @@ -232,16 +228,16 @@ def __init__(self, array_of_0: ValueT): ... @overload def __init__( - self, array_of_0: Tuple[str, ValueT], *array_of: Tuple[str, ValueT] + self, array_of_0: tuple[str, ValueT], *array_of: tuple[str, ValueT] ): ... 
def __init__( self, - array_of_0: Union[ValueT, Tuple[str, ValueT]], - *array_of: Tuple[str, ValueT], + array_of_0: Union[ValueT, tuple[str, ValueT]], + *array_of: tuple[str, ValueT], ) -> None: if array_of: - array_of_0 = cast(Tuple[str, ValueT], array_of_0) + array_of_0 = cast(tuple[str, ValueT], array_of_0) self.array_of = Schema(array_of_0, *array_of) else: array_of_0 = cast(ValueT, array_of_0) @@ -260,7 +256,7 @@ def encode(self, items: Optional[Sequence[Any]]) -> bytes: (Int32.encode(len(items)), *encoded_items), ) - def decode(self, data: BytesIO) -> Optional[List[Union[Any, Tuple[Any, ...]]]]: + def decode(self, data: BytesIO) -> Optional[list[Union[Any, tuple[Any, ...]]]]: length = Int32.decode(data) if length == -1: return None @@ -360,11 +356,11 @@ def encode(self, value: Optional[str]) -> bytes: return UnsignedVarInt32.encode(len(encoded_value) + 1) + encoded_value -class TaggedFields(AbstractType[Dict[int, bytes]]): +class TaggedFields(AbstractType[dict[int, bytes]]): @classmethod - def decode(cls, data: BytesIO) -> Dict[int, bytes]: + def decode(cls, data: BytesIO) -> dict[int, bytes]: num_fields = UnsignedVarInt32.decode(data) - ret: Dict[int, bytes] = {} + ret: dict[int, bytes] = {} if not num_fields: return ret prev_tag = -1 @@ -379,7 +375,7 @@ def decode(cls, data: BytesIO) -> Dict[int, bytes]: return ret @classmethod - def encode(cls, value: Dict[int, bytes]) -> bytes: + def encode(cls, value: dict[int, bytes]) -> bytes: ret = UnsignedVarInt32.encode(len(value)) for k, v in value.items(): # do we allow for other data types ?? It could get complicated really fast @@ -418,7 +414,7 @@ def encode(self, items: Optional[Sequence[Any]]) -> bytes: (UnsignedVarInt32.encode(len(items) + 1), *encoded_items), ) - def decode(self, data: BytesIO) -> Optional[List[Union[Any, Tuple[Any, ...]]]]: + def decode(self, data: BytesIO) -> Optional[list[Union[Any, tuple[Any, ...]]]]: length = UnsignedVarInt32.decode(data) - 1 if length == -1: return None diff --git a/aiokafka/record/_crc32c.py b/aiokafka/record/_crc32c.py index 22158249..0fd9368a 100644 --- a/aiokafka/record/_crc32c.py +++ b/aiokafka/record/_crc32c.py @@ -23,7 +23,7 @@ """ import array -from typing import Iterable +from collections.abc import Iterable # fmt: off CRC_TABLE = ( @@ -106,7 +106,7 @@ def crc_update(crc: int, data: Iterable[int]) -> int: Returns: 32-bit updated CRC-32C as long. 
""" - if type(data) != array.array or data.itemsize != 1: + if not isinstance(data, array.array) or data.itemsize != 1: buf = array.array("B", data) else: buf = data diff --git a/aiokafka/record/_crecords/legacy_records.pyi b/aiokafka/record/_crecords/legacy_records.pyi index b160d99b..39aec43d 100644 --- a/aiokafka/record/_crecords/legacy_records.pyi +++ b/aiokafka/record/_crecords/legacy_records.pyi @@ -1,4 +1,5 @@ -from typing import Any, ClassVar, Generator, final +from collections.abc import Generator +from typing import Any, ClassVar, final from typing_extensions import Buffer, Literal, Never diff --git a/aiokafka/record/_protocols.py b/aiokafka/record/_protocols.py index 176932b1..20ab0acb 100644 --- a/aiokafka/record/_protocols.py +++ b/aiokafka/record/_protocols.py @@ -1,14 +1,11 @@ from __future__ import annotations +from collections.abc import Iterable, Iterator from typing import ( Any, ClassVar, - Iterable, - Iterator, - List, Optional, Protocol, - Tuple, Union, runtime_checkable, ) @@ -44,7 +41,7 @@ def append( timestamp: Optional[int], key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> Optional[DefaultRecordMetadataProtocol]: ... def build(self) -> bytearray: ... def size(self) -> int: ... @@ -54,21 +51,21 @@ def size_in_bytes( timestamp: int, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> int: ... @classmethod def size_of( cls, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> int: ... @classmethod def estimate_size_in_bytes( cls, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> int: ... def set_producer_state( self, producer_id: int, producer_epoch: int, base_sequence: int @@ -144,7 +141,7 @@ def __init__( timestamp_type: int, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> None: ... @property def offset(self) -> int: ... @@ -165,7 +162,7 @@ def value(self) -> Optional[bytes]: """Bytes value or None""" @property - def headers(self) -> List[Tuple[str, Optional[bytes]]]: ... + def headers(self) -> list[tuple[str, Optional[bytes]]]: ... @property def checksum(self) -> None: ... @@ -262,7 +259,7 @@ def value(self) -> Optional[bytes]: """Bytes value or None""" @property - def headers(self) -> List[Never]: ... + def headers(self) -> list[Never]: ... @property def checksum(self) -> int: ... 
diff --git a/aiokafka/record/default_records.py b/aiokafka/record/default_records.py index 8b0a596d..8b39cdc5 100644 --- a/aiokafka/record/default_records.py +++ b/aiokafka/record/default_records.py @@ -56,8 +56,9 @@ import struct import time +from collections.abc import Sized from dataclasses import dataclass -from typing import Any, Callable, List, Optional, Sized, Tuple, Type, Union, final +from typing import Any, Callable, Optional, Union, final from typing_extensions import Self, TypeIs, assert_never @@ -156,7 +157,7 @@ def _assert_has_codec( class _DefaultRecordBatchPy(DefaultRecordBase, DefaultRecordBatchProtocol): def __init__(self, buffer: Union[bytes, bytearray, memoryview]) -> None: self._buffer = bytearray(buffer) - self._header_data: Tuple[ + self._header_data: tuple[ int, int, int, int, int, int, int, int, int, int, int, int, int ] = self.HEADER_STRUCT.unpack_from(self._buffer) self._pos = self.HEADER_STRUCT.size @@ -245,7 +246,7 @@ def _maybe_uncompress(self) -> None: self._decompressed = True def _read_msg( - self, decode_varint: Callable[[bytearray, int], Tuple[int, int]] = decode_varint + self, decode_varint: Callable[[bytearray, int], tuple[int, int]] = decode_varint ) -> "_DefaultRecordPy": # Record => # Length => Varint @@ -292,7 +293,7 @@ def _read_msg( raise CorruptRecordException( f"Found invalid number of record headers {header_count}" ) - headers: List[Tuple[str, Optional[bytes]]] = [] + headers: list[tuple[str, Optional[bytes]]] = [] while header_count: # Header key is of type String, that can't be None h_key_len, pos = decode_varint(buffer, pos) @@ -367,7 +368,7 @@ class _DefaultRecordPy(DefaultRecordProtocol): timestamp_type: int key: Optional[bytes] value: Optional[bytes] - headers: List[Tuple[str, Optional[bytes]]] + headers: list[tuple[str, Optional[bytes]]] @property def checksum(self) -> None: @@ -432,19 +433,19 @@ def append( timestamp: Optional[int], key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], # Cache for LOAD_FAST opcodes encode_varint: Callable[[int, Callable[[int], None]], int] = encode_varint, size_of_varint: Callable[[int], int] = size_of_varint, get_type: Callable[[Any], type] = type, - type_int: Type[int] = int, + type_int: type[int] = int, time_time: Callable[[], float] = time.time, - byte_like: Tuple[Type[bytes], Type[bytearray], Type[memoryview]] = ( + byte_like: tuple[type[bytes], type[bytearray], type[memoryview]] = ( bytes, bytearray, memoryview, ), - bytearray_type: Type[bytearray] = bytearray, + bytearray_type: type[bytearray] = bytearray, len_func: Callable[[Sized], int] = len, zero_len_varint: int = 1, ) -> Optional["_DefaultRecordMetadataPy"]: @@ -518,8 +519,7 @@ def append( # Those should be updated after the length check assert self._max_timestamp is not None - if self._max_timestamp < timestamp: - self._max_timestamp = timestamp + self._max_timestamp = max(self._max_timestamp, timestamp) self._num_records += 1 self._last_offset = offset @@ -593,7 +593,7 @@ def size_in_bytes( timestamp: int, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> int: if self._first_timestamp is not None: timestamp_delta = timestamp - self._first_timestamp @@ -612,7 +612,7 @@ def size_of( cls, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> int: size = 0 # Key size @@ -645,7 
+645,7 @@ def estimate_size_in_bytes( cls, key: Optional[bytes], value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], + headers: list[tuple[str, Optional[bytes]]], ) -> int: """Get the upper bound estimate on the size of record""" return ( @@ -694,10 +694,10 @@ def __repr__(self) -> str: ) -DefaultRecordBatchBuilder: Type[DefaultRecordBatchBuilderProtocol] -DefaultRecordMetadata: Type[DefaultRecordMetadataProtocol] -DefaultRecordBatch: Type[DefaultRecordBatchProtocol] -DefaultRecord: Type[DefaultRecordProtocol] +DefaultRecordBatchBuilder: type[DefaultRecordBatchBuilderProtocol] +DefaultRecordMetadata: type[DefaultRecordMetadataProtocol] +DefaultRecordBatch: type[DefaultRecordBatchProtocol] +DefaultRecord: type[DefaultRecordProtocol] if NO_EXTENSIONS: DefaultRecordBatchBuilder = _DefaultRecordBatchBuilderPy diff --git a/aiokafka/record/legacy_records.py b/aiokafka/record/legacy_records.py index 32878229..fa28c157 100644 --- a/aiokafka/record/legacy_records.py +++ b/aiokafka/record/legacy_records.py @@ -3,8 +3,9 @@ import struct import time from binascii import crc32 +from collections.abc import Generator from dataclasses import dataclass -from typing import Any, Generator, List, Optional, Tuple, Type, Union, final +from typing import Any, Optional, Union, final from typing_extensions import Literal, Never, TypeIs, assert_never @@ -193,7 +194,7 @@ def _decompress(self, key_offset: int) -> bytes: assert_never(compression_type) return uncompressed - def _read_header(self, pos: int) -> Tuple[int, int, int, int, int, Optional[int]]: + def _read_header(self, pos: int) -> tuple[int, int, int, int, int, Optional[int]]: if self._magic == 0: offset, length, crc, magic_read, attrs = self.HEADER_STRUCT_V0.unpack_from( self._buffer, pos @@ -212,9 +213,9 @@ def _read_header(self, pos: int) -> Tuple[int, int, int, int, int, Optional[int] def _read_all_headers( self, - ) -> List[Tuple[Tuple[int, int, int, int, int, Optional[int]], int]]: + ) -> list[tuple[tuple[int, int, int, int, int, Optional[int]], int]]: pos = 0 - msgs: List[Tuple[Tuple[int, int, int, int, int, Optional[int]], int]] = [] + msgs: list[tuple[tuple[int, int, int, int, int, Optional[int]], int]] = [] buffer_len = len(self._buffer) while pos < buffer_len: header = self._read_header(pos) @@ -222,7 +223,7 @@ def _read_all_headers( pos += self.LOG_OVERHEAD + header[1] # length return msgs - def _read_key_value(self, pos: int) -> Tuple[Optional[bytes], Optional[bytes]]: + def _read_key_value(self, pos: int) -> tuple[Optional[bytes], Optional[bytes]]: key_size: int = struct.unpack_from(">i", self._buffer, pos)[0] pos += self.KEY_LENGTH if key_size == -1: @@ -302,7 +303,7 @@ class _LegacyRecordPy(LegacyRecordProtocol): crc: int @property - def headers(self) -> List[Never]: + def headers(self) -> list[Never]: return [] @property @@ -331,7 +332,7 @@ def __init__( self._magic = magic self._compression_type = compression_type self._batch_size = batch_size - self._msg_buffers: List[bytearray] = [] + self._msg_buffers: list[bytearray] = [] self._pos = 0 def append( @@ -571,10 +572,10 @@ def __repr__(self) -> str: ) -LegacyRecordBatchBuilder: Type[LegacyRecordBatchBuilderProtocol] -LegacyRecordMetadata: Type[LegacyRecordMetadataProtocol] -LegacyRecordBatch: Type[LegacyRecordBatchProtocol] -LegacyRecord: Type[LegacyRecordProtocol] +LegacyRecordBatchBuilder: type[LegacyRecordBatchBuilderProtocol] +LegacyRecordMetadata: type[LegacyRecordMetadataProtocol] +LegacyRecordBatch: type[LegacyRecordBatchProtocol] +LegacyRecord: 
type[LegacyRecordProtocol] if NO_EXTENSIONS: LegacyRecordBatchBuilder = _LegacyRecordBatchBuilderPy diff --git a/aiokafka/record/memory_records.py b/aiokafka/record/memory_records.py index b618d4a8..8e7bc0c3 100644 --- a/aiokafka/record/memory_records.py +++ b/aiokafka/record/memory_records.py @@ -20,7 +20,7 @@ # used to construct the correct class for Batch itself. import struct -from typing import Optional, Type, Union, final +from typing import Optional, Union, final from aiokafka.errors import CorruptRecordException from aiokafka.util import NO_EXTENSIONS @@ -104,7 +104,7 @@ def next_batch( return LegacyRecordBatch(next_slice, magic) -MemoryRecords: Type[MemoryRecordsProtocol] +MemoryRecords: type[MemoryRecordsProtocol] if NO_EXTENSIONS: MemoryRecords = _MemoryRecordsPy diff --git a/aiokafka/record/util.py b/aiokafka/record/util.py index 5133bc8d..9ad54bb0 100644 --- a/aiokafka/record/util.py +++ b/aiokafka/record/util.py @@ -1,4 +1,5 @@ -from typing import Callable, Iterable, Tuple, Union +from collections.abc import Iterable +from typing import Callable, Union from aiokafka.util import NO_EXTENSIONS @@ -82,7 +83,7 @@ def size_of_varint_py(value: int) -> int: return 10 -def decode_varint_py(buffer: bytearray, pos: int = 0) -> Tuple[int, int]: +def decode_varint_py(buffer: bytearray, pos: int = 0) -> tuple[int, int]: """Decode an integer from a varint presentation. See https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints on how those can be produced. @@ -121,7 +122,7 @@ def calc_crc32c_py(memview: Iterable[int]) -> int: calc_crc32c: Callable[[Union[bytes, bytearray]], int] -decode_varint: Callable[[bytearray, int], Tuple[int, int]] +decode_varint: Callable[[bytearray, int], tuple[int, int]] size_of_varint: Callable[[int], int] encode_varint: Callable[[int, Callable[[int], None]], int] diff --git a/aiokafka/structs.py b/aiokafka/structs.py index 7118b28c..3db1d20a 100644 --- a/aiokafka/structs.py +++ b/aiokafka/structs.py @@ -1,5 +1,6 @@ +from collections.abc import Sequence from dataclasses import dataclass -from typing import Generic, List, NamedTuple, Optional, Sequence, Tuple, TypeVar +from typing import Generic, NamedTuple, Optional, TypeVar from aiokafka.errors import KafkaError @@ -54,9 +55,9 @@ class PartitionMetadata(NamedTuple): leader: int "The id of the broker that is the leader for the partition" - replicas: List[int] + replicas: list[int] "The ids of all brokers that contain replicas of the partition" - isr: List[int] + isr: list[int] "The ids of all brokers that contain in-sync replicas of the partition" error: Optional[KafkaError] @@ -152,7 +153,7 @@ class ConsumerRecord(Generic[KT, VT]): serialized_value_size: int "The size of the serialized, uncompressed value in bytes." 
- headers: Sequence[Tuple[str, bytes]] + headers: Sequence[tuple[str, bytes]] "The headers" diff --git a/aiokafka/util.py b/aiokafka/util.py index 875d774b..56a576ea 100644 --- a/aiokafka/util.py +++ b/aiokafka/util.py @@ -3,13 +3,10 @@ import asyncio import os from asyncio import AbstractEventLoop +from collections.abc import Awaitable, Coroutine from typing import ( Any, - Awaitable, - Coroutine, - Dict, Optional, - Tuple, TypeVar, Union, cast, @@ -50,11 +47,11 @@ async def wait_for(fut: Awaitable[T], timeout: Union[None, int, float] = None) - return await fut -def parse_kafka_version(api_version: str) -> Tuple[int, int, int]: +def parse_kafka_version(api_version: str) -> tuple[int, int, int]: parsed = Version(api_version).release if not 2 <= len(parsed) <= 3: raise ValueError(api_version) - version = cast(Tuple[int, int, int], (*parsed, 0)[:3]) + version = cast(tuple[int, int, int], (*parsed, 0)[:3]) if not (0, 9) <= version < (3, 0): raise ValueError(api_version) @@ -62,8 +59,8 @@ def parse_kafka_version(api_version: str) -> Tuple[int, int, int]: def commit_structure_validate( - offsets: Dict[TopicPartition, Union[int, Tuple[int, str], OffsetAndMetadata]], -) -> Dict[TopicPartition, OffsetAndMetadata]: + offsets: dict[TopicPartition, Union[int, tuple[int, str], OffsetAndMetadata]], +) -> dict[TopicPartition, OffsetAndMetadata]: # validate `offsets` structure if not offsets or not isinstance(offsets, dict): raise ValueError(offsets) @@ -78,7 +75,7 @@ def commit_structure_validate( else: try: offset, metadata = offset_and_metadata - except Exception as exc: # noqa: BLE001 + except Exception as exc: raise ValueError(offsets) from exc if not isinstance(metadata, str): diff --git a/docs/index.rst b/docs/index.rst index 97fb8eb5..389d6173 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -5,8 +5,6 @@ Welcome to aiokafka's documentation! .. _asyncio: https://docs.python.org/3/library/asyncio.html .. _kafka-python: https://github.com/dpkp/kafka-python -.. image:: https://img.shields.io/badge/kafka-1.0%2C%200.11%2C%200.10%2C%200.9-brightgreen.svg - :target: https://kafka.apache.org .. image:: https://img.shields.io/pypi/pyversions/aiokafka.svg :target: https://pypi.python.org/pypi/aiokafka .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg diff --git a/gen-ssl-certs.sh b/gen-ssl-certs.sh index 07e9be3d..aafd01a9 100644 --- a/gen-ssl-certs.sh +++ b/gen-ssl-certs.sh @@ -1,169 +1,82 @@ #!/bin/bash -#### Kind thanks for ``librdkafka`` for making this great script. -# Taken from https://raw.githubusercontent.com/edenhill/librdkafka/master/tests/gen-ssl-certs.sh - -# -# -# This scripts generates: -# - root CA certificate -# - server certificate and keystore -# - client keys -# -# https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka -# - - -if [[ "$1" == "-k" ]]; then - USE_KEYTOOL=1 - shift -else - USE_KEYTOOL=0 -fi - -OP="$1" -CA_CERT="$2" -PFX="$3" -HOST="$4" - -C=NN -ST=NN -L=NN -O=NN -OU=NN -CN="$HOST" - - -# Password -PASS="abcdefgh" - -# Cert validity, in days -VALIDITY=10000 +# Documentation: +# https://kafka.apache.org/documentation.html#security_ssl +# Content was compiled from: +# https://medium.com/@ahosanhabib.974/enabling-ssl-tls-encryption-for-kafka-with-jks-8186ccc82dd1 +# https://developer.confluent.io/courses/security/hands-on-setting-up-encryption/ set -e -export LC_ALL=C - -if [[ $OP == "ca" && ! -z "$CA_CERT" && ! 
-z "$3" ]]; then - CN="$3" - openssl req -new -x509 -keyout ${CA_CERT}.key -out $CA_CERT -days $VALIDITY -passin "pass:$PASS" -passout "pass:$PASS" < ca.cnf < sign-ext.cnf < " - echo " $0 [-k] server|client " - echo "" - echo " -k = Use keytool/Java Keystore, else standard SSL keys" - exit 1 -fi +# Generate client Private Key and CSR +openssl genpkey -algorithm RSA -out client.key -pass "pass:$PASS" +openssl req -new -nodes -key client.key -passin "pass:$PASS" -out client.csr \ + -subj "/CN=client" +# Generate the client certificate using the CSR, the CA cert, and private key +openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \ + -out client.crt -days 3650 -extfile sign-ext.cnf diff --git a/pyproject.toml b/pyproject.toml index 8121a35b..b9e7b2e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ requires = ["setuptools >=61", "wheel", "Cython >=3.0.5"] name = "aiokafka" description = "Kafka integration with asyncio" readme = "README.rst" -requires-python = ">=3.8" +requires-python = ">=3.9" license = { file = "LICENSE" } authors = [ { name = "Andrew Svetlov", email = "andrew.svetlov@gmail.com" }, @@ -14,11 +14,11 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", "Intended Audience :: Developers", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Operating System :: OS Independent", "Topic :: System :: Networking", "Topic :: System :: Distributed Computing", @@ -159,8 +159,8 @@ pyupgrade.keep-runtime-typing = true docstring-code-format = true [tool.mypy] -python_version = "3.8" -ignore_missing_imports = true +python_version = "3.9" +disable_error_code = "import-untyped" check_untyped_defs = true disallow_any_generics = true disallow_untyped_defs = true diff --git a/requirements-ci.txt b/requirements-ci.txt index 2294b51d..615ffb65 100644 --- a/requirements-ci.txt +++ b/requirements-ci.txt @@ -1,16 +1,13 @@ -r requirements-cython.txt -ruff==0.3.4 -mypy==1.10.0 -pytest==7.4.3 -pytest-cov==4.1.0 -pytest-asyncio==0.21.1 -pytest-mock==3.12.0 -docker==7.0.0 -# TODO Remove this pi after update of `docker` -# https://github.com/docker/docker-py/pull/3257 -requests==2.31.0 -docutils==0.20.1 -Pygments==2.15.0 -gssapi==1.8.3 -async-timeout==4.0.1 -cramjam==2.8.0 +ruff==0.7.0 +mypy==1.12.0 +pytest==8.3.3 +pytest-cov==5.0.0 +pytest-asyncio==0.24.0 +pytest-mock==3.14.0 +docker==7.1.0 +docutils==0.21.2 +Pygments==2.18.0 +gssapi==1.9.0 +async-timeout==4.0.3 +cramjam==2.9.0 diff --git a/requirements-docs.txt b/requirements-docs.txt index e76b61b1..72fe2078 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -1,5 +1,5 @@ -r requirements-cython.txt -Sphinx==7.2.6 +Sphinx==7.4.7 sphinxcontrib-asyncio==0.3.0 sphinxcontrib-spelling==8.0.0 alabaster==0.7.16 diff --git a/requirements-win-test.txt b/requirements-win-test.txt index 2fb51acd..8e97bd1a 100644 --- a/requirements-win-test.txt +++ b/requirements-win-test.txt @@ -1,12 +1,9 @@ -r requirements-cython.txt -ruff==0.3.2 -mypy==1.10.0 -pytest==7.4.3 -pytest-cov==4.1.0 -pytest-asyncio==0.21.1 -pytest-mock==3.12.0 -docker==7.0.0 -# TODO Remove this pi after update of `docker` -# https://github.com/docker/docker-py/pull/3257 -requests==2.31.0 -cramjam==2.8.0 +ruff==0.7.0 +mypy==1.12.0 +pytest==8.3.3 +pytest-cov==5.0.0 +pytest-asyncio==0.24.0 
diff --git a/tests/_testutil.py b/tests/_testutil.py index 67cd2f75..a93dd477 100644 --- a/tests/_testutil.py +++ b/tests/_testutil.py @@ -72,7 +72,7 @@ def construct_lambda(s): op_str = s[0:2] # >= <= v_str = s[2:] else: - raise ValueError("Unrecognized kafka version / operator: %s" % s) + raise ValueError(f"Unrecognized kafka version / operator: {s}") op_map = { "=": operator.eq, @@ -444,9 +444,9 @@ def assert_message_count(self, messages, num_messages): def create_ssl_context(self): context = create_ssl_context( - cafile=str(self.ssl_folder / "ca-cert"), - certfile=str(self.ssl_folder / "cl_client.pem"), - keyfile=str(self.ssl_folder / "cl_client.key"), + cafile=str(self.ssl_folder / "ca.crt"), + certfile=str(self.ssl_folder / "client.crt"), + keyfile=str(self.ssl_folder / "client.key"), password="abcdefgh", ) context.check_hostname = False
diff --git a/tests/conftest.py b/tests/conftest.py index d582386c..bd4fda2d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,7 +124,7 @@ def kafka_image(): @pytest.fixture(scope="session") -def ssl_folder(docker_ip_address, docker, kafka_image): +def ssl_folder(docker, kafka_image): ssl_dir = pathlib.Path("tests/ssl_cert") if ssl_dir.is_dir(): # Skip generating certificates when they already exist. Remove @@ -151,18 +151,13 @@ def ssl_folder(docker_ip_address, docker, kafka_image): ) try: - for args in [ - ["ca", "ca-cert", docker_ip_address], - ["-k", "server", "ca-cert", "br_", docker_ip_address], - ["client", "ca-cert", "cl_", docker_ip_address], - ]: - exit_code, output = container.exec_run( - ["bash", "/gen-ssl-certs.sh", *args], - user=f"{os.getuid()}:{os.getgid()}", - ) - if exit_code != 0: - print(output.decode(), file=sys.stderr) - pytest.exit("Could not generate certificates") + exit_code, output = container.exec_run( + ["bash", "/gen-ssl-certs.sh"], + user=f"{os.getuid()}:{os.getgid()}", + ) + if exit_code != 0: + print(output.decode(), file=sys.stderr) + pytest.exit("Could not generate certificates") finally: container.stop()
diff --git a/tests/coordinator/test_assignors.py b/tests/coordinator/test_assignors.py index 86572459..02241dd3 100644 --- a/tests/coordinator/test_assignors.py +++ b/tests/coordinator/test_assignors.py @@ -1,6 +1,7 @@ from collections import defaultdict +from collections.abc import Generator, Sequence from random import randint, sample -from typing import Callable, Dict, Generator, Optional, Sequence, Set +from typing import Callable, Optional from unittest.mock import MagicMock import pytest @@ -27,9 +28,9 @@ def reset_sticky_assignor() -> Generator[None, None, None]: def create_cluster( mocker: MockerFixture, - topics: Set[str], - topics_partitions: Optional[Set[int]] = None, - topic_partitions_lambda: Optional[Callable[[str], Optional[Set[int]]]] = None, + topics: set[str], + topics_partitions: Optional[set[int]] = None, + topic_partitions_lambda: Optional[Callable[[str], Optional[set[int]]]] = None, ) -> MagicMock: cluster = mocker.MagicMock() cluster.topics.return_value = topics @@ -227,7 +228,7 @@ def test_sticky_assignor2(mocker: MockerFixture) -> None: def test_sticky_one_consumer_no_topic(mocker: MockerFixture) -> None: cluster = create_cluster(mocker, topics=set(), topics_partitions=set()) - subscriptions: Dict[str, Set[str]] = { + subscriptions: dict[str, set[str]] = { "C": set(), } member_metadata = make_member_metadata(subscriptions) @@ -714,7 +715,7 @@ def test_stickiness(mocker: MockerFixture) -> None: def 
test_assignment_updated_for_deleted_topic(mocker: MockerFixture) -> None: - def topic_partitions(topic: str) -> Optional[Set[int]]: + def topic_partitions(topic: str) -> Optional[set[int]]: if topic == "t1": return {0} elif topic == "t3": @@ -962,7 +963,7 @@ def test_assignment_with_conflicting_previous_generations( "C2": 1, "C3": 2, } - member_metadata: Dict[str, ConsumerProtocolMemberMetadata] = {} + member_metadata: dict[str, ConsumerProtocolMemberMetadata] = {} for member in member_assignments: member_metadata[member] = StickyPartitionAssignor._metadata( {"t"}, member_assignments[member], member_generations[member] @@ -975,17 +976,17 @@ def test_assignment_with_conflicting_previous_generations( def make_member_metadata( - subscriptions: Dict[str, Set[str]], -) -> Dict[str, ConsumerProtocolMemberMetadata]: - member_metadata: Dict[str, ConsumerProtocolMemberMetadata] = {} + subscriptions: dict[str, set[str]], +) -> dict[str, ConsumerProtocolMemberMetadata]: + member_metadata: dict[str, ConsumerProtocolMemberMetadata] = {} for member, topics in subscriptions.items(): member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) return member_metadata def assert_assignment( - result_assignment: Dict[str, ConsumerProtocolMemberAssignment], - expected_assignment: Dict[str, ConsumerProtocolMemberAssignment], + result_assignment: dict[str, ConsumerProtocolMemberAssignment], + expected_assignment: dict[str, ConsumerProtocolMemberAssignment], ) -> None: assert result_assignment == expected_assignment assert set(result_assignment) == set(expected_assignment) @@ -996,8 +997,8 @@ def assert_assignment( def verify_validity_and_balance( - subscriptions: Dict[str, Set[str]], - assignment: Dict[str, ConsumerProtocolMemberAssignment], + subscriptions: dict[str, set[str]], + assignment: dict[str, ConsumerProtocolMemberAssignment], ) -> None: """ Verifies that the given assignment is valid with respect to the given subscriptions @@ -1070,7 +1071,7 @@ def verify_validity_and_balance( def group_partitions_by_topic( partitions: Sequence[TopicPartition], -) -> Dict[str, Set[int]]: +) -> dict[str, set[int]]: result = defaultdict(set) for p in partitions: result[p.topic].add(p.partition) diff --git a/tests/record/test_default_records.py b/tests/record/test_default_records.py index 74d893d0..1832d646 100644 --- a/tests/record/test_default_records.py +++ b/tests/record/test_default_records.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Tuple +from typing import Optional from unittest import mock import pytest @@ -11,7 +11,7 @@ DefaultRecordBatchBuilder, ) -HeadersT = List[Tuple[str, Optional[bytes]]] +HeadersT = list[tuple[str, Optional[bytes]]] @pytest.mark.parametrize( @@ -21,10 +21,9 @@ # Gzip header includes timestamp, so checksum varies pytest.param(DefaultRecordBatch.CODEC_GZIP, None, id="gzip"), pytest.param(DefaultRecordBatch.CODEC_SNAPPY, 2171068483, id="snappy"), - # Checksum is - # 462121143 with content size (header = 01101000) - # 1260758266 without content size (header = 01100000) - pytest.param(DefaultRecordBatch.CODEC_LZ4, 1260758266, id="lz4"), + # cramjam uses different parameters for LZ4, so checksum varies from + # version to version + pytest.param(DefaultRecordBatch.CODEC_LZ4, None, id="lz4"), pytest.param(DefaultRecordBatch.CODEC_ZSTD, 1714138923, id="zstd"), ], ) diff --git a/tests/record/test_legacy.py b/tests/record/test_legacy.py index 9faa3bc0..3b6f97a7 100644 --- a/tests/record/test_legacy.py +++ b/tests/record/test_legacy.py @@ -1,5 +1,5 @@ import struct -from typing 
import Optional, Tuple +from typing import Optional from unittest import mock import pytest @@ -25,7 +25,7 @@ def test_read_write_serde_v0_v1_no_compression( magic: Literal[0, 1], key: Optional[bytes], value: Optional[bytes], - checksum: Tuple[int, int], + checksum: tuple[int, int], ) -> None: builder = LegacyRecordBatchBuilder( magic=magic, compression_type=0, batch_size=1024 * 1024 diff --git a/tests/record/test_util.py b/tests/record/test_util.py index 139043fc..109dbf98 100644 --- a/tests/record/test_util.py +++ b/tests/record/test_util.py @@ -1,11 +1,10 @@ import struct -from typing import List, Tuple import pytest from aiokafka.record import util -varint_data: List[Tuple[bytes, int]] = [ +varint_data: list[tuple[bytes, int]] = [ (b"\x00", 0), (b"\x01", -1), (b"\x02", 1), diff --git a/tests/test_consumer.py b/tests/test_consumer.py index fea0f823..4916a08c 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -46,7 +46,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): async def consumer_factory(self, **kwargs): enable_auto_commit = kwargs.pop("enable_auto_commit", True) auto_offset_reset = kwargs.pop("auto_offset_reset", "earliest") - group = kwargs.pop("group", "group-%s" % self.id()) + group = kwargs.pop("group", f"group-{self.id()}") consumer = AIOKafkaConsumer( self.topic, group_id=group, @@ -148,7 +148,7 @@ def test_create_consumer_no_running_loop(self): async def test_consumer_context_manager(self): await self.send_messages(0, list(range(10))) - group = "group-%s" % self.id() + group = f"group-{self.id()}" consumer = AIOKafkaConsumer( self.topic, group_id=group, @@ -1104,7 +1104,7 @@ async def test_consumer_rebalance_on_new_topic(self): # Wait for group to stabilize assign1 = await listener1.wait_assign() assign2 = await listener2.wait_assign() - # We expect 2 partitons for autocreated topics + # We expect 2 partitions for autocreated topics my_partitions = {TopicPartition(my_topic, 0), TopicPartition(my_topic, 1)} self.assertEqual(assign1 | assign2, my_partitions) self.assertEqual(consumer1.assignment() | consumer2.assignment(), my_partitions) @@ -1118,7 +1118,7 @@ async def test_consumer_rebalance_on_new_topic(self): # Wait for group to stabilize assign1 = await listener1.wait_assign() assign2 = await listener2.wait_assign() - # We expect 2 partitons for autocreated topics + # We expect 2 partitions for autocreated topics my_partitions = { TopicPartition(my_topic, 0), TopicPartition(my_topic, 1), @@ -1578,7 +1578,7 @@ async def test_kafka_consumer_offsets_old_brokers(self): @run_until_complete async def test_kafka_consumer_sets_coordinator_values(self): - group = "test-group-%s" % self.id() + group = f"test-group-{self.id()}" session_timeout_ms = 12345 heartbeat_interval_ms = 3456 retry_backoff_ms = 567 @@ -1627,7 +1627,7 @@ async def test_consumer_manual_assignment_with_group(self): consumer = AIOKafkaConsumer( enable_auto_commit=False, auto_offset_reset="earliest", - group_id="group-%s" % self.id(), + group_id=f"group-{self.id()}", bootstrap_servers=self.hosts, ) tp = TopicPartition(self.topic, 0) @@ -1647,7 +1647,7 @@ async def test_consumer_manual_assignment_with_group(self): consumer = AIOKafkaConsumer( enable_auto_commit=False, auto_offset_reset="earliest", - group_id="group-%s" % self.id(), + group_id=f"group-{self.id()}", bootstrap_servers=self.hosts, ) tp = TopicPartition(self.topic, 0) diff --git a/tests/test_coordinator.py b/tests/test_coordinator.py index 58f03013..57a11483 100644 --- a/tests/test_coordinator.py +++ 
b/tests/test_coordinator.py @@ -945,9 +945,7 @@ def force_metadata_update(): client.force_metadata_update.side_effect = force_metadata_update async def ready(node_id, group=None): - if node_id == 0: - return True - return False + return node_id == 0 client.ready.side_effect = ready client.coordinator_lookup = mock.Mock() diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 2522d44d..08dbcdc6 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,14 +1,13 @@ import ssl from pathlib import Path -from typing import Tuple from aiokafka.helpers import create_ssl_context -def _check_ssl_dir(ssl_folder: Path) -> Tuple[Path, Path, Path]: - cafile = ssl_folder / "ca-cert" - certfile = ssl_folder / "cl_client.pem" - keyfile = ssl_folder / "cl_client.key" +def _check_ssl_dir(ssl_folder: Path) -> tuple[Path, Path, Path]: + cafile = ssl_folder / "ca.crt" + certfile = ssl_folder / "client.crt" + keyfile = ssl_folder / "client.key" assert ssl_folder.exists(), str(ssl_folder) cafile.exists(), str(cafile) certfile.exists(), str(certfile) diff --git a/tests/test_protocol.py b/tests/test_protocol.py index f023325f..05df8282 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,6 +1,5 @@ import io import struct -from typing import Type import pytest @@ -344,7 +343,7 @@ def test_compact_data_structs() -> None: assert arr.decode(io.BytesIO(b"\x00")) is None enc = arr.encode([]) assert enc == b"\x01" - assert [] == arr.decode(io.BytesIO(enc)) + assert arr.decode(io.BytesIO(enc)) == [] encoded = arr.encode(["foo", "bar", "baz", "quux"]) assert arr.decode(io.BytesIO(encoded)) == ["foo", "bar", "baz", "quux"] @@ -368,7 +367,7 @@ def test_compact_data_structs() -> None: @pytest.mark.parametrize("klass", Request.__subclasses__()) @pytest.mark.parametrize("attr_name", attr_names) -def test_request_type_conformance(klass: Type[Request], attr_name: str) -> None: +def test_request_type_conformance(klass: type[Request], attr_name: str) -> None: assert hasattr(klass, attr_name) @@ -382,5 +381,5 @@ def test_request_type_conformance(klass: Type[Request], attr_name: str) -> None: @pytest.mark.parametrize("klass", Response.__subclasses__()) @pytest.mark.parametrize("attr_name", attr_names) -def test_response_type_conformance(klass: Type[Response], attr_name: str) -> None: +def test_response_type_conformance(klass: type[Response], attr_name: str) -> None: assert hasattr(klass, attr_name) diff --git a/tests/test_protocol_object_conversion.py b/tests/test_protocol_object_conversion.py index e9987366..3394a893 100644 --- a/tests/test_protocol_object_conversion.py +++ b/tests/test_protocol_object_conversion.py @@ -1,16 +1,16 @@ -from typing import Type, TypeVar, Union +from typing import TypeVar, Union import pytest from aiokafka.protocol.admin import Request, Response from aiokafka.protocol.types import Array, Int16, Schema, String -C = TypeVar("C", bound=Type[Union[Request, Response]]) +C = TypeVar("C", bound=type[Union[Request, Response]]) def _make_test_class( - klass: Type[Union[Request, Response]], schema: Schema -) -> Type[Union[Request, Response]]: + klass: type[Union[Request, Response]], schema: Schema +) -> type[Union[Request, Response]]: if klass is Request: class RequestTestClass(Request): @@ -32,7 +32,7 @@ class ResponseTestClass(Response): @pytest.mark.parametrize("superclass", (Request, Response)) class TestObjectConversion: - def test_get_item(self, superclass: Type[Union[Request, Response]]) -> None: + def test_get_item(self, superclass: type[Union[Request, Response]]) -> 
None: TestClass = _make_test_class(superclass, Schema(("myobject", Int16))) tc = TestClass(myobject=0) @@ -41,7 +41,7 @@ def test_get_item(self, superclass: Type[Union[Request, Response]]) -> None: tc.get_item("does-not-exist") def test_with_empty_schema( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class(superclass, Schema()) @@ -50,7 +50,7 @@ def test_with_empty_schema( assert tc.to_object() == {} def test_with_basic_schema( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class(superclass, Schema(("myobject", Int16))) @@ -59,7 +59,7 @@ def test_with_basic_schema( assert tc.to_object() == {"myobject": 0} def test_with_basic_array_schema( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class(superclass, Schema(("myarray", Array(Int16)))) @@ -68,7 +68,7 @@ def test_with_basic_array_schema( assert tc.to_object()["myarray"] == [1, 2, 3] def test_with_complex_array_schema( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class( superclass, @@ -88,7 +88,7 @@ def test_with_complex_array_schema( assert obj["myarray"][0]["othersubobject"] == "hello" def test_with_array_and_other( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class( superclass, @@ -110,7 +110,7 @@ def test_with_array_and_other( assert obj["notarray"] == 42 def test_with_nested_array( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class( superclass, @@ -138,7 +138,7 @@ def test_with_nested_array( assert obj["myarray"][1]["otherobject"] == 4 def test_with_complex_nested_array( - self, superclass: Type[Union[Request, Response]] + self, superclass: type[Union[Request, Response]] ) -> None: TestClass = _make_test_class( superclass,
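To make the round-trip these conversion tests verify concrete, here is a condensed, hedged sketch reusing the module's own `_make_test_class` helper (defined in `tests/test_protocol_object_conversion.py` above); the field names and values mirror the parametrized cases, while the combined two-field schema is an illustrative composite rather than an exact test case:

```python
# Condensed version of what the parametrized cases assert: a class built
# from a Schema exposes its fields via get_item() and serializes them
# back out through to_object().
from aiokafka.protocol.admin import Request
from aiokafka.protocol.types import Array, Int16, Schema

TestClass = _make_test_class(
    Request,
    Schema(("myobject", Int16), ("myarray", Array(Int16))),
)
tc = TestClass(myobject=0, myarray=[1, 2, 3])
assert tc.get_item("myobject") == 0

obj = tc.to_object()
assert obj["myobject"] == 0
assert obj["myarray"] == [1, 2, 3]
```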