Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proto): add remaining protocol for V2.1 #2573

Merged
merged 3 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
}

func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
a.Version = version
throttleTime, err := pd.getInt32()
if err != nil {
return err
Expand Down
23 changes: 12 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,18 +912,19 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
}

request := &OffsetRequest{}
// Version 1 removes MaxNumOffsets. From this version forward, only a single
// offset can be returned.
if client.conf.Version.IsAtLeast(V0_10_1_0) {
request.Version = 1
}
// Version 2 adds the isolation level, which is used for transactional reads.
if client.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 2
}
// Version 3 is the same as version 2.
if client.conf.Version.IsAtLeast(V2_0_0_0) {
if client.conf.Version.IsAtLeast(V2_1_0_0) {
// Version 4 adds the current leader epoch, which is used for fencing.
request.Version = 4
} else if client.conf.Version.IsAtLeast(V2_0_0_0) {
// Version 3 is the same as version 2.
request.Version = 3
} else if client.conf.Version.IsAtLeast(V0_11_0_0) {
// Version 2 adds the isolation level, which is used for transactional reads.
request.Version = 2
} else if client.conf.Version.IsAtLeast(V0_10_1_0) {
// Version 1 removes MaxNumOffsets. From this version forward, only a single
// offset can be returned.
request.Version = 1
}

request.AddBlock(topic, partitionID, time, 1)
Expand Down
24 changes: 21 additions & 3 deletions offset_request.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package sarama

type offsetRequestBlock struct {
time int64
maxOffsets int32 // Only used in version 0
currentLeaderEpoch int32 // used in version 4+
time int64
maxOffsets int32 // Only used in version 0
}

func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
if version >= 4 {
pe.putInt32(b.currentLeaderEpoch)
}

pe.putInt64(b.time)

if version == 0 {
pe.putInt32(b.maxOffsets)
}
Expand All @@ -15,14 +21,23 @@ func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
}

func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
b.currentLeaderEpoch = -1
if version >= 4 {
if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
}

if b.time, err = pd.getInt64(); err != nil {
return err
}

if version == 0 {
if b.maxOffsets, err = pd.getInt32(); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -138,11 +153,13 @@ func (r *OffsetRequest) headerVersion() int16 {
}

func (r *OffsetRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 3
return r.Version >= 0 && r.Version <= 4
}

func (r *OffsetRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 4:
return V2_1_0_0
case 3:
return V2_0_0_0
case 2:
Expand Down Expand Up @@ -178,6 +195,7 @@ func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, ma
}

tmp := new(offsetRequestBlock)
tmp.currentLeaderEpoch = -1
tmp.time = time
if r.Version == 0 {
tmp.maxOffsets = maxOffsets
Expand Down
20 changes: 20 additions & 0 deletions offset_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ var (
0x00, 0x00, 0x00, 0x2a,
0x00, 0x00, 0x00, 0x00,
}

offsetRequestV4 = []byte{
0xff, 0xff, 0xff, 0xff, // replicaID
0x01, // IsolationLevel
0x00, 0x00, 0x00, 0x01,
0x00, 0x04,
0x64, 0x6e, 0x77, 0x65, // topic name
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x09, // partitionID
0xff, 0xff, 0xff, 0xff, // leader epoch
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // timestamp
}
)

func TestOffsetRequest(t *testing.T) {
Expand Down Expand Up @@ -86,3 +98,11 @@ func TestOffsetRequestReplicaID(t *testing.T) {

testRequest(t, "with replica ID", request, offsetRequestReplicaID)
}

func TestOffsetRequestV4(t *testing.T) {
request := new(OffsetRequest)
request.Version = 4
request.IsolationLevel = ReadCommitted
request.AddBlock("dnwe", 9, -1, -1)
testRequest(t, "V4", request, offsetRequestV4)
}
54 changes: 37 additions & 17 deletions offset_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package sarama
import "time"

type OffsetResponseBlock struct {
Err KError
Offsets []int64 // Version 0
Offset int64 // Version 1
Timestamp int64 // Version 1
Err KError
// Offsets contains the result offsets (for V0/V1 compatibility)
Offsets []int64 // Version 0
// Timestamp contains the timestamp associated with the returned offset.
Timestamp int64 // Version 1
// Offset contains the returned offset.
Offset int64 // Version 1
// LeaderEpoch contains the current leader epoch of the partition.
LeaderEpoch int32
}

func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand All @@ -18,22 +23,29 @@ func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error

if version == 0 {
b.Offsets, err = pd.getInt64Array()

return err
}

b.Timestamp, err = pd.getInt64()
if err != nil {
return err
}
if version >= 1 {
b.Timestamp, err = pd.getInt64()
if err != nil {
return err
}

b.Offset, err = pd.getInt64()
if err != nil {
return err
b.Offset, err = pd.getInt64()
if err != nil {
return err
}

// For backwards compatibility put the offset in the offsets array too
b.Offsets = []int64{b.Offset}
}

// For backwards compatibility put the offset in the offsets array too
b.Offsets = []int64{b.Offset}
if version >= 4 {
if b.LeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
}

return nil
}
Expand All @@ -45,8 +57,14 @@ func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error
return pe.putInt64Array(b.Offsets)
}

pe.putInt64(b.Timestamp)
pe.putInt64(b.Offset)
if version >= 1 {
pe.putInt64(b.Timestamp)
pe.putInt64(b.Offset)
}

if version >= 4 {
pe.putInt32(b.LeaderEpoch)
}

return nil
}
Expand Down Expand Up @@ -168,11 +186,13 @@ func (r *OffsetResponse) headerVersion() int16 {
}

