Skip to content

Commit

Permalink
Memdb Txn Commit race condition fix (#16871)
Browse files Browse the repository at this point in the history
* Add a test to reproduce the race condition

* Fix race condition by publishing the event after the commit and adding a lock to prevent out of order events.

* split publish to generate the list of events before committing the transaction.

* add changelog

* remove extra func

* Apply suggestions from code review

Co-authored-by: Dan Upton <[email protected]>

* add comment to explain test

---------

Co-authored-by: Dan Upton <[email protected]>
  • Loading branch information
dhiaayachi and boxofrad authored Apr 12, 2023
1 parent 1384b34 commit b85a149
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .changelog/16871.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
Fix a race condition where an event is published before the data associated is commited to memdb.
```
53 changes: 34 additions & 19 deletions agent/consul/state/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package state

import (
"fmt"
"sync"

"github.com/hashicorp/go-memdb"

Expand Down Expand Up @@ -93,23 +93,15 @@ func (c *changeTrackerDB) ReadTxn() *memdb.Txn {
// data directly into the DB. These cases may use WriteTxnRestore.
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
Txn: c.db.Txn(true),
Index: idx,
publish: c.publisher.Publish,
prePublish: c.processChanges,
}
t.Txn.TrackChanges()
return t
}

func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error {
events, err := c.processChanges(tx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
return nil
}

// WriteTxnRestore returns a wrapped RW transaction that should only be used in
// Restore where we need to replace the entire contents of the Store.
// WriteTxnRestore uses a zero index since the whole restore doesn't really
Expand All @@ -127,6 +119,11 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn {
return t
}

type prePublishFuncType func(tx ReadTxn, changes Changes) ([]stream.Event, error)

//go:generate mockery --name publishFuncType --inpackage
type publishFuncType func(events []stream.Event)

// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
//
// This can not be done with txn.Defer because the callback passed to Defer is
Expand All @@ -140,7 +137,11 @@ type txn struct {
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
publish func(tx ReadTxn, changes Changes) error
publish publishFuncType

prePublish prePublishFuncType

commitLock sync.Mutex
}

// Commit first pushes changes to EventPublisher, then calls Commit on the
Expand All @@ -161,16 +162,30 @@ func (tx *txn) Commit() error {
}
}

// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
if err := tx.publish(tx.Txn, changes); err != nil {
// This lock prevents events from concurrent transactions getting published out of order.
tx.commitLock.Lock()
defer tx.commitLock.Unlock()

var events []stream.Event
var err error

// prePublish need to generate a list of events before the transaction is commited,
// as we loose the changes in the transaction after the call to Commit().
if tx.prePublish != nil {
events, err = tx.prePublish(tx.Txn, changes)
if err != nil {
return err
}
}

tx.Txn.Commit()

// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases events should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
tx.publish(events)
}
return nil
}

Expand Down
98 changes: 98 additions & 0 deletions agent/consul/state/memdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package state

import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"testing"
"time"
)

func testValidSchema() *memdb.DBSchema {
return &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
"main": {
Name: "main",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "ID"},
},
"foo": {
Name: "foo",
Indexer: &memdb.StringFieldIndex{Field: "Foo"},
},
},
},
},
}
}

type TestObject struct {
ID string
Foo string
}

// This test verify that the new data in a TXN is commited at the time that publishFunc is called.
// To do so, the publish func is mocked, a read on ch1 means that publish is called and blocked,
// ch2 permit to control the publish func and unblock it when receiving a signal.
func Test_txn_Commit(t *testing.T) {
db, err := memdb.NewMemDB(testValidSchema())
require.NoError(t, err)
publishFunc := mockPublishFuncType{}
tx := txn{
Txn: db.Txn(true),
Index: 0,
publish: publishFunc.Execute,
}
ch1 := make(chan struct{})
ch2 := make(chan struct{})
getCh := make(chan memdb.ResultIterator)
group := errgroup.Group{}
group.Go(func() error {
after := time.After(2 * time.Second)
select {
case <-ch1:
tx2 := txn{
Txn: db.Txn(false),
Index: 0,
publish: publishFunc.Execute,
}
get, err := tx2.Get("main", "id")
if err != nil {
return err
}
close(ch2)
getCh <- get
case <-after:
close(ch2)
return fmt.Errorf("test timed out")
}
return nil
})

publishFunc.On("Execute", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(ch1)
<-ch2
}).Return(nil)

err = tx.Insert("main", TestObject{ID: "1", Foo: "foo"})
require.NoError(t, err)
err = tx.Commit()
require.NoError(t, err)
get := <-getCh
require.NotNil(t, get)
next := get.Next()
require.NotNil(t, next)

val := next.(TestObject)
require.Equal(t, val.ID, "1")
require.Equal(t, val.Foo, "foo")

err = group.Wait()
require.NoError(t, err)

}
33 changes: 33 additions & 0 deletions agent/consul/state/mock_publishFuncType.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b85a149

Please sign in to comment.