From 97f65c78943cbc4956eca1277befd8df655e6f28 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Wed, 14 Feb 2018 22:45:53 -0500 Subject: [PATCH] Document recordbatch offset changes --- produce_set.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/produce_set.go b/produce_set.go index 98cb26eaf..13be2b3c9 100644 --- a/produce_set.go +++ b/produce_set.go @@ -123,6 +123,13 @@ func (ps *produceSet) buildRequest() *ProduceRequest { for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { if req.Version >= 3 { + // If the API version we're hitting is 3 or greater, we need to calculate + // offsets for each record in the batch relative to FirstOffset. + // Additionally, we must set LastOffsetDelta to the value of the last offset + // in the batch. Since the OffsetDelta of the first record is 0, we know that the + // final record of any batch will have an offset of (# of records in batch) - 1. + // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + // under the RecordBatch section for details.) rb := set.recordsToSend.RecordBatch if len(rb.Records) > 0 { rb.LastOffsetDelta = int32(len(rb.Records) - 1)