Skip to content

Commit

Permalink
Implement KIP-204 : DeleteRecords API (#969)
Browse files Browse the repository at this point in the history
* Implement KIP-204 : DeleteRecords API

When doing stream processing, it is convenient to use a "transient" topic:
* retention time is infinite
* records get deleted when consumed

The Java Kafka Streams client uses the admin client's deleteRecords to
perform this operation. This capability is currently missing from aiokafka.

The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API

refs #967

* Use common method to get metadata

* Explain the unpacking catch all

* Remove usage of TaggedFields

TaggedFields doesn't seem to work properly at the moment. Maybe it
should be replaced by an implementation closer to the Java client's
"flexibleVersions".

* Fix linting errors (format)

* Add change log

---------

Co-authored-by: Vincent Maurin <[email protected]>
Co-authored-by: Denis Otkidach <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2024
1 parent f8d0d15 commit 82695b0
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES/969.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin)
3 changes: 2 additions & 1 deletion aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"]
62 changes: 61 additions & 1 deletion aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.errors import (
IncompatibleBrokerVersion,
LeaderNotAvailableError,
NotLeaderForPartitionError,
for_code,
)
from aiokafka.protocol.admin import (
AlterConfigsRequest,
ApiVersionRequest_v0,
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteRecordsRequest,
DeleteTopicsRequest,
DescribeConfigsRequest,
DescribeGroupsRequest,
Expand All @@ -24,6 +30,7 @@

from .config_resource import ConfigResource, ConfigResourceType
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -605,3 +612,56 @@ async def list_consumer_group_offsets(
offset_plus_meta = OffsetAndMetadata(offset, metadata)
response_dict[tp] = offset_plus_meta
return response_dict

async def delete_records(
    self,
    records_to_delete: Dict[TopicPartition, RecordsToDelete],
    timeout_ms: Optional[int] = None,
) -> Dict[TopicPartition, int]:
    """Delete records from partitions.

    The cluster metadata is refreshed first, the deletions are grouped by
    partition leader, and one DeleteRecords request is sent to each leader
    in turn.

    :param records_to_delete: A map of RecordsToDelete for each TopicPartition
    :param timeout_ms: Milliseconds to wait for the deletion to complete.
        When omitted (or 0) the client's request timeout is used.
    :return: A map of each TopicPartition to the low watermark reported by
        the broker for that partition.
    :raises NotLeaderForPartitionError: no leader is known for a partition.
    :raises LeaderNotAvailableError: the leader of a partition is
        currently unavailable.
    :raises KafkaError: the error type matching the first non-zero
        ``error_code`` found in a broker response. Results gathered before
        that error are discarded.
    """
    version = self._matching_api_version(DeleteRecordsRequest)

    metadata = await self._get_cluster_metadata()

    self._client.cluster.update_metadata(metadata)

    # leader node id -> topic name -> [(partition, RecordsToDelete), ...]
    requests = defaultdict(lambda: defaultdict(list))
    responses = {}

    for tp, records in records_to_delete.items():
        leader = self._client.cluster.leader_for_partition(tp)
        # Attach the partition to the error so callers can tell which
        # entry of a multi-partition request could not be routed.
        if leader is None:
            raise NotLeaderForPartitionError(tp)
        elif leader == -1:
            raise LeaderNotAvailableError(tp)
        requests[leader][tp.topic].append((tp.partition, records))

    req_cls = DeleteRecordsRequest[version]

    for leader, delete_request in requests.items():
        request = req_cls(
            self._convert_records_to_delete(delete_request),
            timeout_ms or self._request_timeout_ms,
        )
        response = await self._client.send(leader, request)
        for topic, partitions in response.topics:
            for partition_index, low_watermark, error_code in partitions:
                tp = TopicPartition(topic, partition_index)
                if error_code:
                    # for_code() yields an error class; instantiate it with
                    # the failing partition for a more useful message.
                    raise for_code(error_code)(tp)
                responses[tp] = low_watermark
    return responses

@staticmethod
def _convert_records_to_delete(
    records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
):
    """Build the ``topics`` payload of a DeleteRecords request.

    Each ``RecordsToDelete`` is reduced to its ``before_offset``, giving a
    list of ``(topic, [(partition, before_offset), ...])`` tuples in the
    iteration order of the input mapping.
    """
    payload = []
    for topic_name, entries in records_to_delete.items():
        partition_offsets = []
        for partition_id, spec in entries:
            partition_offsets.append((partition_id, spec.before_offset))
        payload.append((topic_name, partition_offsets))
    return payload
12 changes: 12 additions & 0 deletions aiokafka/admin/records_to_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class RecordsToDelete:
    """Describe which records to delete from a single partition.

    Instances are the values of the mapping handed to
    ``AIOKafkaAdminClient.delete_records``, keyed by ``TopicPartition``.

    Arguments:
        before_offset (int):
            delete all the records before the given offset
    """

    def __init__(
        self,
        before_offset: int,
    ) -> None:
        self.before_offset = before_offset

    def __repr__(self) -> str:
        # Readable form for logs and assertion failures.
        return f"{type(self).__name__}(before_offset={self.before_offset!r})"
3 changes: 2 additions & 1 deletion aiokafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ def update_metadata(self, metadata):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
for p_error, partition, leader, replicas, isr in partitions:
# Starting with v5, MetadataResponse contains more than 5 fields
for p_error, partition, leader, replicas, isr, *_ in partitions:
_new_partitions[topic][partition] = PartitionMetadata(
topic=topic,
partition=partition,
Expand Down
123 changes: 123 additions & 0 deletions aiokafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,3 +1276,126 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


# Response schema for the DeleteRecords API (API key 21), version 0.
# Layout: throttle time, then for every topic a list of
# (partition_index, low_watermark, error_code) entries.
class DeleteRecordsResponse_v0(Response):
API_KEY = 21
API_VERSION = 0
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("low_watermark", Int64),
# A non-zero error_code marks a failed partition.
("error_code", Int16),
),
),
),
),
)


