Skip to content

Commit

Permalink
Test with newer broker versions (#804)
Browse files Browse the repository at this point in the history
* Docker images for newer kafka versions

* Drop stub files for python-kafka

* Drop code that duplicates python-kafka

* Infer broker versions up to 2.5

* Test with newer versions in CI
  • Loading branch information
ods authored Dec 30, 2021
1 parent afe8820 commit 6b15132
Show file tree
Hide file tree
Showing 105 changed files with 76 additions and 3,541 deletions.
30 changes: 21 additions & 9 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,21 @@ jobs:
matrix:
include:
- python: 3.9
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"

# Older python versions against latest broker
- python: 3.6
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"
- python: 3.7
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"
- python: 3.8
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"

# Older brokers against latest python version
# Older/newer brokers against latest python version
- python: 3.9
kafka: "0.9.0.1"
scala: "2.11"
Expand All @@ -256,6 +256,18 @@ jobs:
- python: 3.9
kafka: "2.3.1"
scala: "2.12"
- python: 3.9
kafka: "2.4.1"
scala: "2.12"
- python: 3.9
kafka: "2.5.1"
scala: "2.12"
- python: 3.9
kafka: "2.6.3"
scala: "2.12"
- python: 3.9
kafka: "2.7.2"
scala: "2.13"
fail-fast: false

steps:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Some simple testing tasks (sorry, UNIX only).

FLAGS?=--maxfail=3
SCALA_VERSION?=2.12
KAFKA_VERSION?=2.2.2
SCALA_VERSION?=2.13
KAFKA_VERSION?=2.8.1
DOCKER_IMAGE=aiolibs/kafka:$(SCALA_VERSION)_$(KAFKA_VERSION)
DIFF_BRANCH=origin/master
FORMATTED_AREAS=aiokafka/util.py aiokafka/structs.py
Expand Down
30 changes: 19 additions & 11 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import time

from kafka.conn import collect_hosts
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka import __version__
from aiokafka.conn import create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
KafkaConnectionError,
Expand Down Expand Up @@ -581,13 +583,19 @@ def _check_api_version_response(self, response):
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((2, 3, 0), FetchRequest[0].API_KEY, 11),
((2, 1, 0), MetadataRequest[0].API_KEY, 7),
((1, 1, 0), FetchRequest[0].API_KEY, 7),
((1, 0, 0), MetadataRequest[0].API_KEY, 5),
((0, 11, 0), MetadataRequest[0].API_KEY, 4),
((0, 10, 2), OffsetFetchRequest[0].API_KEY, 2),
((0, 10, 1), MetadataRequest[0].API_KEY, 2),
# TODO Requires unreleased version of python-kafka
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
((2, 0, 0), FetchRequest[8]),
((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

error_type = Errors.for_code(response.error_code)
Expand All @@ -597,8 +605,8 @@ def _check_api_version_response(self, response):
for api_key, _, max_version in response.api_versions
}
# Get the best match of test cases
for broker_version, api_key, version in test_cases:
if max_versions.get(api_key, -1) >= version:
for broker_version, struct in test_cases:
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import async_timeout
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.fetch import FetchRequest

from aiokafka.protocol.fetch import FetchRequest
import aiokafka.errors as Errors
from aiokafka.errors import (
ConsumerStoppedError, RecordTooLargeError, KafkaTimeoutError)
Expand Down
3 changes: 2 additions & 1 deletion aiokafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import time

from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.errors import (
Expand All @@ -14,7 +16,6 @@
OutOfOrderSequenceNumber, TopicAuthorizationFailedError,
GroupAuthorizationFailedError, TransactionalIdAuthorizationFailed,
OperationNotAttempted)
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.protocol.transaction import (
InitProducerIdRequest, AddPartitionsToTxnRequest, EndTxnRequest,
AddOffsetsToTxnRequest, TxnOffsetCommitRequest
Expand Down
212 changes: 0 additions & 212 deletions aiokafka/protocol/fetch.py

This file was deleted.

Loading

0 comments on commit 6b15132

Please sign in to comment.