Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python 3.13, remove 3.8 #1061

Merged
merged 7 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: "3.8"
python-version: "3.9"
- name: Prepare C files to include
run: |
python -m pip install --upgrade pip build
Expand Down Expand Up @@ -52,7 +52,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: "3.8"
python-version: "3.9"
- name: Set up QEMU
if: ${{ matrix.arch == 'aarch64' }}
uses: docker/setup-qemu-action@v1
Expand Down Expand Up @@ -82,10 +82,8 @@ jobs:

strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]
include:
- python: "3.8"
aiokafka_whl: dist/aiokafka-*-cp38-cp38-win_amd64.whl
- python: "3.9"
aiokafka_whl: dist/aiokafka-*-cp39-cp39-win_amd64.whl
- python: "3.10"
Expand All @@ -94,6 +92,8 @@ jobs:
aiokafka_whl: dist/aiokafka-*-cp311-cp311-win_amd64.whl
- python: "3.12"
aiokafka_whl: dist/aiokafka-*-cp312-cp312-win_amd64.whl
- python: "3.13"
aiokafka_whl: dist/aiokafka-*-cp313-cp313-win_amd64.whl

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -127,10 +127,8 @@ jobs:

strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]
include:
- python: "3.8"
aiokafka_whl: dist/aiokafka-*-cp38-cp38-macosx_*_x86_64.whl
- python: "3.9"
aiokafka_whl: dist/aiokafka-*-cp39-cp39-macosx_*_x86_64.whl
- python: "3.10"
Expand All @@ -139,6 +137,8 @@ jobs:
aiokafka_whl: dist/aiokafka-*-cp311-cp311-macosx_*_x86_64.whl
- python: "3.12"
aiokafka_whl: dist/aiokafka-*-cp312-cp312-macosx_*_x86_64.whl
- python: "3.13"
aiokafka_whl: dist/aiokafka-*-cp313-cp313-macosx_*_x86_64.whl

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -170,10 +170,8 @@ jobs:

strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]
include:
- python: "3.8"
aiokafka_whl: dist/aiokafka-*-cp38-cp38-macosx_*_arm64.whl
- python: "3.9"
aiokafka_whl: dist/aiokafka-*-cp39-cp39-macosx_*_arm64.whl
- python: "3.10"
Expand All @@ -182,6 +180,8 @@ jobs:
aiokafka_whl: dist/aiokafka-*-cp311-cp311-macosx_*_arm64.whl
- python: "3.12"
aiokafka_whl: dist/aiokafka-*-cp312-cp312-macosx_*_arm64.whl
- python: "3.13"
aiokafka_whl: dist/aiokafka-*-cp313-cp313-macosx_*_arm64.whl

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -213,10 +213,8 @@ jobs:

strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]
include:
- python: "3.8"
aiokafka_whl: dist/aiokafka-*-cp38-cp38-manylinux*_x86_64.whl
- python: "3.9"
aiokafka_whl: dist/aiokafka-*-cp39-cp39-manylinux*_x86_64.whl
- python: "3.10"
Expand All @@ -225,6 +223,8 @@ jobs:
aiokafka_whl: dist/aiokafka-*-cp311-cp311-manylinux*_x86_64.whl
- python: "3.12"
aiokafka_whl: dist/aiokafka-*-cp312-cp312-manylinux*_x86_64.whl
- python: "3.13"
aiokafka_whl: dist/aiokafka-*-cp313-cp313-manylinux*_x86_64.whl

steps:
- uses: actions/checkout@v2
Expand Down
34 changes: 17 additions & 17 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:

strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -141,7 +141,7 @@ jobs:

strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -210,14 +210,11 @@ jobs:
strategy:
matrix:
include:
- python: "3.12"
- python: "3.13"
kafka: "2.8.1"
scala: "2.13"

