Work around an error importing "range" from "kafka.vendor.six.moves" #2376

Closed
wants to merge 1 commit into from
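
The single commit replaces the direct submodule import "from kafka.vendor.six.moves import range" (the statement that raises the import error) with "from kafka.vendor.six import moves" plus attribute access: moves.range, moves.urllib, and so on. Attribute access only needs the already-imported moves module, so it sidesteps whatever breaks the submodule import. A minimal sketch of the resulting pattern, assuming kafka-python's vendored six is importable in your environment:

    # Before (the import this PR works around):
    #   from kafka.vendor.six.moves import range
    # After: import the lazy "moves" module itself and resolve names as attributes.
    from kafka.vendor.six import moves

    for i in moves.range(0, 10, 2):  # same behaviour as the built-in range on Python 3
        print(i)
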
4 changes: 2 additions & 2 deletions kafka/codec.py
@@ -6,7 +6,7 @@
import struct

from kafka.vendor import six
-from kafka.vendor.six.moves import range
+from kafka.vendor.six import moves

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
@@ -160,7 +160,7 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()

for chunk in (chunker(payload, i, xerial_blocksize)
-for i in range(0, len(payload), xerial_blocksize)):
+for i in moves.range(0, len(payload), xerial_blocksize)):

block = snappy.compress(chunk)
block_size = len(block)
17 changes: 8 additions & 9 deletions test/fixtures.py
@@ -10,8 +10,7 @@
import uuid

import py
-from kafka.vendor.six.moves import urllib, range
-from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
+from kafka.vendor.six import moves

from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.errors import InvalidReplicationFactorError
@@ -96,12 +95,12 @@ def download_official_distribution(cls,
try:
url = url_base + distfile + '.tgz'
log.info("Attempting to download %s", url)
-response = urllib.request.urlopen(url)
-except urllib.error.HTTPError:
+response = moves.urllib.request.urlopen(url)
+except moves.urllib.error.HTTPError:
log.exception("HTTP Error")
url = url_base + distfile + '.tar.gz'
log.info("Attempting to download %s", url)
-response = urllib.request.urlopen(url)
+response = moves.urllib.request.urlopen(url)

log.info("Saving distribution file to %s", output_file)
with open(output_file, 'w') as output_file_fd:
@@ -160,7 +159,7 @@ class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
if "ZOOKEEPER_URI" in os.environ:
-parse = urlparse(os.environ["ZOOKEEPER_URI"])
+parse = moves.urllib.parse.urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
@@ -256,7 +255,7 @@ def instance(cls, broker_id, zookeeper, zk_chroot=None,
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
-parse = urlparse(os.environ["KAFKA_URI"])
+parse = moves.urllib.parse.urlparse(os.environ["KAFKA_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
@@ -530,7 +529,7 @@ def _failure(error):
retries = 10
while True:
node_id = self._client.least_loaded_node()
-for connect_retry in range(40):
+for connect_retry in moves.range(40):
self._client.maybe_connect(node_id)
if self._client.connected(node_id):
break
@@ -646,7 +645,7 @@ def _enrich_client_params(self, params, **defaults):
@staticmethod
def _create_many_clients(cnt, cls, *args, **params):
client_id = params['client_id']
-for _ in range(cnt):
+for _ in moves.range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
yield cls(*args, **params)

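The fixtures change applies the same pattern to the urllib helpers: six exposes urllib as a lazy sub-package of moves, so moves.urllib.request.urlopen(...), moves.urllib.error.HTTPError and moves.urllib.parse.urlparse(...) replace the direct imports of urllib and urlparse. A short illustrative sketch (the URI below is made up; the fixtures actually read ZOOKEEPER_URI / KAFKA_URI from the environment):

    from kafka.vendor.six import moves

    # Resolve urlparse through the lazy moves.urllib.parse shim.
    parse = moves.urllib.parse.urlparse("zookeeper://localhost:2181")
    host, port = parse.hostname, parse.port
    print(host, port)  # localhost 2181

    # Downloads go through the same shim:
    #   response = moves.urllib.request.urlopen(url)
    #   ...
    #   except moves.urllib.error.HTTPError: ...
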
14 changes: 7 additions & 7 deletions test/test_codec.py
@@ -4,7 +4,7 @@
import struct

import pytest
-from kafka.vendor.six.moves import range
+from kafka.vendor.six import moves

from kafka.codec import (
has_snappy, has_lz4, has_zstd,
@@ -19,15 +19,15 @@


def test_gzip():
-for i in range(1000):
+for i in moves.range(1000):
b1 = random_string(100).encode('utf-8')
b2 = gzip_decode(gzip_encode(b1))
assert b1 == b2


@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy():
-for i in range(1000):
+for i in moves.range(1000):
b1 = random_string(100).encode('utf-8')
b2 = snappy_decode(snappy_encode(b1))
assert b1 == b2
@@ -87,7 +87,7 @@ def test_snappy_encode_xerial():
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4():
-for i in range(1000):
+for i in moves.range(1000):
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode(lz4_encode(b1))
assert len(b1) == len(b2)
@@ -97,7 +97,7 @@ def test_lz4():
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4_old():
-for i in range(1000):
+for i in moves.range(1000):
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
assert len(b1) == len(b2)
@@ -107,7 +107,7 @@ def test_lz4_old():
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4_incremental():
-for i in range(1000):
+for i in moves.range(1000):
# lz4 max single block size is 4MB
# make sure we test with multiple-blocks
b1 = random_string(100).encode('utf-8') * 50000
@@ -118,7 +118,7 @@ def test_lz4_incremental():

@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
def test_zstd():
-for _ in range(1000):
+for _ in moves.range(1000):
b1 = random_string(100).encode('utf-8')
b2 = zstd_decode(zstd_encode(b1))
assert b1 == b2