Skip to content

Commit

Permalink
bugfix: flaky queue/Example_connectionPool
Browse files Browse the repository at this point in the history
We need to wait for a queue configuration on all instances before
start a work.

Closes #278
  • Loading branch information
oleg-jukovec committed May 12, 2023
1 parent adc6b51 commit 56bf9a9
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions queue/example_connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
name string
cfg queue.Cfg

uuid uuid.UUID
registered bool
err error
mutex sync.Mutex
masterUpdated chan struct{}
masterCnt int32
uuid uuid.UUID
registered bool
err error
mutex sync.Mutex
updated chan struct{}
masterCnt int32
}

// QueueConnectionHandler implements the ConnectionHandler interface.
Expand All @@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
// NewQueueConnectionHandler creates a QueueConnectionHandler object.
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
return &QueueConnectionHandler{
name: name,
cfg: cfg,
masterUpdated: make(chan struct{}, 10),
name: name,
cfg: cfg,
updated: make(chan struct{}, 10),
}
}

Expand All @@ -53,11 +53,9 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
}

master := role == connection_pool.MasterRole
if master {
defer func() {
h.masterUpdated <- struct{}{}
}()
}
defer func() {
h.updated <- struct{}{}
}()

// Set up a queue module configuration for an instance.
q := queue.New(conn, h.name)
Expand Down Expand Up @@ -106,7 +104,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,

// Closes closes a QueueConnectionHandler object.
func (h *QueueConnectionHandler) Close() {
close(h.masterUpdated)
close(h.updated)
}

// Example demonstrates how to use the queue package with the connection_pool
Expand Down Expand Up @@ -155,8 +153,10 @@ func Example_connectionPool() {
}
defer connPool.Close()

// Wait for a master instance identification in the queue.
<-h.masterUpdated
// Wait for a queue initialization and master instance identification in
// the queue.
<-h.updated
<-h.updated
if h.err != nil {
fmt.Printf("Unable to identify in the pool: %s", h.err)
return
Expand All @@ -183,8 +183,10 @@ func Example_connectionPool() {
return
}

// Wait for a new master instance re-identification.
<-h.masterUpdated
// Wait for a replica instance connection and a new master instance
// re-identification.
<-h.updated
<-h.updated
h.mutex.Lock()
err = h.err
h.mutex.Unlock()
Expand Down

0 comments on commit 56bf9a9

Please sign in to comment.