Skip to content

Commit

Permalink
adding in evan's suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai Hayashi committed Nov 13, 2014
1 parent 6c4fe8d commit 42ee2da
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 42 deletions.
27 changes: 20 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
func (client *Client) disconnectBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()
Logger.Printf("Disconnecting Broker %d\n", broker.ID())
Logger.Printf("Disconnecting Broker %d %s\n", broker.ID(), broker.Addr())

client.deadBrokerAddrs[broker.addr] = struct{}{}

Expand Down Expand Up @@ -264,17 +264,21 @@ func (client *Client) fanoutMetadataRequest(topics []string) (*MetadataResponse,

fanout := newMetadataFanout(client.config.MetadataConcurrentConnections)

if len(client.brokers) < 1 {
fanout.Fetch(client.extraBroker, client.id, &MetadataRequest{Topics: topics})
if len(client.brokers) < 1 && client.seedBroker != nil {
Logger.Printf("Fanning out %v\n", client.seedBroker)
fanout.Fetch(client.seedBroker, client.id, &MetadataRequest{Topics: topics})
} else {
for _, broker := range client.brokers {
Logger.Printf("Fanning out %v\n", broker)
b := broker
fanout.Fetch(b, client.id, &MetadataRequest{Topics: topics})
fanout.Fetch(broker, client.id, &MetadataRequest{Topics: topics})
}
}

go fanout.WaitAndCleanup()
result := <-fanout.GetResult
if result == nil {
return nil, OutOfBrokers
}
return result.response, result.err
}

Expand Down Expand Up @@ -317,8 +321,17 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
// didn't even send, return the error
return err
default:
// means all brokers are timedout... At what level should we disconnect a broker?
if retries > 0 {
client.lock.RLock()
shouldTryNextSeedBroker := len(client.brokers) < 1 && client.seedBroker != nil
client.lock.RUnlock()

if retries > 0 && shouldTryNextSeedBroker {
// We are getting broker information and our first seed errored on metadata connect.
// Retry on the next seed.
client.disconnectBroker(client.seedBroker)
return client.refreshMetadata(topics, retries-1)
} else if retries > 0 {
// means all brokers are timedout... At what level should we disconnect a broker?
Logger.Printf("Out of available brokers. Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retries)
time.Sleep(client.config.WaitForElection)
client.resurrectDeadBrokers()
Expand Down
1 change: 0 additions & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func TestSimpleConsumer(t *testing.T) {
t.Error("Incorrect message offset!")
}
}

}

func TestConsumerRawOffset(t *testing.T) {
Expand Down
52 changes: 27 additions & 25 deletions metadata_fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package sarama
import "sync"

type metadataFanout struct {
wg sync.WaitGroup
cleanupWaiting sync.Once
initClose sync.Once
results chan *fanoutResult
closer chan struct{}
throttle chan struct{}
wg sync.WaitGroup
results chan *fanoutResult
closer chan struct{}
throttle chan struct{}

GetResult chan *fanoutResult // Stores the first non-error result from the metadata fanout.
Done chan struct{} // optional channel to notify outside when clean up is complete.
Expand All @@ -26,12 +24,9 @@ type metadataFetcher interface {

func newMetadataFanout(maxConcurrentConnections int) *metadataFanout {
f := &metadataFanout{
wg: sync.WaitGroup{},
cleanupWaiting: sync.Once{},
initClose: sync.Once{},
closer: make(chan struct{}),
results: make(chan *fanoutResult, 1),
throttle: make(chan struct{}, maxConcurrentConnections),
closer: make(chan struct{}),
results: make(chan *fanoutResult, 1),
throttle: make(chan struct{}, maxConcurrentConnections),

GetResult: make(chan *fanoutResult),
Done: make(chan struct{}),
Expand All @@ -43,7 +38,6 @@ func newMetadataFanout(maxConcurrentConnections int) *metadataFanout {
// Fetch is called by the thread initiating the fanout.
func (f *metadataFanout) Fetch(broker metadataFetcher, clientId string, request *MetadataRequest) {
f.wg.Add(1)
f.cleanupWaiting.Do(func() { go f.waitAndCleanup(f.Done) })
go func(worker metadataFetcher) {
// Wait until throttle allows us to continue or someone else closes our channel
var checkout struct{}
Expand All @@ -54,21 +48,29 @@ func (f *metadataFanout) Fetch(broker metadataFetcher, clientId string, request
case f.throttle <- checkout:
}

// Since the above select block can potentially choose the second case even if f.closer is closed
// we double check that case here. See https://goo.gl/lqydfp
select {
case <-f.closer:
f.wg.Done()
return
default:
r, e := worker.GetMetadata(clientId, request)
f.results <- &fanoutResult{
err: e,
response: r,
}
}

r, e := worker.GetMetadata(clientId, request)
select {
case <-f.closer:
f.wg.Done()
return
case f.results <- &fanoutResult{
err: e,
response: r,
}:
}
}(broker)
}

func (f *metadataFanout) waitAndCleanup(doneChan chan struct{}) {
func (f *metadataFanout) WaitAndCleanup() {
// Wait until all workers have returned or a successful result has come
// and workers can shutdown
f.wg.Wait()
Expand All @@ -82,16 +84,16 @@ func (f *metadataFanout) waitAndCleanup(doneChan chan struct{}) {
// If no valid result is found it emits the last failing result.
func (f *metadataFanout) Listen() {
var lastResult *fanoutResult
goodResultFound := false
for r := range f.results {
f.wg.Done()
lastResult = r
if r.err == nil {
f.initClose.Do(func() {
close(f.closer)
f.GetResult <- r
})
if r.err == nil && !goodResultFound {
close(f.closer)
f.GetResult <- r
goodResultFound = true
}
<-f.throttle
f.wg.Done()
}
select {
case <-f.closer:
Expand Down
15 changes: 6 additions & 9 deletions metadata_fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,9 @@ func TestMetadataFanout(t *testing.T) {
},
}
for _, fetch := range fetchers {
func(ff metadataFetcher) {
f.Fetch(ff, "id1", &MetadataRequest{Topics: []string{"topics"}})
}(fetch)
f.Fetch(fetch, "id1", &MetadataRequest{Topics: []string{"topics"}})
}
go f.WaitAndCleanup()
result := <-f.GetResult
if result.err == expected.err && metadataResponseIdsEqual(result.response, expected.response) {
t.Errorf("expected %+v but got %+v\n", expected, result)
Expand Down Expand Up @@ -130,10 +129,9 @@ func TestMetadataFanoutAllErrored(t *testing.T) {
},
}
for _, fetch := range fetchers {
func(ff metadataFetcher) {
f.Fetch(ff, "id1", &MetadataRequest{Topics: []string{"topics"}})
}(fetch)
f.Fetch(fetch, "id1", &MetadataRequest{Topics: []string{"topics"}})
}
go f.WaitAndCleanup()
result := <-f.GetResult
if result.err == expected.err && metadataResponseIdsEqual(result.response, expected.response) {
t.Errorf("expected %+v but got %+v\n", expected, result)
Expand Down Expand Up @@ -173,10 +171,9 @@ func TestMetadataFanoutSingleConnection(t *testing.T) {
},
}
for _, fetch := range fetchers {
func(ff metadataFetcher) {
f.Fetch(ff, "id1", &MetadataRequest{Topics: []string{"topics"}})
}(fetch)
f.Fetch(fetch, "id1", &MetadataRequest{Topics: []string{"topics"}})
}
go f.WaitAndCleanup()
result := <-f.GetResult
if result.err == expected.err && metadataResponseIdsEqual(result.response, expected.response) {
t.Errorf("expected %+v but got %+v\n", expected, result)
Expand Down
5 changes: 5 additions & 0 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"net"
"runtime"
"strconv"
)

Expand Down Expand Up @@ -104,6 +105,7 @@ func (b *MockBroker) serverLoop() (ok bool) {
if _, err = conn.Write(response); err != nil {
return b.serverError(err, conn)
}

}
if err = conn.Close(); err != nil {
return b.serverError(err, nil)
Expand All @@ -116,6 +118,9 @@ func (b *MockBroker) serverLoop() (ok bool) {
}

func (b *MockBroker) serverError(err error, conn net.Conn) bool {
buf := make([]byte, 1024)
n := runtime.Stack(buf, false)
Logger.Println(buf[:n])
b.t.Error(err)
if conn != nil {
conn.Close()
Expand Down

0 comments on commit 42ee2da

Please sign in to comment.