func (r *OffsetResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 3
return r.Version >= 0 && r.Version <= 4
}

func (r *OffsetResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 4:
return V2_1_0_0
case 3:
return V2_0_0_0
case 2:
Expand Down
18 changes: 18 additions & 0 deletions offset_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ var (
0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
}

offsetResponseV4 = []byte{
0x00, 0x00, 0x00, 0x00, // throttle time
0x00, 0x00, 0x00, 0x01, // length of topics
0x00, 0x04, 0x64, 0x6e, 0x77, 0x65, // topic name
0x00, 0x00, 0x00, 0x01, // length of partitions
0x00, 0x00, 0x00, 0x09, // partitionID
0x00, 0x00, // err
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // timestamp
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
0xff, 0xff, 0xff, 0xff, // leaderEpoch
}
)

func TestEmptyOffsetResponse(t *testing.T) {
Expand Down Expand Up @@ -115,3 +127,9 @@ func TestNormalOffsetResponseV1(t *testing.T) {
t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
}
}

func TestOffsetResponseV4(t *testing.T) {
response := OffsetResponse{}

testVersionDecodable(t, "v4", &response, offsetResponseV4, 4)
}
29 changes: 29 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func allocateBody(key, version int16) protocolBody {
return &OffsetRequest{Version: version}
case 3:
return &MetadataRequest{Version: version}
// 4: LeaderAndIsrRequest
// 5: StopReplicaRequest
// 6: UpdateMetadataRequest
// 7: ControlledShutdownRequest
case 8:
return &OffsetCommitRequest{Version: version}
case 9:
Expand Down Expand Up @@ -157,12 +161,14 @@ func allocateBody(key, version int16) protocolBody {
return &DeleteRecordsRequest{Version: version}
case 22:
return &InitProducerIDRequest{Version: version}
// 23: OffsetForLeaderEpochRequest
case 24:
return &AddPartitionsToTxnRequest{Version: version}
case 25:
return &AddOffsetsToTxnRequest{Version: version}
case 26:
return &EndTxnRequest{Version: version}
// 27: WriteTxnMarkersRequest
case 28:
return &TxnOffsetCommitRequest{Version: version}
case 29:
Expand All @@ -175,14 +181,20 @@ func allocateBody(key, version int16) protocolBody {
return &DescribeConfigsRequest{Version: version}
case 33:
return &AlterConfigsRequest{Version: version}
// 34: AlterReplicaLogDirsRequest
case 35:
return &DescribeLogDirsRequest{Version: version}
case 36:
return &SaslAuthenticateRequest{Version: version}
case 37:
return &CreatePartitionsRequest{Version: version}
// 38: CreateDelegationTokenRequest
// 39: RenewDelegationTokenRequest
// 40: ExpireDelegationTokenRequest
// 41: DescribeDelegationTokenRequest
case 42:
return &DeleteGroupsRequest{Version: version}
// 43: ElectLeadersRequest
case 44:
return &IncrementalAlterConfigsRequest{Version: version}
case 45:
Expand All @@ -199,6 +211,23 @@ func allocateBody(key, version int16) protocolBody {
return &DescribeUserScramCredentialsRequest{Version: version}
case 51:
return &AlterUserScramCredentialsRequest{Version: version}
// 52: VoteRequest
// 53: BeginQuorumEpochRequest
// 54: EndQuorumEpochRequest
// 55: DescribeQuorumRequest
// 56: AlterPartitionRequest
// 57: UpdateFeaturesRequest
// 58: EnvelopeRequest
// 59: FetchSnapshotRequest
// 60: DescribeClusterRequest
// 61: DescribeProducersRequest
// 62: BrokerRegistrationRequest
// 63: BrokerHeartbeatRequest
// 64: UnregisterBrokerRequest
// 65: DescribeTransactionsRequest
// 66: ListTransactionsRequest
// 67: AllocateProducerIdsRequest
// 68: ConsumerGroupHeartbeatRequest
}
return nil
}
48 changes: 48 additions & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,54 @@ func TestAllocateBodyProtocolVersions(t *testing.T) {
42: 1,
},
},
{
V2_1_0_0,
map[int16]int16{
0: 7,
1: 10,
2: 4,
3: 7,
4: 1,
5: 0,
6: 4,
7: 1,
8: 6,
9: 5,
10: 2,
11: 3,
12: 2,
13: 2,
14: 2,
15: 2,
16: 2,
17: 1,
18: 2,
19: 3,
20: 3,
21: 1,
22: 1,
23: 2,
24: 1,
25: 1,
26: 1,
27: 0,
28: 2,
29: 1,
30: 1,
31: 1,
32: 2,
33: 1,
34: 1,
35: 1,
36: 0,
37: 1,
38: 1,
39: 1,
40: 1,
41: 1,
42: 1,
},
},
}

for _, tt := range tests {
Expand Down
5 changes: 4 additions & 1 deletion transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,10 @@ func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets,
GroupID: groupId,
Topics: offsets.mapToRequest(),
}
if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
// Version 2 adds the committed leader epoch.
request.Version = 2
} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}
Expand Down
Loading