Skip to content

Commit

Permalink
broker: test offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 28, 2017
1 parent 790c1e9 commit f929635
Showing 1 changed file with 60 additions and 26 deletions.
86 changes: 60 additions & 26 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,37 +234,71 @@ func TestBroker_Run(t *testing.T) {
},
},
{
name: "produce version 2 ok",
name: "offsets",
fields: newFields(),
args: args{
requestCh: make(chan jocko.Request, 2),
responseCh: make(chan jocko.Response, 2),
requests: []jocko.Request{{
Header: &protocol.RequestHeader{CorrelationID: 1},
Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{
Topic: "the-topic",
NumPartitions: 1,
ReplicationFactor: 1,
}}}}, {
Header: &protocol.RequestHeader{CorrelationID: 2},
Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{
Topic: "the-topic",
Data: []*protocol.Data{{
RecordSet: mustEncode(&protocol.MessageSet{Offset: 1, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}}},
requests: []jocko.Request{
{
Header: &protocol.RequestHeader{CorrelationID: 1},
Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{
Topic: "the-topic",
NumPartitions: 1,
ReplicationFactor: 1,
}}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 2},
Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{
Topic: "the-topic",
Data: []*protocol.Data{{
RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 3},
Request: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -1}}}}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 4},
Request: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -2}}}}},
},
},
responses: []jocko.Response{{
Header: &protocol.RequestHeader{CorrelationID: 1},
Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{
TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}},
}},
}, {
Header: &protocol.RequestHeader{CorrelationID: 2},
Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{{
Topic: "the-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, ErrorCode: protocol.ErrNone.Code()}},
responses: []jocko.Response{
{
Header: &protocol.RequestHeader{CorrelationID: 1},
Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{
TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}},
}},
}}}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 2},
Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{{
Topic: "the-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}},
}},
}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 3},
Response: &protocol.Response{CorrelationID: 3, Body: &protocol.OffsetsResponse{
Responses: []*protocol.OffsetResponse{{
Topic: "the-topic",
PartitionResponses: []*protocol.PartitionResponse{{Partition: 0, Offsets: []int64{1}, ErrorCode: protocol.ErrNone.Code()}},
}},
}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 4},
Response: &protocol.Response{CorrelationID: 4, Body: &protocol.OffsetsResponse{
Responses: []*protocol.OffsetResponse{{
Topic: "the-topic",
PartitionResponses: []*protocol.PartitionResponse{{Partition: 0, Offsets: []int64{0}, ErrorCode: protocol.ErrNone.Code()}},
}},
}},
},
},
},
},
{
Expand Down Expand Up @@ -316,7 +350,7 @@ func TestBroker_Run(t *testing.T) {
response := <-tt.args.responseCh

switch res := response.Response.(*protocol.Response).Body.(type) {
// handle timepstamp explicity since we don't know what
// handle timestamp explicitly since we don't know what
// it'll be set to
case *protocol.ProduceResponses:
for _, response := range res.Responses {
Expand Down

0 comments on commit f929635

Please sign in to comment.