# Older python versions against latest broker
- python: "3.8"
kafka: "2.8.1"
scala: "2.13"
- python: "3.9"
kafka: "2.8.1"
scala: "2.13"
Expand All @@ -227,39 +224,42 @@ jobs:
- python: "3.11"
kafka: "2.8.1"
scala: "2.13"
- python: "3.12"
kafka: "2.8.1"
scala: "2.13"

# Older brokers against latest python version
- python: "3.12"
- python: "3.13"
kafka: "0.9.0.1"
scala: "2.11"
- python: "3.12"
- python: "3.13"
kafka: "0.10.2.1"
scala: "2.11"
- python: "3.12"
- python: "3.13"
kafka: "0.11.0.3"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "1.1.1"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.1.1"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.2.2"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.3.1"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.4.1"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.5.1"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.6.3"
scala: "2.12"
- python: "3.12"
- python: "3.13"
kafka: "2.7.2"
scala: "2.13"
fail-fast: false
Expand Down
55 changes: 28 additions & 27 deletions aiokafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import logging
from collections import defaultdict
from collections.abc import Sequence
from ssl import SSLContext
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union
from typing import Any, Optional, Union

import async_timeout

Expand Down Expand Up @@ -88,7 +89,7 @@ def __init__(
self,
*,
loop=None,
bootstrap_servers: Union[str, List[str]] = "localhost",
bootstrap_servers: Union[str, list[str]] = "localhost",
client_id: str = "aiokafka-" + __version__,
request_timeout_ms: int = 40000,
connections_max_idle_ms: int = 540000,
Expand Down Expand Up @@ -159,7 +160,7 @@ async def start(self):
log.debug("AIOKafkaAdminClient started")
self._started = True

def _matching_api_version(self, operation: Sequence[Type[Request]]) -> int:
def _matching_api_version(self, operation: Sequence[type[Request]]) -> int:
"""Find the latest version of the protocol operation
supported by both this library and the broker.

Expand Down Expand Up @@ -225,7 +226,7 @@ def _convert_new_topic_request(new_topic):

async def create_topics(
self,
new_topics: List[NewTopic],
new_topics: list[NewTopic],
timeout_ms: Optional[int] = None,
validate_only: bool = False,
) -> Response:
Expand Down Expand Up @@ -267,7 +268,7 @@ async def create_topics(

async def delete_topics(
self,
topics: List[str],
topics: list[str],
timeout_ms: Optional[int] = None,
) -> Response:
"""Delete topics from the cluster.
Expand All @@ -284,7 +285,7 @@ async def delete_topics(

async def _get_cluster_metadata(
self,
topics: Optional[List[str]] = None,
topics: Optional[list[str]] = None,
) -> Response:
"""
Retrieve cluster metadata
Expand All @@ -295,30 +296,30 @@ async def _get_cluster_metadata(
request = req_cls(topics=topics)
return await self._send_request(request)

async def list_topics(self) -> List[str]:
async def list_topics(self) -> list[str]:
metadata = await self._get_cluster_metadata(topics=None)
obj = metadata.to_object()
return [t["topic"] for t in obj["topics"]]

async def describe_topics(
self,
topics: Optional[List[str]] = None,
) -> List[Any]:
topics: Optional[list[str]] = None,
) -> list[Any]:
metadata = await self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj["topics"]

async def describe_cluster(self) -> Dict[str, Any]:
async def describe_cluster(self) -> dict[str, Any]:
metadata = await self._get_cluster_metadata()
obj = metadata.to_object()
obj.pop("topics") # We have 'describe_topics' for this
return obj

async def describe_configs(
self,
config_resources: List[ConfigResource],
config_resources: list[ConfigResource],
include_synonyms: bool = False,
) -> List[Response]:
) -> list[Response]:
"""Fetch configuration parameters for one or more Kafka resources.

:param config_resources: An list of ConfigResource objects.
Expand Down Expand Up @@ -360,8 +361,8 @@ async def describe_configs(
return await asyncio.gather(*futures)

async def alter_configs(
self, config_resources: List[ConfigResource]
) -> List[Response]:
self, config_resources: list[ConfigResource]
) -> list[Response]:
"""Alter configuration parameters of one or more Kafka resources.
:param config_resources: A list of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class.
Expand Down Expand Up @@ -398,9 +399,9 @@ def _convert_alter_config_resource_request(config_resource):
@classmethod
def _convert_config_resources(
cls,
config_resources: List[ConfigResource],
config_resources: list[ConfigResource],
op_type: str = "describe",
) -> Tuple[Dict[int, Any], List[Any]]:
) -> tuple[dict[int, Any], list[Any]]:
broker_resources = defaultdict(list)
topic_resources = []
if op_type == "describe":
Expand All @@ -416,15 +417,15 @@ def _convert_config_resources(
return broker_resources, topic_resources

@staticmethod
def _convert_topic_partitions(topic_partitions: Dict[str, NewPartitions]):
def _convert_topic_partitions(topic_partitions: dict[str, NewPartitions]):
return [
(topic_name, (new_part.total_count, new_part.new_assignments))
for topic_name, new_part in topic_partitions.items()
]

async def create_partitions(
self,
topic_partitions: Dict[str, NewPartitions],
topic_partitions: dict[str, NewPartitions],
timeout_ms: Optional[int] = None,
validate_only: bool = False,
) -> Response:
Expand Down Expand Up @@ -455,10 +456,10 @@ async def create_partitions(

async def describe_consumer_groups(
self,
group_ids: List[str],
group_ids: list[str],
group_coordinator_id: Optional[int] = None,
include_authorized_operations: bool = False,
) -> List[Response]:
) -> list[Response]:
"""Describe a set of consumer groups.

Any errors are immediately raised.
Expand Down Expand Up @@ -508,8 +509,8 @@ async def describe_consumer_groups(

async def list_consumer_groups(
self,
broker_ids: Optional[List[int]] = None,
) -> List[Tuple[Any, ...]]:
broker_ids: Optional[list[int]] = None,
) -> list[tuple[Any, ...]]:
"""List all consumer groups known to the cluster.

This returns a list of Consumer Group tuples. The tuples are
Expand Down Expand Up @@ -578,8 +579,8 @@ async def list_consumer_group_offsets(
self,
group_id: str,
group_coordinator_id: Optional[int] = None,
partitions: Optional[List[TopicPartition]] = None,
) -> Dict[TopicPartition, OffsetAndMetadata]:
partitions: Optional[list[TopicPartition]] = None,
) -> dict[TopicPartition, OffsetAndMetadata]:
"""Fetch Consumer Offsets for a single consumer group.

Note:
Expand Down Expand Up @@ -636,9 +637,9 @@ async def list_consumer_group_offsets(

async def delete_records(
self,
records_to_delete: Dict[TopicPartition, RecordsToDelete],
records_to_delete: dict[TopicPartition, RecordsToDelete],
timeout_ms: Optional[int] = None,
) -> Dict[TopicPartition, int]:
) -> dict[TopicPartition, int]:
"""Delete records from partitions.

:param records_to_delete: A map of RecordsToDelete for each TopicPartition
Expand Down Expand Up @@ -681,7 +682,7 @@ async def delete_records(

@staticmethod
def _convert_records_to_delete(
records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
records_to_delete: dict[str, list[tuple[int, RecordsToDelete]]],
):
return [
(topic, [(partition, rec.before_offset) for partition, rec in records])
Expand Down
6 changes: 2 additions & 4 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _get_conn_lock(self):
return self._get_conn_lock_value

def __repr__(self):
return "<AIOKafkaClient client_id=%s>" % self._client_id
return f"<AIOKafkaClient client_id={self._client_id}>"

@property
def api_version(self):
Expand Down Expand Up @@ -483,9 +483,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal

async def ready(self, node_id, *, group=ConnectionGroup.DEFAULT):
conn = await self._get_conn(node_id, group=group)
if conn is None:
return False
return True
return conn is not None

async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
"""Send a request to a specific node.
Expand Down
Loading
Loading