# Version 1 of the DeleteRecords response; it reuses the v0 schema unchanged.
class DeleteRecordsResponse_v1(Response):
API_KEY = 21
API_VERSION = 1
SCHEMA = DeleteRecordsResponse_v0.SCHEMA


# Version 2 of the DeleteRecords response. Same fields as v0, but encoded with
# CompactArray/CompactString and with a TaggedFields slot after each
# partition entry, each topic entry, and the top-level body.
class DeleteRecordsResponse_v2(Response):
API_KEY = 21
API_VERSION = 2
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


# Request schema for the DeleteRecords API (API key 21), version 0.
# Layout: for every topic a list of (partition_index, offset) pairs,
# followed by the request timeout in milliseconds.
class DeleteRecordsRequest_v0(Request):
API_KEY = 21
API_VERSION = 0
RESPONSE_TYPE = DeleteRecordsResponse_v0
SCHEMA = Schema(
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
# The admin client fills this with RecordsToDelete.before_offset.
("offset", Int64),
),
),
),
),
("timeout_ms", Int32),
)


# Version 1 of the DeleteRecords request; it reuses the v0 schema and is
# paired with the v1 response class.
class DeleteRecordsRequest_v1(Request):
API_KEY = 21
API_VERSION = 1
RESPONSE_TYPE = DeleteRecordsResponse_v1
SCHEMA = DeleteRecordsRequest_v0.SCHEMA


# Version 2 of the DeleteRecords request, flagged as a flexible version.
# Same fields as v0, but encoded with CompactArray/CompactString and with a
# TaggedFields slot after each partition entry, each topic entry, and the
# top-level body.
class DeleteRecordsRequest_v2(Request):
API_KEY = 21
API_VERSION = 2
FLEXIBLE_VERSION = True
RESPONSE_TYPE = DeleteRecordsResponse_v2
SCHEMA = Schema(
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("offset", Int64),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("timeout_ms", Int32),
("tags", TaggedFields),
)


# Request classes offered to the admin client, positioned so that the list
# index equals the API version. v2 is defined above but deliberately not
# listed, so it is never selected.
DeleteRecordsRequest = [
DeleteRecordsRequest_v0,
DeleteRecordsRequest_v1,
# FIXME: We have some problems with `TaggedFields`
# DeleteRecordsRequest_v2,
]

# Response classes matching the request list above, entry for entry.
DeleteRecordsResponse = [
DeleteRecordsResponse_v0,
DeleteRecordsResponse_v1,
# FIXME: We have some problems with `TaggedFields`
# DeleteRecordsResponse_v2,
]
1 change: 1 addition & 0 deletions aiokafka/protocol/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def encode(self, value):
return UnsignedVarInt32.encode(len(value) + 1) + value


# FIXME: TaggedFields doesn't seem to work properly so they should be avoided
class TaggedFields(AbstractType):
@classmethod
def decode(cls, data):
Expand Down
37 changes: 36 additions & 1 deletion tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete
from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType
from aiokafka.consumer import AIOKafkaConsumer
from aiokafka.producer import AIOKafkaProducer
Expand Down Expand Up @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self):
assert resp[tp].offset == msg.offset + 1
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
assert resp[tp].offset == msg.offset + 1

@kafka_versions(">=1.1.0")
@run_until_complete
async def test_delete_records(self):
    """Records below the requested offset are gone after delete_records."""
    admin = await self.create_admin()

    await admin.create_topics([NewTopic(self.topic, 1, 1)])

    async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
        first_message = await producer.send_and_wait(
            self.topic, partition=0, value=b"some-message"
        )
        await producer.send_and_wait(
            self.topic, partition=0, value=b"other-message"
        )

    tp = TopicPartition(self.topic, 0)
    # Everything strictly before this offset (i.e. the first message only)
    # must be removed.
    delete_before = first_message.offset + 1

    low_watermarks = await admin.delete_records(
        {tp: RecordsToDelete(before_offset=delete_before)}
    )
    # The call reports the new low watermark of every affected partition;
    # with a single replica it is the offset we asked to delete up to.
    assert low_watermarks == {tp: delete_before}

    consumer = AIOKafkaConsumer(
        self.topic,
        bootstrap_servers=self.hosts,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    self.add_cleanup(consumer.stop)

    # Reading from "earliest" must now start at the second message.
    msg = await consumer.getone()
    assert msg.value == b"other-message"
    assert msg.offset == delete_before

0 comments on commit 82695b0

Please sign in to comment.