From 42ee2da85e12a028c35ace0796f13632fb92cecc Mon Sep 17 00:00:00 2001 From: Kai Hayashi Date: Wed, 12 Nov 2014 13:18:42 -0800 Subject: [PATCH] adding in evan's suggestions --- client.go | 27 +++++++++++++++------ consumer_test.go | 1 - metadata_fanout.go | 52 +++++++++++++++++++++-------------------- metadata_fanout_test.go | 15 +++++------- mockbroker.go | 5 ++++ 5 files changed, 58 insertions(+), 42 deletions(-) diff --git a/client.go b/client.go index 9d4c50631..96960f3d5 100644 --- a/client.go +++ b/client.go @@ -232,7 +232,7 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim func (client *Client) disconnectBroker(broker *Broker) { client.lock.Lock() defer client.lock.Unlock() - Logger.Printf("Disconnecting Broker %d\n", broker.ID()) + Logger.Printf("Disconnecting Broker %d %s\n", broker.ID(), broker.Addr()) client.deadBrokerAddrs[broker.addr] = struct{}{} @@ -264,17 +264,21 @@ func (client *Client) fanoutMetadataRequest(topics []string) (*MetadataResponse, fanout := newMetadataFanout(client.config.MetadataConcurrentConnections) - if len(client.brokers) < 1 { - fanout.Fetch(client.extraBroker, client.id, &MetadataRequest{Topics: topics}) + if len(client.brokers) < 1 && client.seedBroker != nil { + Logger.Printf("Fanning out %v\n", client.seedBroker) + fanout.Fetch(client.seedBroker, client.id, &MetadataRequest{Topics: topics}) } else { for _, broker := range client.brokers { Logger.Printf("Fanning out %v\n", broker) - b := broker - fanout.Fetch(b, client.id, &MetadataRequest{Topics: topics}) + fanout.Fetch(broker, client.id, &MetadataRequest{Topics: topics}) } } + go fanout.WaitAndCleanup() result := <-fanout.GetResult + if result == nil { + return nil, OutOfBrokers + } return result.response, result.err } @@ -317,8 +321,17 @@ func (client *Client) refreshMetadata(topics []string, retries int) error { // didn't even send, return the error return err default: - // means all brokers are timedout... At what level should we disconnect a broker? - if retries > 0 { + client.lock.RLock() + shouldTryNextSeedBroker := len(client.brokers) < 1 && client.seedBroker != nil + client.lock.RUnlock() + + if retries > 0 && shouldTryNextSeedBroker { + // We are getting broker information and our first seed errored on metadata connect. + // Retry on the next seed. + client.disconnectBroker(client.seedBroker) + return client.refreshMetadata(topics, retries-1) + } else if retries > 0 { + // means all brokers are timedout... At what level should we disconnect a broker? Logger.Printf("Out of available brokers. Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retries) time.Sleep(client.config.WaitForElection) client.resurrectDeadBrokers() diff --git a/consumer_test.go b/consumer_test.go index a0d53856e..bcbc981fc 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -52,7 +52,6 @@ func TestSimpleConsumer(t *testing.T) { t.Error("Incorrect message offset!") } } - } func TestConsumerRawOffset(t *testing.T) { diff --git a/metadata_fanout.go b/metadata_fanout.go index 796d61a50..ef95c9b01 100644 --- a/metadata_fanout.go +++ b/metadata_fanout.go @@ -3,12 +3,10 @@ package sarama import "sync" type metadataFanout struct { - wg sync.WaitGroup - cleanupWaiting sync.Once - initClose sync.Once - results chan *fanoutResult - closer chan struct{} - throttle chan struct{} + wg sync.WaitGroup + results chan *fanoutResult + closer chan struct{} + throttle chan struct{} GetResult chan *fanoutResult // Stores the first non-error result from the metadata fanout. Done chan struct{} // optional channel to notify outside when clean up is complete. @@ -26,12 +24,9 @@ type metadataFetcher interface { func newMetadataFanout(maxConcurrentConnections int) *metadataFanout { f := &metadataFanout{ - wg: sync.WaitGroup{}, - cleanupWaiting: sync.Once{}, - initClose: sync.Once{}, - closer: make(chan struct{}), - results: make(chan *fanoutResult, 1), - throttle: make(chan struct{}, maxConcurrentConnections), + closer: make(chan struct{}), + results: make(chan *fanoutResult, 1), + throttle: make(chan struct{}, maxConcurrentConnections), GetResult: make(chan *fanoutResult), Done: make(chan struct{}), @@ -43,7 +38,6 @@ func newMetadataFanout(maxConcurrentConnections int) *metadataFanout { // Fetch is called by the thread initiating the fanout. func (f *metadataFanout) Fetch(broker metadataFetcher, clientId string, request *MetadataRequest) { f.wg.Add(1) - f.cleanupWaiting.Do(func() { go f.waitAndCleanup(f.Done) }) go func(worker metadataFetcher) { // Wait until throttle allows us to continue or someone else closes our channel var checkout struct{} @@ -54,21 +48,29 @@ func (f *metadataFanout) Fetch(broker metadataFetcher, clientId string, request case f.throttle <- checkout: } + // Since the above select block can potentially choose the second case even if f.closer is closed + // we double check that case here. See https://goo.gl/lqydfp select { case <-f.closer: f.wg.Done() return default: - r, e := worker.GetMetadata(clientId, request) - f.results <- &fanoutResult{ - err: e, - response: r, - } + } + + r, e := worker.GetMetadata(clientId, request) + select { + case <-f.closer: + f.wg.Done() + return + case f.results <- &fanoutResult{ + err: e, + response: r, + }: } }(broker) } -func (f *metadataFanout) waitAndCleanup(doneChan chan struct{}) { +func (f *metadataFanout) WaitAndCleanup() { // Wait until all workers have returned or a successful result has come // and workers can shutdown f.wg.Wait() @@ -82,16 +84,16 @@ func (f *metadataFanout) waitAndCleanup(doneChan chan struct{}) { // If no valid result is found it emits the last failing result. func (f *metadataFanout) Listen() { var lastResult *fanoutResult + goodResultFound := false for r := range f.results { - f.wg.Done() lastResult = r - if r.err == nil { - f.initClose.Do(func() { - close(f.closer) - f.GetResult <- r - }) + if r.err == nil && !goodResultFound { + close(f.closer) + f.GetResult <- r + goodResultFound = true } <-f.throttle + f.wg.Done() } select { case <-f.closer: diff --git a/metadata_fanout_test.go b/metadata_fanout_test.go index 6f0f1efe5..2df4b1287 100644 --- a/metadata_fanout_test.go +++ b/metadata_fanout_test.go @@ -86,10 +86,9 @@ func TestMetadataFanout(t *testing.T) { }, } for _, fetch := range fetchers { - func(ff metadataFetcher) { - f.Fetch(ff, "id1", &MetadataRequest{Topics: []string{"topics"}}) - }(fetch) + f.Fetch(fetch, "id1", &MetadataRequest{Topics: []string{"topics"}}) } + go f.WaitAndCleanup() result := <-f.GetResult if result.err == expected.err && metadataResponseIdsEqual(result.response, expected.response) { t.Errorf("expected %+v but got %+v\n", expected, result) @@ -130,10 +129,9 @@ func TestMetadataFanoutAllErrored(t *testing.T) { }, } for _, fetch := range fetchers { - func(ff metadataFetcher) { - f.Fetch(ff, "id1", &MetadataRequest{Topics: []string{"topics"}}) - }(fetch) + f.Fetch(fetch, "id1", &MetadataRequest{Topics: []string{"topics"}}) } + go f.WaitAndCleanup() result := <-f.GetResult if result.err == expected.err && metadataResponseIdsEqual(result.response, expected.response) { t.Errorf("expected %+v but got %+v\n", expected, result) @@ -173,10 +171,9 @@ func TestMetadataFanoutSingleConnection(t *testing.T) { }, } for _, fetch := range fetchers { - func(ff metadataFetcher) { - f.Fetch(ff, "id1", &MetadataRequest{Topics: []string{"topics"}}) - }(fetch) + f.Fetch(fetch, "id1", &MetadataRequest{Topics: []string{"topics"}}) } + go f.WaitAndCleanup() result := <-f.GetResult if result.err == expected.err && metadataResponseIdsEqual(result.response, expected.response) { t.Errorf("expected %+v but got %+v\n", expected, result) diff --git a/mockbroker.go b/mockbroker.go index c569a2e19..f7844714c 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -5,6 +5,7 @@ import ( "errors" "io" "net" + "runtime" "strconv" ) @@ -104,6 +105,7 @@ func (b *MockBroker) serverLoop() (ok bool) { if _, err = conn.Write(response); err != nil { return b.serverError(err, conn) } + } if err = conn.Close(); err != nil { return b.serverError(err, nil) @@ -116,6 +118,9 @@ func (b *MockBroker) serverLoop() (ok bool) { } func (b *MockBroker) serverError(err error, conn net.Conn) bool { + buf := make([]byte, 1024) + n := runtime.Stack(buf, false) + Logger.Println(buf[:n]) b.t.Error(err) if conn != nil { conn.Close()