From 20405e5f9893ac48b75ee0bae4d86aa5ff66108b Mon Sep 17 00:00:00 2001 From: Taras Voinarovskiy Date: Fri, 25 Aug 2017 09:38:27 +0000 Subject: [PATCH] Add DefaultRecordBatch implementation aka V2 message format parser/builder. --- kafka/producer/kafka.py | 15 +- kafka/producer/record_accumulator.py | 3 +- kafka/producer/sender.py | 9 +- kafka/record/README | 8 + kafka/record/_crc32c.py | 143 +++++++ kafka/record/abc.py | 10 +- kafka/record/default_records.py | 553 +++++++++++++++++++++++++++ kafka/record/legacy_records.py | 3 +- kafka/record/memory_records.py | 23 +- kafka/record/util.py | 105 +++++ test/record/test_default_records.py | 149 ++++++++ test/record/test_legacy_records.py | 72 +++- test/record/test_records.py | 46 +++ test/record/test_util.py | 92 +++++ test/test_fetcher.py | 2 +- 15 files changed, 1211 insertions(+), 22 deletions(-) create mode 100644 kafka/record/README create mode 100644 kafka/record/_crc32c.py create mode 100644 kafka/record/default_records.py create mode 100644 test/record/test_default_records.py create mode 100644 test/record/test_util.py diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 5638b61a4..2159a4edf 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -15,6 +15,7 @@ from ..codec import has_gzip, has_snappy, has_lz4 from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner +from ..record.default_records import DefaultRecordBatchBuilder from ..record.legacy_records import LegacyRecordBatchBuilder from ..serializer import Serializer from ..structs import TopicPartition @@ -483,15 +484,21 @@ def partitions_for(self, topic): return self._wait_on_metadata(topic, max_wait) def _max_usable_produce_magic(self): - if self.config['api_version'] >= (0, 10): + if self.config['api_version'] >= (0, 11): + return 2 + elif self.config['api_version'] >= (0, 10): return 1 else: return 0 - def _estimate_size_in_bytes(self, key, value): + def _estimate_size_in_bytes(self, key, value, headers=[]): magic = self._max_usable_produce_magic() - return LegacyRecordBatchBuilder.estimate_size_in_bytes( - magic, self.config['compression_type'], key, value) + if magic == 2: + return DefaultRecordBatchBuilder.estimate_size_in_bytes( + key, value, headers) + else: + return LegacyRecordBatchBuilder.estimate_size_in_bytes( + magic, self.config['compression_type'], key, value) def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): """Publish a message to a topic. 
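The producer now picks the record format from the negotiated api_version and routes size estimation to the matching builder. A minimal standalone sketch of that mapping (the helper name pick_magic is hypothetical; the thresholds mirror _max_usable_produce_magic above):

    def pick_magic(api_version):
        # Mirrors _max_usable_produce_magic(): v2 batches (with headers and
        # varint framing) need brokers >= 0.11, v1 messages need >= 0.10.
        if api_version >= (0, 11):
            return 2
        elif api_version >= (0, 10):
            return 1
        return 0

    assert pick_magic((0, 11, 0)) == 2  # -> DefaultRecordBatchBuilder estimate
    assert pick_magic((0, 10, 1)) == 1  # -> LegacyRecordBatchBuilder estimate
    assert pick_magic((0, 9)) == 0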
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 716ae658e..a530dc3dd 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -57,7 +57,8 @@ def record_count(self): def try_append(self, timestamp_ms, key, value): offset = self.records.next_offset() - checksum, record_size = self.records.append(timestamp_ms, key, value) + checksum, record_size = self.records.append( + timestamp_ms, key, value, headers=[]) if record_size == 0: return None diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 72a15bbdd..ffc67f8a3 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -291,7 +291,11 @@ def _produce_request(self, node_id, acks, timeout, batches): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - if self.config['api_version'] >= (0, 10): + kwargs = {} + if self.config['api_version'] >= (0, 11): + version = 3 + kwargs = dict(transactional_id=None) + elif self.config['api_version'] >= (0, 10): version = 2 elif self.config['api_version'] == (0, 9): version = 1 @@ -302,7 +306,8 @@ def _produce_request(self, node_id, acks, timeout, batches): timeout=timeout, topics=[(topic, list(partition_info.items())) for topic, partition_info - in six.iteritems(produce_records_by_partition)] + in six.iteritems(produce_records_by_partition)], + **kwargs ) def wakeup(self): diff --git a/kafka/record/README b/kafka/record/README new file mode 100644 index 000000000..e4454554c --- /dev/null +++ b/kafka/record/README @@ -0,0 +1,8 @@ +Module structured mostly based on +kafka/clients/src/main/java/org/apache/kafka/common/record/ module of Java +Client. + +See abc.py for abstract declarations. `ABCRecords` is used as a facade to hide +version differences. `ABCRecordBatch` subclasses will implement actual parsers +for different versions (v0/v1 as LegacyBatch and v2 as DefaultBatch. Names +taken from Java). diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py new file mode 100644 index 000000000..5704f8238 --- /dev/null +++ b/kafka/record/_crc32c.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# +# Taken from https://cloud.google.com/appengine/docs/standard/python/refdocs/\ +# modules/google/appengine/api/files/crc32c?hl=ru +# +# Copyright 2007 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Implementation of CRC-32C checksumming as in rfc3720 section B.4. +See http://en.wikipedia.org/wiki/Cyclic_redundancy_check for details on CRC-32C +This code is a manual python translation of c code generated by +pycrc 0.7.1 (http://www.tty1.net/pycrc/). 
Command line used: +'./pycrc.py --model=crc-32c --generate c --algorithm=table-driven' +""" + +import array + +CRC_TABLE = ( + 0x00000000, 0xf26b8303, 0xe13b70f7, 0x1350f3f4, + 0xc79a971f, 0x35f1141c, 0x26a1e7e8, 0xd4ca64eb, + 0x8ad958cf, 0x78b2dbcc, 0x6be22838, 0x9989ab3b, + 0x4d43cfd0, 0xbf284cd3, 0xac78bf27, 0x5e133c24, + 0x105ec76f, 0xe235446c, 0xf165b798, 0x030e349b, + 0xd7c45070, 0x25afd373, 0x36ff2087, 0xc494a384, + 0x9a879fa0, 0x68ec1ca3, 0x7bbcef57, 0x89d76c54, + 0x5d1d08bf, 0xaf768bbc, 0xbc267848, 0x4e4dfb4b, + 0x20bd8ede, 0xd2d60ddd, 0xc186fe29, 0x33ed7d2a, + 0xe72719c1, 0x154c9ac2, 0x061c6936, 0xf477ea35, + 0xaa64d611, 0x580f5512, 0x4b5fa6e6, 0xb93425e5, + 0x6dfe410e, 0x9f95c20d, 0x8cc531f9, 0x7eaeb2fa, + 0x30e349b1, 0xc288cab2, 0xd1d83946, 0x23b3ba45, + 0xf779deae, 0x05125dad, 0x1642ae59, 0xe4292d5a, + 0xba3a117e, 0x4851927d, 0x5b016189, 0xa96ae28a, + 0x7da08661, 0x8fcb0562, 0x9c9bf696, 0x6ef07595, + 0x417b1dbc, 0xb3109ebf, 0xa0406d4b, 0x522bee48, + 0x86e18aa3, 0x748a09a0, 0x67dafa54, 0x95b17957, + 0xcba24573, 0x39c9c670, 0x2a993584, 0xd8f2b687, + 0x0c38d26c, 0xfe53516f, 0xed03a29b, 0x1f682198, + 0x5125dad3, 0xa34e59d0, 0xb01eaa24, 0x42752927, + 0x96bf4dcc, 0x64d4cecf, 0x77843d3b, 0x85efbe38, + 0xdbfc821c, 0x2997011f, 0x3ac7f2eb, 0xc8ac71e8, + 0x1c661503, 0xee0d9600, 0xfd5d65f4, 0x0f36e6f7, + 0x61c69362, 0x93ad1061, 0x80fde395, 0x72966096, + 0xa65c047d, 0x5437877e, 0x4767748a, 0xb50cf789, + 0xeb1fcbad, 0x197448ae, 0x0a24bb5a, 0xf84f3859, + 0x2c855cb2, 0xdeeedfb1, 0xcdbe2c45, 0x3fd5af46, + 0x7198540d, 0x83f3d70e, 0x90a324fa, 0x62c8a7f9, + 0xb602c312, 0x44694011, 0x5739b3e5, 0xa55230e6, + 0xfb410cc2, 0x092a8fc1, 0x1a7a7c35, 0xe811ff36, + 0x3cdb9bdd, 0xceb018de, 0xdde0eb2a, 0x2f8b6829, + 0x82f63b78, 0x709db87b, 0x63cd4b8f, 0x91a6c88c, + 0x456cac67, 0xb7072f64, 0xa457dc90, 0x563c5f93, + 0x082f63b7, 0xfa44e0b4, 0xe9141340, 0x1b7f9043, + 0xcfb5f4a8, 0x3dde77ab, 0x2e8e845f, 0xdce5075c, + 0x92a8fc17, 0x60c37f14, 0x73938ce0, 0x81f80fe3, + 0x55326b08, 0xa759e80b, 0xb4091bff, 0x466298fc, + 0x1871a4d8, 0xea1a27db, 0xf94ad42f, 0x0b21572c, + 0xdfeb33c7, 0x2d80b0c4, 0x3ed04330, 0xccbbc033, + 0xa24bb5a6, 0x502036a5, 0x4370c551, 0xb11b4652, + 0x65d122b9, 0x97baa1ba, 0x84ea524e, 0x7681d14d, + 0x2892ed69, 0xdaf96e6a, 0xc9a99d9e, 0x3bc21e9d, + 0xef087a76, 0x1d63f975, 0x0e330a81, 0xfc588982, + 0xb21572c9, 0x407ef1ca, 0x532e023e, 0xa145813d, + 0x758fe5d6, 0x87e466d5, 0x94b49521, 0x66df1622, + 0x38cc2a06, 0xcaa7a905, 0xd9f75af1, 0x2b9cd9f2, + 0xff56bd19, 0x0d3d3e1a, 0x1e6dcdee, 0xec064eed, + 0xc38d26c4, 0x31e6a5c7, 0x22b65633, 0xd0ddd530, + 0x0417b1db, 0xf67c32d8, 0xe52cc12c, 0x1747422f, + 0x49547e0b, 0xbb3ffd08, 0xa86f0efc, 0x5a048dff, + 0x8ecee914, 0x7ca56a17, 0x6ff599e3, 0x9d9e1ae0, + 0xd3d3e1ab, 0x21b862a8, 0x32e8915c, 0xc083125f, + 0x144976b4, 0xe622f5b7, 0xf5720643, 0x07198540, + 0x590ab964, 0xab613a67, 0xb831c993, 0x4a5a4a90, + 0x9e902e7b, 0x6cfbad78, 0x7fab5e8c, 0x8dc0dd8f, + 0xe330a81a, 0x115b2b19, 0x020bd8ed, 0xf0605bee, + 0x24aa3f05, 0xd6c1bc06, 0xc5914ff2, 0x37faccf1, + 0x69e9f0d5, 0x9b8273d6, 0x88d28022, 0x7ab90321, + 0xae7367ca, 0x5c18e4c9, 0x4f48173d, 0xbd23943e, + 0xf36e6f75, 0x0105ec76, 0x12551f82, 0xe03e9c81, + 0x34f4f86a, 0xc69f7b69, 0xd5cf889d, 0x27a40b9e, + 0x79b737ba, 0x8bdcb4b9, 0x988c474d, 0x6ae7c44e, + 0xbe2da0a5, 0x4c4623a6, 0x5f16d052, 0xad7d5351, +) + +CRC_INIT = 0 +_MASK = 0xFFFFFFFF + + +def crc_update(crc, data): + """Update CRC-32C checksum with data. + Args: + crc: 32-bit checksum to update as long. + data: byte array, string or iterable over bytes. 
+ Returns: + 32-bit updated CRC-32C as long. + """ + if type(data) != array.array or data.itemsize != 1: + buf = array.array("B", data) + else: + buf = data + crc = crc ^ _MASK + for b in buf: + table_index = (crc ^ b) & 0xff + crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK + return crc ^ _MASK + + +def crc_finalize(crc): + """Finalize CRC-32C checksum. + This function should be called as last step of crc calculation. + Args: + crc: 32-bit checksum as long. + Returns: + finalized 32-bit checksum as long + """ + return crc & _MASK + + +def crc(data): + """Compute CRC-32C checksum of the data. + Args: + data: byte array, string or iterable over bytes. + Returns: + 32-bit CRC-32C checksum of data as long. + """ + return crc_finalize(crc_update(CRC_INIT, data)) + + +if __name__ == "__main__": + import sys + data = sys.stdin.read() + print(hex(crc(data))) diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 8a2727663..83121c6f6 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -36,12 +36,18 @@ def checksum(self): be the checksum for v0 and v1 and None for v2 and above. """ + @abc.abstractproperty + def headers(self): + """ If supported by version list of key-value tuples, or empty list if + not supported by format. + """ + class ABCRecordBatchBuilder(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def append(self, offset, timestamp, key, value): + def append(self, offset, timestamp, key, value, headers=None): """ Writes record to internal buffer. Arguments: @@ -51,6 +57,8 @@ def append(self, offset, timestamp, key, value): set to current time. key (bytes or None): Key of the record value (bytes or None): Value of the record + headers (List[Tuple[str, bytes]]): Headers of the record. Header + keys can not be ``None``. Returns: (bytes, int): Checksum of the written record (or None for v2 and diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py new file mode 100644 index 000000000..d67cdbdb7 --- /dev/null +++ b/kafka/record/default_records.py @@ -0,0 +1,553 @@ +# See: +# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\ +# apache/kafka/common/record/DefaultRecordBatch.java +# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\ +# apache/kafka/common/record/DefaultRecord.java + +# RecordBatch and Record implementation for magic 2 and above. +# The schema is given below: + +# RecordBatch => +# BaseOffset => Int64 +# Length => Int32 +# PartitionLeaderEpoch => Int32 +# Magic => Int8 +# CRC => Uint32 +# Attributes => Int16 +# LastOffsetDelta => Int32 // also serves as LastSequenceDelta +# FirstTimestamp => Int64 +# MaxTimestamp => Int64 +# ProducerId => Int64 +# ProducerEpoch => Int16 +# BaseSequence => Int32 +# Records => [Record] + +# Record => +# Length => Varint +# Attributes => Int8 +# TimestampDelta => Varlong +# OffsetDelta => Varint +# Key => Bytes +# Value => Bytes +# Headers => [HeaderKey HeaderValue] +# HeaderKey => String +# HeaderValue => Bytes + +# Note that when compression is enabled (see attributes below), the compressed +# record data is serialized directly following the count of the number of +# records. (ie Records => [Record], but without length bytes) + +# The CRC covers the data from the attributes to the end of the batch (i.e. all +# the bytes that follow the CRC). It is located after the magic byte, which +# means that clients must parse the magic byte before deciding how to interpret +# the bytes between the batch length and the magic byte. 
The partition leader +# epoch field is not included in the CRC computation to avoid the need to +# recompute the CRC when this field is assigned for every batch that is +# received by the broker. The CRC-32C (Castagnoli) polynomial is used for the +# computation. + +# The current RecordBatch attributes are given below: +# +# * Unused (6-15) +# * Control (5) +# * Transactional (4) +# * Timestamp Type (3) +# * Compression Type (0-2) + +import io +import struct +import time +from .abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder +from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint + +from kafka.errors import CorruptRecordException +from kafka.codec import ( + gzip_encode, snappy_encode, lz4_encode, + gzip_decode, snappy_decode, lz4_decode +) + + +class DefaultRecordBase(object): + + HEADER_STRUCT = struct.Struct( + ">q" # BaseOffset => Int64 + "i" # Length => Int32 + "i" # PartitionLeaderEpoch => Int32 + "b" # Magic => Int8 + "I" # CRC => Uint32 + "h" # Attributes => Int16 + "i" # LastOffsetDelta => Int32 // also serves as LastSequenceDelta + "q" # FirstTimestamp => Int64 + "q" # MaxTimestamp => Int64 + "q" # ProducerId => Int64 + "h" # ProducerEpoch => Int16 + "i" # BaseSequence => Int32 + "i" # Records count => Int32 + ) + # Byte offset in HEADER_STRUCT of attributes field. Used to calculate CRC + ATTRIBUTES_OFFSET = struct.calcsize(">qiibI") + CRC_OFFSET = struct.calcsize(">qiib") + AFTER_LEN_OFFSET = struct.calcsize(">qi") + + CODEC_MASK = 0x07 + CODEC_NONE = 0x00 + CODEC_GZIP = 0x01 + CODEC_SNAPPY = 0x02 + CODEC_LZ4 = 0x03 + TIMESTAMP_TYPE_MASK = 0x08 + TRANSACTIONAL_MASK = 0x10 + CONTROL_MASK = 0x20 + + LOG_APPEND_TIME = 1 + CREATE_TIME = 0 + + +class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch): + + def __init__(self, buffer): + self._buffer = memoryview(buffer) + self._header_data = self.HEADER_STRUCT.unpack_from(self._buffer) + self._pos = self.HEADER_STRUCT.size + self._num_records = self._header_data[12] + self._next_record_index = 0 + self._decompressed = False + + @property + def base_offset(self): + return self._header_data[0] + + @property + def magic(self): + return self._header_data[3] + + @property + def crc(self): + return self._header_data[4] + + @property + def attributes(self): + return self._header_data[5] + + @property + def compression_type(self): + return self.attributes & self.CODEC_MASK + + @property + def timestamp_type(self): + return int(bool(self.attributes & self.TIMESTAMP_TYPE_MASK)) + + @property + def is_transactional(self): + return bool(self.attributes & self.TRANSACTIONAL_MASK) + + @property + def is_control_batch(self): + return bool(self.attributes & self.CONTROL_MASK) + + @property + def first_timestamp(self): + return self._header_data[7] + + @property + def max_timestamp(self): + return self._header_data[8] + + def _maybe_uncompress(self): + if not self._decompressed: + compression_type = self.compression_type + if compression_type != self.CODEC_NONE: + data = self._buffer[self._pos:] + if compression_type == self.CODEC_GZIP: + uncompressed = gzip_decode(data) + if compression_type == self.CODEC_SNAPPY: + uncompressed = snappy_decode(data.tobytes()) + if compression_type == self.CODEC_LZ4: + uncompressed = lz4_decode(data.tobytes()) + self._buffer = memoryview(uncompressed) + self._pos = 0 + self._decompressed = True + + def _read_msg(self): + # Record => + # Length => Varint + # Attributes => Int8 + # TimestampDelta => Varlong + # OffsetDelta => Varint + # Key => Bytes + # Value => Bytes + # Headers => 
[HeaderKey HeaderValue] + # HeaderKey => String + # HeaderValue => Bytes + + buffer = self._buffer + pos = self._pos + length, pos = decode_varint(buffer, pos) + start_pos = pos + _, pos = decode_varint(buffer, pos) # attrs can be skipped for now + + ts_delta, pos = decode_varint(buffer, pos) + if self.timestamp_type == self.LOG_APPEND_TIME: + timestamp = self.max_timestamp + else: + timestamp = self.first_timestamp + ts_delta + + offset_delta, pos = decode_varint(buffer, pos) + offset = self.base_offset + offset_delta + + key_len, pos = decode_varint(buffer, pos) + if key_len >= 0: + key = buffer[pos: pos + key_len].tobytes() + pos += key_len + else: + key = None + + value_len, pos = decode_varint(buffer, pos) + if value_len >= 0: + value = buffer[pos: pos + value_len].tobytes() + pos += value_len + else: + value = None + + header_count, pos = decode_varint(buffer, pos) + if header_count < 0: + raise CorruptRecordException("Found invalid number of record " + "headers {}".format(header_count)) + headers = [] + for _ in range(header_count): + # Header key is of type String, that can't be None + h_key_len, pos = decode_varint(buffer, pos) + if h_key_len < 0: + raise CorruptRecordException( + "Invalid negative header key size {}".format(h_key_len)) + h_key = buffer[pos: pos + h_key_len].tobytes().decode("utf-8") + pos += h_key_len + + # Value is of type NULLABLE_BYTES, so it can be None + h_value_len, pos = decode_varint(buffer, pos) + if h_value_len >= 0: + h_value = buffer[pos: pos + h_value_len].tobytes() + pos += h_value_len + else: + h_value = None + + headers.append((h_key, h_value)) + + # validate whether we have read all header bytes in the current record + if pos - start_pos != length: + CorruptRecordException( + "Invalid record size: expected to read {} bytes in record " + "payload, but instead read {}".format(length, pos - start_pos)) + self._pos = pos + + return DefaultRecord( + offset, timestamp, self.timestamp_type, key, value, headers) + + def __iter__(self): + self._maybe_uncompress() + return self + + def __next__(self): + if self._next_record_index >= self._num_records: + if self._pos != len(self._buffer): + raise CorruptRecordException( + "{} unconsumed bytes after all records consumed".format( + len(self._buffer) - self._pos)) + raise StopIteration + try: + msg = self._read_msg() + except ValueError as err: + raise CorruptRecordException( + "Found invalid record structure: {!r}".format(err)) + else: + self._next_record_index += 1 + return msg + + next = __next__ + + def validate_crc(self): + assert self._decompressed is False, \ + "Validate should be called before iteration" + + crc = self.crc + data_view = self._buffer[self.ATTRIBUTES_OFFSET:] + verify_crc = calc_crc32c(data_view) + return crc == verify_crc + + +class DefaultRecord(ABCRecord): + + __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", + "_headers") + + def __init__(self, offset, timestamp, timestamp_type, key, value, headers): + self._offset = offset + self._timestamp = timestamp + self._timestamp_type = timestamp_type + self._key = key + self._value = value + self._headers = headers + + @property + def offset(self): + return self._offset + + @property + def timestamp(self): + """ Epoch milliseconds + """ + return self._timestamp + + @property + def timestamp_type(self): + """ CREATE_TIME(0) or APPEND_TIME(1) + """ + return self._timestamp_type + + @property + def key(self): + """ Bytes key or None + """ + return self._key + + @property + def value(self): + """ Bytes value or None + 
""" + return self._value + + @property + def headers(self): + return self._headers + + @property + def checksum(self): + return None + + def __repr__(self): + return ( + "DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r}," + " key={!r}, value={!r}, headers={!r})".format( + self._offset, self._timestamp, self._timestamp_type, + self._key, self._value, self._headers) + ) + + +class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder): + + # excluding key, value and headers: + # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes + MAX_RECORD_OVERHEAD = 21 + + def __init__( + self, magic, compression_type, is_transactional, + producer_id, producer_epoch, base_sequence, batch_size): + assert magic >= 2 + self._magic = magic + self._compression_type = compression_type & self.CODEC_MASK + self._batch_size = batch_size + self._is_transactional = bool(is_transactional) + # KIP-98 fields for EOS + self._producer_id = producer_id + self._producer_epoch = producer_epoch + self._base_sequence = base_sequence + + self._first_timestamp = None + self._max_timestamp = None + self._last_offset = 0 + self._num_records = 0 + + self._buffer = bytearray(self.HEADER_STRUCT.size) + + def _get_attributes(self, include_compression_type=True): + attrs = 0 + if include_compression_type: + attrs |= self._compression_type + # Timestamp Type is set by Broker + if self._is_transactional: + attrs |= self.TRANSACTIONAL_MASK + # Control batches are only created by Broker + return attrs + + def append(self, offset, timestamp, key, value, headers): + """ Write message to messageset buffer with MsgVersion 2 + """ + # Check types + if type(offset) != int: + raise TypeError(offset) + if timestamp is None: + timestamp = int(time.time() * 1000) + elif type(timestamp) != int: + raise TypeError(timestamp) + if not (key is None or + isinstance(key, (bytes, bytearray, memoryview))): + raise TypeError( + "Not supported type for key: {}".format(type(key))) + if not (value is None or + isinstance(value, (bytes, bytearray, memoryview))): + raise TypeError( + "Not supported type for value: {}".format(type(value))) + + # We will always add the first message, so those will be set + if self._first_timestamp is None: + self._first_timestamp = timestamp + self._max_timestamp = timestamp + timestamp_delta = 0 + first_message = True + else: + timestamp_delta = timestamp - self._first_timestamp + if self._max_timestamp < timestamp: + self._max_timestamp = timestamp + first_message = False + + self._last_offset = offset + self._num_records += 1 + offset_delta = offset # Base offset is always 0 on Produce + + # We can't write record right away to out buffer, we need to precompute + # the length as first value... 
+ message_buffer = io.BytesIO() + message_buffer.write(b"\x00") # Attributes + message_buffer.write(encode_varint(timestamp_delta)) + message_buffer.write(encode_varint(offset_delta)) + + if key is not None: + message_buffer.write(encode_varint(len(key))) + message_buffer.write(key) + else: + message_buffer.write(encode_varint(-1)) + + if value is not None: + message_buffer.write(encode_varint(len(value))) + message_buffer.write(value) + else: + message_buffer.write(encode_varint(-1)) + + message_buffer.write(encode_varint(len(headers))) + + for h_key, h_value in headers: + h_key = h_key.encode("utf-8") + message_buffer.write(encode_varint(len(h_key))) + message_buffer.write(h_key) + if h_value is not None: + message_buffer.write(encode_varint(len(h_value))) + message_buffer.write(h_value) + else: + message_buffer.write(encode_varint(-1)) + + message_len = message_buffer.tell() + message_len_enc = encode_varint(message_len) + + required_size = message_len + len(message_len_enc) + # Check if we can write this message + if not (first_message or + required_size + len(self._buffer) <= self._batch_size): + return None, 0 + + self._buffer += message_len_enc + message_buffer.getvalue() + + message_buffer.close() + return None, required_size + + def write_header(self, use_compression_type=True): + batch_len = len(self._buffer) + self.HEADER_STRUCT.pack_into( + self._buffer, 0, + 0, # BaseOffset, set by broker + batch_len - self.AFTER_LEN_OFFSET, # Size from here to end + 0, # PartitionLeaderEpoch, set by broker + self._magic, + 0, # CRC will be set below, as we need a filled buffer for it + self._get_attributes(use_compression_type), + self._last_offset, + self._first_timestamp, + self._max_timestamp, + self._producer_id, + self._producer_epoch, + self._base_sequence, + self._num_records + ) + crc = calc_crc32c(self._buffer[self.ATTRIBUTES_OFFSET:]) + struct.pack_into(">I", self._buffer, self.CRC_OFFSET, crc) + + def _maybe_compress(self): + if self._compression_type != self.CODEC_NONE: + header_size = self.HEADER_STRUCT.size + data = bytes(self._buffer[header_size:]) + if self._compression_type == self.CODEC_GZIP: + compressed = gzip_encode(data) + elif self._compression_type == self.CODEC_SNAPPY: + compressed = snappy_encode(data) + elif self._compression_type == self.CODEC_LZ4: + compressed = lz4_encode(data) + compressed_size = len(compressed) + if len(data) <= compressed_size: + # We did not get any benefit from compression, lets send + # uncompressed + return False + else: + # Trim bytearray to the required size + needed_size = header_size + compressed_size + del self._buffer[needed_size:] + self._buffer[header_size:needed_size] = compressed + return True + return False + + def build(self): + send_compressed = self._maybe_compress() + self.write_header(send_compressed) + return self._buffer + + def size(self): + """ Return current size of data written to buffer + """ + return len(self._buffer) + + def size_in_bytes(self, offset, timestamp, key, value, headers): + if self._first_timestamp is not None: + timestamp_delta = timestamp - self._first_timestamp + else: + timestamp_delta = 0 + size_of_body = ( + 1 + # Attrs + size_of_varint(offset) + + size_of_varint(timestamp_delta) + + self.size_of(key, value, headers) + ) + return size_of_body + size_of_varint(size_of_body) + + @classmethod + def size_of(cls, key, value, headers): + size = 0 + # Key size + if key is None: + size += 1 + else: + key_len = len(key) + size += size_of_varint(key_len) + key_len + # Value size + if value is None: + size 
+= 1 + else: + value_len = len(value) + size += size_of_varint(value_len) + value_len + # Header size + size += size_of_varint(len(headers)) + for h_key, h_value in headers: + h_key_len = len(h_key.encode("utf-8")) + size += size_of_varint(h_key_len) + h_key_len + + if h_value is None: + size += 1 + else: + h_value_len = len(h_value) + size += size_of_varint(h_value_len) + h_value_len + return size + + @classmethod + def estimate_size_in_bytes(cls, key, value, headers): + """ Get the upper bound estimate on the size of record + """ + return ( + cls.HEADER_STRUCT.size + cls.MAX_RECORD_OVERHEAD + + cls.size_of(key, value, headers) + ) diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 98c8e3004..7c6cac7c0 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -327,9 +327,10 @@ def __init__(self, magic, compression_type, batch_size): self._batch_size = batch_size self._buffer = bytearray() - def append(self, offset, timestamp, key, value): + def append(self, offset, timestamp, key, value, headers=None): """ Append message to batch. """ + assert not headers, "Headers not supported in v0/v1" # Check types if type(offset) != int: raise TypeError(offset) diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index c6a28be7b..f6ad8702a 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -24,6 +24,7 @@ from kafka.errors import CorruptRecordException from .abc import ABCRecords from .legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder +from .default_records import DefaultRecordBatch, DefaultRecordBatchBuilder class MemoryRecords(ABCRecords): @@ -102,18 +103,24 @@ def next_batch(self, _min_slice=MIN_SLICE, magic, = struct.unpack_from(">b", next_slice, _magic_offset) if magic <= 1: return LegacyRecordBatch(next_slice, magic) - else: # pragma: no cover - raise NotImplementedError("Record V2 still not implemented") + else: + return DefaultRecordBatch(next_slice) class MemoryRecordsBuilder(object): def __init__(self, magic, compression_type, batch_size): - assert magic in [0, 1], "Not supported magic" + assert magic in [0, 1, 2], "Not supported magic" assert compression_type in [0, 1, 2, 3], "Not valid compression type" - self._builder = LegacyRecordBatchBuilder( - magic=magic, compression_type=compression_type, - batch_size=batch_size) + if magic >= 2: + self._builder = DefaultRecordBatchBuilder( + magic=magic, compression_type=compression_type, + is_transactional=False, producer_id=-1, producer_epoch=-1, + base_sequence=-1, batch_size=batch_size) + else: + self._builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=compression_type, + batch_size=batch_size) self._batch_size = batch_size self._buffer = None @@ -121,7 +128,7 @@ def __init__(self, magic, compression_type, batch_size): self._closed = False self._bytes_written = 0 - def append(self, timestamp, key, value): + def append(self, timestamp, key, value, headers): """ Append a message to the buffer. 
Returns: @@ -132,7 +139,7 @@ def append(self, timestamp, key, value): offset = self._next_offset checksum, actual_size = self._builder.append( - offset, timestamp, key, value) + offset, timestamp, key, value, headers) # Return of 0 size means there's no space to add a new message if actual_size == 0: return None, 0 diff --git a/kafka/record/util.py b/kafka/record/util.py index 098d6f458..304f6f141 100644 --- a/kafka/record/util.py +++ b/kafka/record/util.py @@ -1,5 +1,110 @@ import binascii +import six + +from ._crc32c import crc as crc32c_py + + +if six.PY3: + def _read_byte(memview, pos): + """ Read a byte from memoryview as an integer + + Raises: + IndexError: if position is out of bounds + """ + return memview[pos] +else: + def _read_byte(memview, pos): + """ Read a byte from memoryview as an integer + + Raises: + IndexError: if position is out of bounds + """ + return ord(memview[pos]) + + +def encode_varint(num): + """ Encode an integer to a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + num (int): Value to encode + + Returns: + bytearray: Encoded presentation of integer with length from 1 to 10 + bytes + """ + # Shift sign to the end of number + num = (num << 1) ^ (num >> 63) + # Max 10 bytes. We assert those are allocated + buf = bytearray(10) + + for i in range(10): + # 7 lowest bits from the number and set 8th if we still have pending + # bits left to encode + buf[i] = num & 0x7f | (0x80 if num > 0x7f else 0) + num = num >> 7 + if num == 0: + break + else: + # Max size of endcoded double is 10 bytes for unsigned values + raise ValueError("Out of double range") + return buf[:i + 1] + + +def size_of_varint(num): + """ Number of bytes needed to encode an integer in variable-length format. + """ + num = (num << 1) ^ (num >> 63) + res = 0 + while True: + res += 1 + num = num >> 7 + if num == 0: + break + return res + + +def decode_varint(buffer, pos=0): + """ Decode an integer from a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. 
+ + Arguments: + buffer (bytes-like): any object acceptable by ``memoryview`` + pos (int): optional position to read from + + Returns: + (int, int): Decoded int value and next read position + """ + value = 0 + shift = 0 + memview = memoryview(buffer) + for i in range(pos, pos + 10): + try: + byte = _read_byte(memview, i) + except IndexError: + raise ValueError("End of byte stream") + if byte & 0x80 != 0: + value |= (byte & 0x7f) << shift + shift += 7 + else: + value |= byte << shift + break + else: + # Max size of endcoded double is 10 bytes for unsigned values + raise ValueError("Out of double range") + # Normalize sign + return (value >> 1) ^ -(value & 1), i + 1 + + +def calc_crc32c(memview): + """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data + """ + crc = crc32c_py(memview) + return crc + def calc_crc32(memview): """ Calculate simple CRC-32 checksum over a memoryview of data diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py new file mode 100644 index 000000000..58ede1f85 --- /dev/null +++ b/test/record/test_default_records.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +import pytest +from kafka.record.default_records import ( + DefaultRecordBatch, DefaultRecordBatchBuilder +) + + +@pytest.mark.parametrize("compression_type", [ + DefaultRecordBatch.CODEC_NONE, + DefaultRecordBatch.CODEC_GZIP, + DefaultRecordBatch.CODEC_SNAPPY, + DefaultRecordBatch.CODEC_LZ4 +]) +def test_read_write_serde_v2(compression_type): + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=compression_type, is_transactional=1, + producer_id=123456, producer_epoch=123, base_sequence=9999, + batch_size=999999) + headers = [] # [("header1", b"aaa"), ("header2", b"bbb")] + for offset in range(10): + builder.append( + offset, timestamp=9999999, key=b"test", value=b"Super", + headers=headers) + buffer = builder.build() + reader = DefaultRecordBatch(bytes(buffer)) + msgs = list(reader) + + assert reader.is_transactional is True + assert reader.compression_type == compression_type + assert reader.magic == 2 + assert reader.timestamp_type == 0 + assert reader.base_offset == 0 + for offset, msg in enumerate(msgs): + assert msg.offset == offset + assert msg.timestamp == 9999999 + assert msg.key == b"test" + assert msg.value == b"Super" + assert msg.headers == headers + + +def test_written_bytes_equals_size_in_bytes_v2(): + key = b"test" + value = b"Super" + headers = [("header1", b"aaa"), ("header2", b"bbb"), ("xx", None)] + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=999999) + + size_in_bytes = builder.size_in_bytes( + 0, timestamp=9999999, key=key, value=value, headers=headers) + + pos = builder.size() + builder.append(0, timestamp=9999999, key=key, value=value, headers=headers) + + assert builder.size() - pos == size_in_bytes + + +def test_estimate_size_in_bytes_bigger_than_batch_v2(): + key = b"Super Key" + value = b"1" * 100 + headers = [("header1", b"aaa"), ("header2", b"bbb")] + estimate_size = DefaultRecordBatchBuilder.estimate_size_in_bytes( + key, value, headers) + + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=999999) + builder.append( + 0, timestamp=9999999, key=key, value=value, headers=headers) + buf = builder.build() + assert len(buf) <= estimate_size, \ + "Estimate should always be 
upper bound" + + +def test_default_batch_builder_validates_arguments(): + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=999999) + + # Key should not be str + with pytest.raises(TypeError): + builder.append( + 0, timestamp=9999999, key="some string", value=None, headers=[]) + + # Value should not be str + with pytest.raises(TypeError): + builder.append( + 0, timestamp=9999999, key=None, value="some string", headers=[]) + + # Timestamp should be of proper type + with pytest.raises(TypeError): + builder.append( + 0, timestamp="1243812793", key=None, value=b"some string", + headers=[]) + + # Offset of invalid type + with pytest.raises(TypeError): + builder.append( + "0", timestamp=9999999, key=None, value=b"some string", headers=[]) + + # Ok to pass value as None + builder.append( + 0, timestamp=9999999, key=b"123", value=None, headers=[]) + + # Timestamp can be None + builder.append( + 1, timestamp=None, key=None, value=b"some string", headers=[]) + + # Ok to pass offsets in not incremental order. This should not happen thou + builder.append( + 5, timestamp=9999999, key=b"123", value=None, headers=[]) + + # in case error handling code fails to fix inner buffer in builder + assert len(builder.build()) == 104 + + +def test_default_batch_size_limit(): + # First message can be added even if it's too big + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024) + + crc, size = builder.append( + 0, timestamp=None, key=None, value=b"M" * 2000, headers=[]) + assert size > 0 + assert crc is None + assert len(builder.build()) > 2000 + + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024) + crc, size = builder.append( + 0, timestamp=None, key=None, value=b"M" * 700, headers=[]) + assert size > 0 + crc, size = builder.append( + 1, timestamp=None, key=None, value=b"M" * 700, headers=[]) + assert size == 0 + assert crc is None + crc, size = builder.append( + 2, timestamp=None, key=None, value=b"M" * 700, headers=[]) + assert size == 0 + assert crc is None + assert len(builder.build()) < 1000 diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py index 2d766956b..40fdad8eb 100644 --- a/test/record/test_legacy_records.py +++ b/test/record/test_legacy_records.py @@ -1,8 +1,8 @@ +from __future__ import unicode_literals import pytest from kafka.record.legacy_records import ( LegacyRecordBatch, LegacyRecordBatchBuilder ) -from kafka.protocol.message import Message @pytest.mark.parametrize("magic", [0, 1]) @@ -27,9 +27,9 @@ def test_read_write_serde_v0_v1_no_compression(magic): @pytest.mark.parametrize("compression_type", [ - Message.CODEC_GZIP, - Message.CODEC_SNAPPY, - Message.CODEC_LZ4 + LegacyRecordBatch.CODEC_GZIP, + LegacyRecordBatch.CODEC_SNAPPY, + LegacyRecordBatch.CODEC_LZ4 ]) @pytest.mark.parametrize("magic", [0, 1]) def test_read_write_serde_v0_v1_with_compression(compression_type, magic): @@ -83,3 +83,67 @@ def test_estimate_size_in_bytes_bigger_than_batch(magic): buf = builder.build() assert len(buf) <= estimate_size, \ "Estimate should always be upper bound" + + +@pytest.mark.parametrize("magic", [0, 1]) +def test_legacy_batch_builder_validates_arguments(magic): + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=0, batch_size=1024 
* 1024) + + # Key should not be str + with pytest.raises(TypeError): + builder.append( + 0, timestamp=9999999, key="some string", value=None) + + # Value should not be str + with pytest.raises(TypeError): + builder.append( + 0, timestamp=9999999, key=None, value="some string") + + # Timestamp should be of proper type + with pytest.raises(TypeError): + builder.append( + 0, timestamp="1243812793", key=None, value=b"some string") + + # Offset of invalid type + with pytest.raises(TypeError): + builder.append( + "0", timestamp=9999999, key=None, value=b"some string") + + # Ok to pass value as None + builder.append( + 0, timestamp=9999999, key=b"123", value=None) + + # Timestamp can be None + builder.append( + 1, timestamp=None, key=None, value=b"some string") + + # Ok to pass offsets in not incremental order. This should not happen thou + builder.append( + 5, timestamp=9999999, key=b"123", value=None) + + # in case error handling code fails to fix inner buffer in builder + assert len(builder.build()) == 119 if magic else 95 + + +@pytest.mark.parametrize("magic", [0, 1]) +def test_legacy_batch_size_limit(magic): + # First message can be added even if it's too big + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=0, batch_size=1024) + crc, size = builder.append(0, timestamp=None, key=None, value=b"M" * 2000) + assert size > 0 + assert crc is not None + assert len(builder.build()) > 2000 + + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=0, batch_size=1024) + crc, size = builder.append(0, timestamp=None, key=None, value=b"M" * 700) + assert size > 0 + crc, size = builder.append(1, timestamp=None, key=None, value=b"M" * 700) + assert size == 0 + assert crc is None + crc, size = builder.append(2, timestamp=None, key=None, value=b"M" * 700) + assert size == 0 + assert crc is None + assert len(builder.build()) < 1000 diff --git a/test/record/test_records.py b/test/record/test_records.py index fc3eacaef..7306bbc52 100644 --- a/test/record/test_records.py +++ b/test/record/test_records.py @@ -2,6 +2,26 @@ from kafka.record import MemoryRecords from kafka.errors import CorruptRecordException +# This is real live data from Kafka 11 broker +record_batch_data_v2 = [ + # First Batch value == "123" + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03' + b'\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]' + b'\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00' + b'\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00', + # Second Batch value = "" and value = "". 
2 records + b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8' + b'\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff' + b'|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00' + b'\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00' + b'\x00', + # Third batch value = "123" + b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b' + b'\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]' + b'\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' + b'\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00' +] + record_batch_data_v1 = [ # First Message value == "123" b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00' @@ -34,6 +54,32 @@ ] +def test_memory_records_v2(): + data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4 + records = MemoryRecords(data_bytes) + + assert records.size_in_bytes() == 222 + assert records.valid_bytes() == 218 + + assert records.has_next() is True + batch = records.next_batch() + recs = list(batch) + assert len(recs) == 1 + assert recs[0].value == b"123" + assert recs[0].key is None + assert recs[0].timestamp == 1503229838908 + assert recs[0].timestamp_type == 0 + assert recs[0].checksum is None + assert recs[0].headers == [] + + assert records.next_batch() is not None + assert records.next_batch() is not None + + assert records.has_next() is False + assert records.next_batch() is None + assert records.next_batch() is None + + def test_memory_records_v1(): data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4 records = MemoryRecords(data_bytes) diff --git a/test/record/test_util.py b/test/record/test_util.py new file mode 100644 index 000000000..1c0e5958c --- /dev/null +++ b/test/record/test_util.py @@ -0,0 +1,92 @@ +import struct +import pytest +from kafka.record import util + + +varint_data = [ + (b"\x00", 0), + (b"\x01", -1), + (b"\x02", 1), + (b"\x7E", 63), + (b"\x7F", -64), + (b"\x80\x01", 64), + (b"\x81\x01", -65), + (b"\xFE\x7F", 8191), + (b"\xFF\x7F", -8192), + (b"\x80\x80\x01", 8192), + (b"\x81\x80\x01", -8193), + (b"\xFE\xFF\x7F", 1048575), + (b"\xFF\xFF\x7F", -1048576), + (b"\x80\x80\x80\x01", 1048576), + (b"\x81\x80\x80\x01", -1048577), + (b"\xFE\xFF\xFF\x7F", 134217727), + (b"\xFF\xFF\xFF\x7F", -134217728), + (b"\x80\x80\x80\x80\x01", 134217728), + (b"\x81\x80\x80\x80\x01", -134217729), + (b"\xFE\xFF\xFF\xFF\x7F", 17179869183), + (b"\xFF\xFF\xFF\xFF\x7F", -17179869184), + (b"\x80\x80\x80\x80\x80\x01", 17179869184), + (b"\x81\x80\x80\x80\x80\x01", -17179869185), + (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551), + (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552), + (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552), + (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656), + (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656), + (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905), +] + 
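+# The pairs above follow the zig-zag varint scheme used by the v2 record
+# format: a signed value n is first mapped to unsigned via (n << 1) ^ (n >> 63)
+# (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...), then emitted 7 bits at a time,
+# least-significant group first, with the continuation bit 0x80 set on every
+# byte except the last; e.g. 300 -> 600 -> b"\xd8\x04".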
+ +@pytest.mark.parametrize("encoded, decoded", varint_data) +def test_encode_varint(encoded, decoded): + assert util.encode_varint(decoded) == encoded + + +@pytest.mark.parametrize("encoded, decoded", varint_data) +def test_decode_varint(encoded, decoded): + # We add a bit of bytes around just to check position is calculated + # correctly + value, pos = util.decode_varint(b"\x01\xf0" + encoded + b"\xff\x01", 2) + assert value == decoded + assert pos - 2 == len(encoded) + + +@pytest.mark.parametrize("encoded, decoded", varint_data) +def test_size_of_varint(encoded, decoded): + assert util.size_of_varint(decoded) == len(encoded) + + +def test_crc32c(): + def make_crc(data): + crc = util.calc_crc32c(data) + return struct.pack(">I", crc) + assert make_crc(b"") == b"\x00\x00\x00\x00" + assert make_crc(b"a") == b"\xc1\xd0\x43\x30" + + # Took from librdkafka testcase + long_text = b"""\ + This software is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution.""" + assert make_crc(long_text) == b"\x7d\xcd\xe1\x13" diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 364a80831..ef3f6866d 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -54,7 +54,7 @@ def _build_record_batch(msgs, compression=0): magic=1, compression_type=0, batch_size=9999999) for msg in msgs: key, value, timestamp = msg - builder.append(key=key, value=value, timestamp=timestamp) + builder.append(key=key, value=value, timestamp=timestamp, headers=[]) builder.close() return builder.buffer()
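
For completeness, a round trip through the new v2 batch classes added by this patch might look like the sketch below (standalone illustration only; the header key/value and the printed fields are arbitrary):

    from kafka.record.default_records import (
        DefaultRecordBatch, DefaultRecordBatchBuilder
    )

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024 * 1024)
    # offset, timestamp (None -> current time in ms), key, value, headers
    builder.append(0, timestamp=None, key=b"key", value=b"value",
                   headers=[("trace-id", b"abc123")])
    buffer = builder.build()

    batch = DefaultRecordBatch(bytes(buffer))
    assert batch.validate_crc()  # must be called before iterating the batch
    for record in batch:
        print(record.offset, record.key, record.value, record.headers)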