Skip to content

Commit

Permalink
Merge pull request #394 from Shopify/retry-get-offset
Browse files Browse the repository at this point in the history
Add retry logic to GetOffset() just like Leader()
  • Loading branch information
eapache committed Apr 7, 2015
2 parents 9b048b0 + f922f12 commit fe647d8
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 21 deletions.
57 changes: 36 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,31 +292,16 @@ func (client *client) RefreshMetadata(topics ...string) error {
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
broker, err := client.Leader(topic, partitionID)
if err != nil {
return -1, err
}

request := &OffsetRequest{}
request.AddBlock(topic, partitionID, time, 1)
offset, err := client.getOffset(topic, partitionID, time)

response, err := broker.GetAvailableOffsets(request)
if err != nil {
return -1, err
}

block := response.GetBlock(topic, partitionID)
if block == nil {
return -1, ErrIncompleteResponse
}
if block.Err != ErrNoError {
return -1, block.Err
}
if len(block.Offsets) != 1 {
return -1, ErrOffsetOutOfRange
if err := client.RefreshMetadata(topic); err != nil {
return -1, err
}
return client.getOffset(topic, partitionID, time)
}

return block.Offsets[0], nil
return offset, err
}

// private broker management helpers
Expand Down Expand Up @@ -442,6 +427,36 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, er
return nil, ErrUnknownTopicOrPartition
}

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
broker, err := client.Leader(topic, partitionID)
if err != nil {
return -1, err
}

request := &OffsetRequest{}
request.AddBlock(topic, partitionID, time, 1)

response, err := broker.GetAvailableOffsets(request)
if err != nil {
_ = broker.Close()
return -1, err
}

block := response.GetBlock(topic, partitionID)
if block == nil {
_ = broker.Close()
return -1, ErrIncompleteResponse
}
if block.Err != ErrNoError {
return -1, block.Err
}
if len(block.Offsets) != 1 {
return -1, ErrOffsetOutOfRange
}

return block.Offsets[0], nil
}

// core metadata update logic

func (client *client) backgroundMetadataUpdater() {
Expand Down
48 changes: 48 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,54 @@ func TestClientMetadata(t *testing.T) {
safeClose(t, client)
}

func TestClientGetOffset(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
leaderAddr := leader.Addr()

metadata := new(MetadataResponse)
metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadata.AddBroker(leaderAddr, leader.BrokerID())
seedBroker.Returns(metadata)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

offsetResponse := new(OffsetResponse)
offsetResponse.AddTopicPartition("foo", 0, 123)
leader.Returns(offsetResponse)

offset, err := client.GetOffset("foo", 0, OffsetNewest)
if err != nil {
t.Error(err)
}
if offset != 123 {
t.Error("Unexpected offset, got ", offset)
}

leader.Close()
seedBroker.Returns(metadata)

leader = newMockBrokerAddr(t, 2, leaderAddr)
offsetResponse = new(OffsetResponse)
offsetResponse.AddTopicPartition("foo", 0, 456)
leader.Returns(offsetResponse)

offset, err = client.GetOffset("foo", 0, OffsetNewest)
if err != nil {
t.Error(err)
}
if offset != 456 {
t.Error("Unexpected offset, got ", offset)
}

seedBroker.Close()
leader.Close()
safeClose(t, client)
}

func TestClientReceivingUnknownTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)

Expand Down

0 comments on commit fe647d8

Please sign in to comment.