diff --git a/.appveyor.yml b/.appveyor.yml index cabbd8f21..76d2511b7 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -1,6 +1,6 @@ environment: global: - LIBRDKAFKA_NUGET_VERSION: 1.5.0 + LIBRDKAFKA_NUGET_VERSION: 1.5.2 CIBW_SKIP: cp33-* cp34-* CIBW_TEST_REQUIRES: pytest pytest-timeout requests trivup # SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the @@ -22,6 +22,7 @@ install: - SET PATH=%PYTHON%;%PYTHON%\\Scripts;%PATH% - python --version - python -m pip install -U pip + - python -m pip install -U -r tests/requirements.txt build_script: - tools/windows-build.bat diff --git a/.travis.yml b/.travis.yml index e1355acc1..f9e8807de 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,33 +1,33 @@ env: global: - - LIBRDKAFKA_VERSION=v1.5.0 + - LIBRDKAFKA_VERSION=v1.5.2 jobs: include: - # Source package verification with Python 2.7 - - os: linux + - name: "Source package verification with Python 2.7 (Linux)" + os: linux language: python dist: trusty python: "2.7" env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" - # Source package verification with Python 3.6 - - os: linux + - name: "Source package verification with Python 3.6 (Linux)" + os: linux language: python dist: trusty python: "3.6" env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" - # Source package verification with Python 2.7 - - os: osx + - name: "Source package verification with Python 2.7 (OSX)" + os: osx python: "2.7" env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" INTERPRETER_VERSION="2.7.17" - # Source package verification with Python 3.6 - - os: osx + - name: "Source package verification with Python 3.6 (OSX) +docs" + os: osx python: "3.6" env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" MK_DOCS="y" INTERPRETER_VERSION="3.6.5" - # cibuildwheel for osx - - os: osx + - name: "cibuildwheel (OSX)" + os: osx env: CIBW_BEFORE_BUILD="tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp" CFLAGS="-Itmp/include" LDFLAGS="-Ltmp/lib" INTERPRETER_VERSION="2.7.17" - # cibuildwheel for manylinux - - os: linux + - name: "cibuildwheel (manylinux)" + os: linux dist: trusty env: - CIBW_BEFORE_BUILD="tools/prepare-cibuildwheel-linux.sh ${LIBRDKAFKA_VERSION}" @@ -53,22 +53,20 @@ before_install: install: - tools/install-interceptors.sh - pip install -r tests/requirements.txt + - pip install tox - flake8 - if [[ $MK_DOCS == y ]]; then pip install -r docs/requirements.txt; fi - if [[ -z $CIBW_BEFORE_BUILD ]]; then tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build && pip install --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro]; fi # Build wheels -script: - - if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then cibuildwheel --output-dir wheelhouse1 && tools/fixup-wheels.sh wheelhouse1 wheelhouse; fi - # Make plugins available for tests # Execute tests if not CIBW_BEFORE_BUILD [osx, linux] # Execute integration tests if CIBW_BEFORE_BUILD # Build docs if MK_DOCS -after_script: +script: + - if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then cibuildwheel --output-dir wheelhouse1 && tools/fixup-wheels.sh wheelhouse1 wheelhouse; fi - ldd staging/libs/* || otool -L staging/libs/* || true - - if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "osx" ]]; then DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:staging/libs py.test --timeout=60 --ignore=tmp-build --import-mode append; fi - - if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "linux" ]]; then LD_LIBRARY_PATH=$LD_LIBRARY_PATH:staging/libs py.test --timeout=60 --ignore=tmp-build --import-mode append; fi + - [[ -n $CIBW_BEFORE_BUILD ]] || LD_LIBRARY_PATH=$LD_LIBRARY_PATH:staging/libs DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:staging/libs tox - if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == osx && -n $CIBW_BEFORE_BUILD ]]; then tools/test-wheel.sh wheelhouse; fi - if [[ $MK_DOCS == y ]]; then make docs; fi diff --git a/CHANGELOG.md b/CHANGELOG.md index e22c274cf..dc6a6ddae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ # Confluent's Python client for Apache Kafka +## v1.5.2 + +v1.5.2 is a maintenance release with the following fixing and enhancements: + + - Add producer purge method with optional blocking argument (@peteryin21, #548) + - Add AdminClient.list_groups API (@messense, #948) + - Rename asyncio.py example to avoid circular import (#945) + - Upgrade bundled OpenSSL to v1.1.1h (from v1.0.2u) + - The Consumer destructor will no longer trigger `consumer.close()` + callbacks, `consumer.close()` must now be explicitly called to cleanly + close down the consumer and leave the group. + - Fix `PY_SSIZE_T_CLEAN` warning in calls to produce(). + - Restructure source tree to avoid undesired local imports of confluent_kafka + when running pytest. + +confluent-kafka-python is based on librdkafka v1.5.2, see the +[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.5.2) +for a complete list of changes, enhancements, fixes and upgrade considerations. + + +**Note: There was no v1.5.1 release** + + ## v1.5.0 v1.5.0 is a maintenance release with the following fixes and enhancements: diff --git a/MANIFEST.in b/MANIFEST.in index ccfa3dbb2..4590b1dba 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include README.md include LICENSE.txt include test-requirements.txt -include confluent_kafka/src/*.[ch] +include src/confluent_kafka/src/*.[ch] diff --git a/docs/conf.py b/docs/conf.py index 9dc18b06d..f096b3146 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -57,9 +57,9 @@ # built documents. # # The short X.Y version. -version = '1.5.0' +version = '1.5.2' # The full version, including alpha/beta/rc tags. -release = '1.5.0' +release = version # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine index 006eb0c91..67e3ad9ba 100644 --- a/examples/docker/Dockerfile.alpine +++ b/examples/docker/Dockerfile.alpine @@ -30,7 +30,7 @@ FROM alpine:3.12 COPY . /usr/src/confluent-kafka-python -ENV LIBRDKAFKA_VERSION v1.5.0 +ENV LIBRDKAFKA_VERSION v1.5.2 ENV KAFKACAT_VERSION master diff --git a/setup.py b/setup.py index 4157b5f8b..b590dddb4 100755 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ import platform work_dir = os.path.dirname(os.path.realpath(__file__)) -mod_dir = os.path.join(work_dir, 'confluent_kafka') +mod_dir = os.path.join(work_dir, 'src', 'confluent_kafka') ext_dir = os.path.join(mod_dir, 'src') INSTALL_REQUIRES = [ @@ -64,13 +64,14 @@ def get_install_requirements(path): setup(name='confluent-kafka', # Make sure to bump CFL_VERSION* in confluent_kafka/src/confluent_kafka.h # and version and release in docs/conf.py. - version='1.5.0', + version='1.5.2', description='Confluent\'s Python client for Apache Kafka', author='Confluent Inc', author_email='support@confluent.io', url='https://github.com/confluentinc/confluent-kafka-python', ext_modules=[module], - packages=find_packages(exclude=("tests", "tests.*")), + packages=find_packages('src'), + package_dir={'': 'src'}, data_files=[('', [os.path.join(work_dir, 'LICENSE.txt')])], install_requires=INSTALL_REQUIRES, extras_require={ diff --git a/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py similarity index 100% rename from confluent_kafka/__init__.py rename to src/confluent_kafka/__init__.py diff --git a/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py similarity index 100% rename from confluent_kafka/admin/__init__.py rename to src/confluent_kafka/admin/__init__.py diff --git a/confluent_kafka/avro/__init__.py b/src/confluent_kafka/avro/__init__.py similarity index 100% rename from confluent_kafka/avro/__init__.py rename to src/confluent_kafka/avro/__init__.py diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/src/confluent_kafka/avro/cached_schema_registry_client.py similarity index 100% rename from confluent_kafka/avro/cached_schema_registry_client.py rename to src/confluent_kafka/avro/cached_schema_registry_client.py diff --git a/confluent_kafka/avro/error.py b/src/confluent_kafka/avro/error.py similarity index 100% rename from confluent_kafka/avro/error.py rename to src/confluent_kafka/avro/error.py diff --git a/confluent_kafka/avro/load.py b/src/confluent_kafka/avro/load.py similarity index 100% rename from confluent_kafka/avro/load.py rename to src/confluent_kafka/avro/load.py diff --git a/confluent_kafka/avro/requirements.txt b/src/confluent_kafka/avro/requirements.txt similarity index 100% rename from confluent_kafka/avro/requirements.txt rename to src/confluent_kafka/avro/requirements.txt diff --git a/confluent_kafka/avro/serializer/__init__.py b/src/confluent_kafka/avro/serializer/__init__.py similarity index 100% rename from confluent_kafka/avro/serializer/__init__.py rename to src/confluent_kafka/avro/serializer/__init__.py diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/src/confluent_kafka/avro/serializer/message_serializer.py similarity index 100% rename from confluent_kafka/avro/serializer/message_serializer.py rename to src/confluent_kafka/avro/serializer/message_serializer.py diff --git a/confluent_kafka/deserializing_consumer.py b/src/confluent_kafka/deserializing_consumer.py similarity index 100% rename from confluent_kafka/deserializing_consumer.py rename to src/confluent_kafka/deserializing_consumer.py diff --git a/confluent_kafka/error.py b/src/confluent_kafka/error.py similarity index 100% rename from confluent_kafka/error.py rename to src/confluent_kafka/error.py diff --git a/confluent_kafka/kafkatest/README.md b/src/confluent_kafka/kafkatest/README.md similarity index 100% rename from confluent_kafka/kafkatest/README.md rename to src/confluent_kafka/kafkatest/README.md diff --git a/confluent_kafka/kafkatest/__init__.py b/src/confluent_kafka/kafkatest/__init__.py similarity index 100% rename from confluent_kafka/kafkatest/__init__.py rename to src/confluent_kafka/kafkatest/__init__.py diff --git a/confluent_kafka/kafkatest/deploy.sh b/src/confluent_kafka/kafkatest/deploy.sh similarity index 100% rename from confluent_kafka/kafkatest/deploy.sh rename to src/confluent_kafka/kafkatest/deploy.sh diff --git a/confluent_kafka/kafkatest/globals.json b/src/confluent_kafka/kafkatest/globals.json similarity index 100% rename from confluent_kafka/kafkatest/globals.json rename to src/confluent_kafka/kafkatest/globals.json diff --git a/confluent_kafka/kafkatest/verifiable_client.py b/src/confluent_kafka/kafkatest/verifiable_client.py similarity index 93% rename from confluent_kafka/kafkatest/verifiable_client.py rename to src/confluent_kafka/kafkatest/verifiable_client.py index 761e69cfa..714783e57 100644 --- a/confluent_kafka/kafkatest/verifiable_client.py +++ b/src/confluent_kafka/kafkatest/verifiable_client.py @@ -86,10 +86,12 @@ def set_config(conf, args): if n == 'partition.assignment.strategy': # Convert Java class name to config value. # "org.apache.kafka.clients.consumer.RangeAssignor" -> "range" - conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor', - lambda x: x.group(1).lower(), v) - else: - conf[n] = v + v = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor', + lambda x: x.group(1).lower(), v) + if v == 'sticky': + v = 'cooperative-sticky' + + conf[n] = v @staticmethod def read_config_file(path): diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/src/confluent_kafka/kafkatest/verifiable_consumer.py similarity index 100% rename from confluent_kafka/kafkatest/verifiable_consumer.py rename to src/confluent_kafka/kafkatest/verifiable_consumer.py diff --git a/confluent_kafka/kafkatest/verifiable_producer.py b/src/confluent_kafka/kafkatest/verifiable_producer.py similarity index 100% rename from confluent_kafka/kafkatest/verifiable_producer.py rename to src/confluent_kafka/kafkatest/verifiable_producer.py diff --git a/confluent_kafka/requirements.txt b/src/confluent_kafka/requirements.txt similarity index 100% rename from confluent_kafka/requirements.txt rename to src/confluent_kafka/requirements.txt diff --git a/confluent_kafka/schema_registry/__init__.py b/src/confluent_kafka/schema_registry/__init__.py similarity index 100% rename from confluent_kafka/schema_registry/__init__.py rename to src/confluent_kafka/schema_registry/__init__.py diff --git a/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py similarity index 100% rename from confluent_kafka/schema_registry/avro.py rename to src/confluent_kafka/schema_registry/avro.py diff --git a/confluent_kafka/schema_registry/error.py b/src/confluent_kafka/schema_registry/error.py similarity index 100% rename from confluent_kafka/schema_registry/error.py rename to src/confluent_kafka/schema_registry/error.py diff --git a/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py similarity index 100% rename from confluent_kafka/schema_registry/json_schema.py rename to src/confluent_kafka/schema_registry/json_schema.py diff --git a/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py similarity index 100% rename from confluent_kafka/schema_registry/protobuf.py rename to src/confluent_kafka/schema_registry/protobuf.py diff --git a/confluent_kafka/schema_registry/requirements.txt b/src/confluent_kafka/schema_registry/requirements.txt similarity index 100% rename from confluent_kafka/schema_registry/requirements.txt rename to src/confluent_kafka/schema_registry/requirements.txt diff --git a/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py similarity index 100% rename from confluent_kafka/schema_registry/schema_registry_client.py rename to src/confluent_kafka/schema_registry/schema_registry_client.py diff --git a/confluent_kafka/serialization/__init__.py b/src/confluent_kafka/serialization/__init__.py similarity index 100% rename from confluent_kafka/serialization/__init__.py rename to src/confluent_kafka/serialization/__init__.py diff --git a/confluent_kafka/serializing_producer.py b/src/confluent_kafka/serializing_producer.py similarity index 100% rename from confluent_kafka/serializing_producer.py rename to src/confluent_kafka/serializing_producer.py diff --git a/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c similarity index 100% rename from confluent_kafka/src/Admin.c rename to src/confluent_kafka/src/Admin.c diff --git a/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c similarity index 100% rename from confluent_kafka/src/AdminTypes.c rename to src/confluent_kafka/src/AdminTypes.c diff --git a/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c similarity index 99% rename from confluent_kafka/src/Consumer.c rename to src/confluent_kafka/src/Consumer.c index bc5c079e0..62d20603a 100644 --- a/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -63,12 +63,8 @@ static void Consumer_dealloc (Handle *self) { CallState_begin(self, &cs); - /* If application has not called c.close() then - * rd_kafka_destroy() will, and that might trigger - * callbacks to be called from consumer_close(). - * This should probably be fixed in librdkafka, - * or the application. */ - rd_kafka_destroy(self->rk); + rd_kafka_destroy_flags(self->rk, + RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); CallState_end(self, &cs); } diff --git a/confluent_kafka/src/Metadata.c b/src/confluent_kafka/src/Metadata.c similarity index 100% rename from confluent_kafka/src/Metadata.c rename to src/confluent_kafka/src/Metadata.c diff --git a/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c similarity index 99% rename from confluent_kafka/src/Producer.c rename to src/confluent_kafka/src/Producer.c index c9b6c9926..625c0ec8f 100644 --- a/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -228,7 +228,7 @@ Producer_produce0 (Handle *self, static PyObject *Producer_produce (Handle *self, PyObject *args, PyObject *kwargs) { const char *topic, *value = NULL, *key = NULL; - int value_len = 0, key_len = 0; + Py_ssize_t value_len = 0, key_len = 0; int partition = RD_KAFKA_PARTITION_UA; PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL; long long timestamp = 0; diff --git a/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c similarity index 100% rename from confluent_kafka/src/confluent_kafka.c rename to src/confluent_kafka/src/confluent_kafka.c diff --git a/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h similarity index 99% rename from confluent_kafka/src/confluent_kafka.h rename to src/confluent_kafka/src/confluent_kafka.h index ddf7825ad..1c98c9cb7 100644 --- a/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -14,6 +14,7 @@ * limitations under the License. */ +#define PY_SSIZE_T_CLEAN #include #include #include @@ -41,8 +42,8 @@ * 0xMMmmRRPP * MM=major, mm=minor, RR=revision, PP=patchlevel (not used) */ -#define CFL_VERSION 0x01050000 -#define CFL_VERSION_STR "1.5.0" +#define CFL_VERSION 0x01050200 +#define CFL_VERSION_STR "1.5.2" /** * Minimum required librdkafka version. This is checked both during diff --git a/tests/docker/.env b/tests/docker/.env index 53a37b81c..14bca4182 100644 --- a/tests/docker/.env +++ b/tests/docker/.env @@ -1,10 +1,11 @@ #!/usr/bin/env bash -export DOCKER_SOURCE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" -export DOCKER_CONTEXT=$DOCKER_SOURCE/docker-compose.yaml -export DOCKER_BIN=$DOCKER_SOURCE/bin -export DOCKER_CONF=$DOCKER_SOURCE/conf -export TLS=$DOCKER_CONF/tls +export PY_DOCKER_SOURCE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +export PY_DOCKER_COMPOSE_FILE=$PY_DOCKER_SOURCE/docker-compose.yaml +export PY_DOCKER_CONTEXT="python-test-$(uuidgen)" +export PY_DOCKER_BIN=$PY_DOCKER_SOURCE/bin +export PY_DOCKER_CONF=$PY_DOCKER_SOURCE/conf +export TLS=$PY_DOCKER_CONF/tls export MY_BOOTSTRAP_SERVER_ENV=localhost:29092 export MY_SCHEMA_REGISTRY_URL_ENV=http://$(hostname):8081 diff --git a/tests/docker/bin/certify.sh b/tests/docker/bin/certify.sh index 6368444a0..e39e7c7c6 100755 --- a/tests/docker/bin/certify.sh +++ b/tests/docker/bin/certify.sh @@ -2,10 +2,10 @@ set -eu -DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +PY_DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" export PASS="abcdefgh" -source ${DOCKER_BIN}/../.env +source ${PY_DOCKER_BIN}/../.env mkdir -p ${TLS} @@ -17,11 +17,11 @@ fi HOST=$(hostname -f) echo "Creating ca-cert..." -${DOCKER_BIN}/gen-ssl-certs.sh ca ${TLS}/ca-cert ${HOST} +${PY_DOCKER_BIN}/gen-ssl-certs.sh ca ${TLS}/ca-cert ${HOST} echo "Creating server cert..." -${DOCKER_BIN}/gen-ssl-certs.sh -k server ${TLS}/ca-cert ${TLS}/ ${HOST} ${HOST} +${PY_DOCKER_BIN}/gen-ssl-certs.sh -k server ${TLS}/ca-cert ${TLS}/ ${HOST} ${HOST} echo "Creating client cert..." -${DOCKER_BIN}/gen-ssl-certs.sh client ${TLS}/ca-cert ${TLS}/ ${HOST} ${HOST} +${PY_DOCKER_BIN}/gen-ssl-certs.sh client ${TLS}/ca-cert ${TLS}/ ${HOST} ${HOST} echo "Creating key ..." openssl rsa -in ${TLS}/client.key -out ${TLS}/client.key -passin pass:${PASS} diff --git a/tests/docker/bin/cluster_down.sh b/tests/docker/bin/cluster_down.sh index cc8e306dc..3d4bdc721 100755 --- a/tests/docker/bin/cluster_down.sh +++ b/tests/docker/bin/cluster_down.sh @@ -2,8 +2,8 @@ set -eu -DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" -source ${DOCKER_BIN}/../.env +PY_DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +source ${PY_DOCKER_BIN}/../.env echo "Destroying cluster.." -docker-compose -f ${DOCKER_CONTEXT} down -v --remove-orphans +docker-compose -f $PY_DOCKER_COMPOSE_FILE down -v --remove-orphans diff --git a/tests/docker/bin/cluster_up.sh b/tests/docker/bin/cluster_up.sh index 8305433a6..72a0b9967 100755 --- a/tests/docker/bin/cluster_up.sh +++ b/tests/docker/bin/cluster_up.sh @@ -2,8 +2,8 @@ set -eu -DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" -source ${DOCKER_BIN}/../.env +PY_DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +source ${PY_DOCKER_BIN}/../.env # Wait for http service listener to come up and start serving # $1 http service name @@ -27,20 +27,21 @@ await_http() { } echo "Configuring Environment..." -source ${DOCKER_SOURCE}/.env +source ${PY_DOCKER_SOURCE}/.env echo "Generating SSL certs..." -${DOCKER_BIN}/certify.sh +${PY_DOCKER_BIN}/certify.sh echo "Deploying cluster..." -docker-compose -f ${DOCKER_CONTEXT} up -d +docker-compose -f $PY_DOCKER_COMPOSE_FILE up -d echo "Setting throttle for throttle test..." -docker-compose -f ${DOCKER_CONTEXT} exec kafka sh -c " +docker-compose -f $PY_DOCKER_COMPOSE_FILE exec kafka sh -c " /usr/bin/kafka-configs --zookeeper zookeeper:2181 \ --alter --add-config 'producer_byte_rate=1,consumer_byte_rate=1,request_percentage=001' \ --entity-name throttled_client --entity-type clients" await_http "schema-registry" "http://localhost:8081" + await_http "schema-registry-basic-auth" "http://localhost:8083" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6543cedba..53111fc91 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -28,6 +28,7 @@ def kafka_cluster(): cluster = TrivupFixture({'with_sr': True, + 'cp_version': 'latest', 'broker_conf': ['transaction.state.log.replication.factor=1', 'transaction.state.log.min.isr=1']}) try: diff --git a/tests/integration/consumer/test_consumer_error.py b/tests/integration/consumer/test_consumer_error.py index cda27d691..24bab1200 100644 --- a/tests/integration/consumer/test_consumer_error.py +++ b/tests/integration/consumer/test_consumer_error.py @@ -17,7 +17,7 @@ # import pytest -from confluent_kafka.cimpl import TopicPartition, OFFSET_END +from confluent_kafka import TopicPartition, OFFSET_END, KafkaError from confluent_kafka.error import ConsumeError from confluent_kafka.serialization import StringSerializer @@ -39,6 +39,8 @@ def test_consume_error(kafka_cluster): value_deserializer=StringSerializer()) consumer.assign([TopicPartition(topic, 0, OFFSET_END)]) - with pytest.raises(ConsumeError, match="No more messages"): + with pytest.raises(ConsumeError) as exc_info: # Trigger EOF error consumer.poll() + assert exc_info.value.args[0].code() == KafkaError._PARTITION_EOF, \ + "Expected _PARTITION_EOF, not {}".format(exc_info) diff --git a/tests/integration/schema_registry/test_api_client.py b/tests/integration/schema_registry/test_api_client.py index 95e253c66..f8f599550 100644 --- a/tests/integration/schema_registry/test_api_client.py +++ b/tests/integration/schema_registry/test_api_client.py @@ -86,8 +86,7 @@ def test_api_register_schema_invalid(kafka_cluster, load_file): schema = Schema(load_file('invalid_schema.avsc'), schema_type='AVRO') subject = _subject_name('test_invalid_schema') - with pytest.raises(SchemaRegistryError, match="Input schema is an invalid" - " Avro schema") as e: + with pytest.raises(SchemaRegistryError) as e: sr.register_schema(subject, schema) assert e.value.http_status_code == 422 assert e.value.error_code == 42201 @@ -126,7 +125,7 @@ def test_api_get_schema_not_found(kafka_cluster, load_file): """ sr = kafka_cluster.schema_registry() - with pytest.raises(SchemaRegistryError, match="Schema not found") as e: + with pytest.raises(SchemaRegistryError, match="Schema .*not found.*") as e: sr.get_schema(999999) assert e.value.http_status_code == 404 @@ -148,7 +147,7 @@ def test_api_get_registration_subject_not_found(kafka_cluster, load_file): subject = _subject_name("registration_subject_not_found") - with pytest.raises(SchemaRegistryError, match="Subject not found") as e: + with pytest.raises(SchemaRegistryError, match="Subject .*not found.*") as e: sr.lookup_schema(subject, schema) assert e.value.http_status_code == 404 assert e.value.error_code == 40401 @@ -265,7 +264,7 @@ def test_api_delete_subject_not_found(kafka_cluster): subject = _subject_name("test-delete_invalid_subject") - with pytest.raises(SchemaRegistryError, match="Subject not found") as e: + with pytest.raises(SchemaRegistryError, match="Subject .*not found.*") as e: sr.delete_subject(subject) assert e.value.http_status_code == 404 assert e.value.error_code == 40401 @@ -302,7 +301,7 @@ def test_api_get_subject_version_no_version(kafka_cluster, load_file): subject = _subject_name('test-get_subject') sr.register_schema(subject, schema) - with pytest.raises(SchemaRegistryError, match="Version not found") as e: + with pytest.raises(SchemaRegistryError, match="Version .*not found") as e: sr.get_version(subject, version=3) assert e.value.http_status_code == 404 assert e.value.error_code == 40402 @@ -316,8 +315,9 @@ def test_api_get_subject_version_invalid(kafka_cluster, load_file): subject = _subject_name('test-get_subject') sr.register_schema(subject, schema) - with pytest.raises(SchemaRegistryError, match="The specified version is not" - " a valid version id") as e: + with pytest.raises(SchemaRegistryError, + match="The specified version .*is not" + " a valid version id.*") as e: sr.get_version(subject, version='a') assert e.value.http_status_code == 422 assert e.value.error_code == 42202 diff --git a/tests/integration/schema_registry/test_json_serializers.py b/tests/integration/schema_registry/test_json_serializers.py index f6645b849..f28bd7af3 100644 --- a/tests/integration/schema_registry/test_json_serializers.py +++ b/tests/integration/schema_registry/test_json_serializers.py @@ -91,7 +91,7 @@ def test_json_record_serialization(kafka_cluster, load_file): """ topic = kafka_cluster.create_topic("serialization-json") - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") value_serializer = JSONSerializer(schema_str, sr) @@ -139,7 +139,7 @@ def test_json_record_serialization_incompatible(kafka_cluster, load_file): """ topic = kafka_cluster.create_topic("serialization-json") - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") value_serializer = JSONSerializer(schema_str, sr) @@ -165,7 +165,7 @@ def test_json_record_serialization_no_title(kafka_cluster, load_file): load_file (callable(str)): JSON Schema file reader """ - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() schema_str = load_file('not_title.json') with pytest.raises(ValueError, @@ -184,7 +184,7 @@ def test_json_record_serialization_custom(kafka_cluster, load_file): """ topic = kafka_cluster.create_topic("serialization-json") - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() schema_str = load_file("product.json") value_serializer = JSONSerializer(schema_str, sr, @@ -228,7 +228,7 @@ def test_json_record_deserialization_mismatch(kafka_cluster, load_file): """ topic = kafka_cluster.create_topic("serialization-json") - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() schema_str = load_file("contractor.json") schema_str2 = load_file("product.json") diff --git a/tests/integration/schema_registry/test_proto_serializers.py b/tests/integration/schema_registry/test_proto_serializers.py index 2701235a1..840c94f7f 100644 --- a/tests/integration/schema_registry/test_proto_serializers.py +++ b/tests/integration/schema_registry/test_proto_serializers.py @@ -54,7 +54,7 @@ def test_protobuf_message_serialization(kafka_cluster, pb2, data): """ topic = kafka_cluster.create_topic("serialization-proto") - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() value_serializer = ProtobufSerializer(pb2, sr) value_deserializer = ProtobufDeserializer(pb2) @@ -85,7 +85,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): Registry to ensure the references match up. """ - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() topic = kafka_cluster.create_topic("serialization-proto-refs") serializer = ProtobufSerializer(pb2, sr) producer = kafka_cluster.producer(key_serializer=serializer) @@ -106,7 +106,7 @@ def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_1 = TestProto_pb2.TestMessage pb2_2 = NestedTestProto_pb2.NestedMessage - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() topic = kafka_cluster.create_topic("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr) @@ -127,7 +127,7 @@ def test_protobuf_deserializer_type_mismatch(kafka_cluster): pb2_1 = PublicTestProto_pb2.TestMessage pb2_2 = metadata_proto_pb2.HDFSOptions - sr = kafka_cluster.schema_registry({'url': 'http://localhost:8081'}) + sr = kafka_cluster.schema_registry() topic = kafka_cluster.create_topic("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr) deserializer = ProtobufDeserializer(pb2_2) diff --git a/tests/requirements.txt b/tests/requirements.txt index 75d1b9bcf..492e55e34 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,6 +1,6 @@ - flake8 - pytest==4.6.9;python_version<="3.0" - pytest;python_version>="3.0" - pytest-timeout - requests-mock - trivup +flake8 +pytest==4.6.9;python_version<="3.0" +pytest>=6.0.0;python_version>="3.0" +pytest-timeout +requests-mock +trivup>=0.8.2 diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index d9b86dc80..ea71b1938 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -183,7 +183,7 @@ def _auth_matcher(cls, request): # We only support the BASIC scheme today scheme, userinfo = authinfo.split(" ") - if b64decode(userinfo) == cls.USERINFO: + if b64decode(userinfo).decode('utf-8') == cls.USERINFO: return None unauthorized = {'error_code': 401, @@ -349,7 +349,7 @@ def post_subject_version_callback(self, request, context): return {'id': self.SCHEMA_ID} -@pytest.fixture("package") +@pytest.fixture(scope="package") def load_avsc(): def get_handle(name): with open(os.path.join(work_dir, '..', 'integration', 'schema_registry', diff --git a/tests/test_error.py b/tests/test_error.py index 9ceceb6fb..a06dfc0ad 100644 --- a/tests/test_error.py +++ b/tests/test_error.py @@ -16,7 +16,7 @@ # limit # -from confluent_kafka.cimpl import KafkaError +from confluent_kafka import KafkaError from confluent_kafka.error import ConsumeError, \ ProduceError diff --git a/tools/build-openssl.sh b/tools/build-openssl.sh index 56efea610..195aa63d2 100755 --- a/tools/build-openssl.sh +++ b/tools/build-openssl.sh @@ -6,11 +6,12 @@ # NOTE: Keep this updated to make sure we always build the latest # version of OpenSSL in the 1.0 release train. -OPENSSL_VERSION=1.0.2u +OPENSSL_VERSION=1.1.1h PREFIX=$1 if [[ -z $PREFIX ]]; then echo "Usage: $0 " + exit 1 fi set -ex @@ -23,14 +24,14 @@ if ! grep -q "^VERSION=${OPENSSL_VERSION}$" build-openssl/Makefile ; then rm -rf build-openssl mkdir -p build-openssl pushd build-openssl - curl -fL https://www.openssl.org/source/old/1.0.2/openssl-${OPENSSL_VERSION}.tar.gz | \ + curl -fL https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz | \ tar -xz --strip-components=1 -f - else echo "Reusing existing build-openssl directory" pushd build-openssl fi -./config --prefix=${PREFIX} zlib no-krb5 zlib shared +./config --prefix=${PREFIX} zlib shared no-deprecated echo "## building openssl" if ! make -j 2>&1 | tail -20 ; then echo "## Make failed, cleaning up and retrying" diff --git a/tools/windows-build.bat b/tools/windows-build.bat index 6ab6d2527..628a9f24b 100644 --- a/tools/windows-build.bat +++ b/tools/windows-build.bat @@ -51,14 +51,12 @@ for %%W in (wheelhouse\confluent_kafka-*cp%PYTHON_SHORTVER%*win*%PYTHON_ARCH%.wh python -c "import struct; print(struct.calcsize('P') * 8)" 7z l %%~W pip install %%~W || exit /b 1 + pip install -r src\confluent_kafka\requirements.txt + pip install -r src\confluent_kafka\avro\requirements.txt - SET savedir=%cd% - cd .. python -c "from confluent_kafka import libversion ; print(libversion())" || exit /b 1 - python -m pytest --ignore=confluent-kafka-python\tests\schema_registry --ignore=confluent-kafka-python\tests\integration --import-mode=append confluent-kafka-python\tests || exit /b 1 + python -m pytest --ignore=tests\schema_registry --ignore=tests\integration tests || exit /b 1 pip uninstall -y confluent_kafka || exit /b 1 - - cd %savedir% ) diff --git a/tox.ini b/tox.ini index 9ce91c476..8eb4fb8b6 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = flake8,py27,py34,py35,py36 +envlist = flake8,py27,py36,py38 [testenv] setenv = @@ -11,38 +11,18 @@ passenv = #http://tox.readthedocs.io/en/latest/config.html#confval-passenv=SPACE-SEPARATED-GLOBNAMES * commands = - pip install -v . - py.test -v --timeout 60 --ignore=tmp-build --import-mode append {posargs} + # Install main package and all sub-packages + pip install . .[avro] .[schema-registry] .[json] .[protobuf] + # Early verification that module is loadable + python -c 'import confluent_kafka ; print(confluent_kafka.version())' + # Run tests (large timeout to allow docker image downloads) + python -m pytest --timeout 600 --ignore=tmp-build {posargs} # See tests/README.md for additional notes on testing - python tests/integration/integration_test.py + #python tests/integration/integration_test.py -[base] deps = # https://docs.pytest.org/en/latest/changelog.html#id53 - pytest==4.6.4 - pytest-timeout - fastavro - requests - -[testenv:py27] -deps = - {[base]deps} - avro - -[testenv:py34] -deps = - {[base]deps} - avro-python3 - -[testenv:py35] -deps = - {[base]deps} - avro-python3 - -[testenv:py36] -deps = - {[base]deps} - avro-python3 + -rtests/requirements.txt [testenv:flake8] deps = flake8