Fix race in syncer test
The test did not ensure that the previous Sync run was finished when
the next Sync run was started.

It was reproducible locally with:

GOMAXPROCS=1 go test -race  -count=100 ./syncer | grep -A30 'DATA RACE'

Output:

WARNING: DATA RACE
Write at 0x00c000161e60 by goroutine 71:
  runtime.mapassign_faststr()
      /usr/local/Cellar/go/1.19.4/libexec/src/runtime/map_faststr.go:203 +0x0
  powerdns.com/platform/lightningstream/syncer.(*Syncer).LoadOnce()
      /Users/wojas/pdns/work-ls/lightningstream/syncer/sync.go:505 +0xc66
  ...

Previous read at 0x00c000161e60 by goroutine 64:
  runtime.mapiterinit()
      /usr/local/Cellar/go/1.19.4/libexec/src/runtime/map.go:815 +0x0
  powerdns.com/platform/lightningstream/syncer/cleaner.(*Worker).SetCommitted()
      /Users/wojas/pdns/work-ls/lightningstream/syncer/cleaner/cleaner.go:55 +0xee
  ...
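
The trace boils down to one goroutine writing to a shared map while another iterates it. A minimal self-contained sketch of that pattern (illustrative stand-in code, not the actual Syncer or cleaner implementation) which `go run -race` flags with the same mapassign/mapiterinit pair:

package main

import "time"

func main() {
    committed := map[string]bool{}

    // Writer, standing in for Syncer.LoadOnce updating shared state.
    go func() {
        for {
            committed["snapshot"] = true // runtime.mapassign_faststr in the trace
        }
    }()

    // Reader, standing in for cleaner.Worker.SetCommitted iterating the same map.
    go func() {
        for {
            for range committed { // runtime.mapiterinit in the trace
            }
        }
    }()

    time.Sleep(100 * time.Millisecond)
}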
wojas committed Mar 24, 2023
1 parent 9353f38 commit 20c4eee
Showing 3 changed files with 36 additions and 8 deletions.
6 changes: 3 additions & 3 deletions docs/schema.md
@@ -25,14 +25,14 @@ of changes made by Lightning Stream when syncing data from other instances.
For example, it could have an in-memory cache of the state and only invalidate this cache when itself writes
a change to the LMDB.

Another example would be where the LMDB would map IDs to Names, and the applications maintains an in-memory
Another example would be where the LMDB would map IDs to Names, and the application maintains an in-memory
reverse index from Name to ID.

#### Solution

Since LMDB is so fast, it may be feasible to store all state in the LMDB and read it on demand.

Alternatively, a cache could be short-lived (e.g. 1 second), or the application should check if the LMDB's LastTxnID
Alternatively, a cache could be short-lived (e.g. 1 second), or the application could check if the LMDB's LastTxnID
has changed since the last cache update.
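
For illustration, the `LastTxnID` check could be as small as this (a sketch assuming lmdb-go style bindings, where `Env.Info()` returns environment info including the last committed transaction ID; the helper name and cache wiring are hypothetical):

// cacheStillValid reports whether nothing has been committed to the
// environment since the cache was last refreshed at lastSeenTxnID.
// Sketch only: assumes lmdb-go style bindings (Env.Info exposing LastTxnID).
func cacheStillValid(env *lmdb.Env, lastSeenTxnID int64) (bool, error) {
    info, err := env.Info()
    if err != nil {
        return false, err
    }
    return info.LastTxnID == lastSeenTxnID, nil
}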

!!! TODO
@@ -89,7 +89,7 @@ Example:

The value part can be kept empty, or contain some data, as needed.

When you need a list of values, perform an `MDB_SET_RANGE` query for fetch all keys that start with
When you need a list of values, perform an `MDB_SET_RANGE` query to fetch all keys that start with
"accounts-owned-by-jane:".

If a safe separator cannot be found, the key can be prefixed with one or two bytes indicating the length
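
For illustration, such a prefix scan could look like this in Go (a sketch assuming lmdb-go style bindings, e.g. github.com/PowerDNS/lmdb-go/lmdb, plus the standard `bytes` package; the helper name is hypothetical):

// keysWithPrefix returns all keys starting with prefix, using MDB_SET_RANGE
// to position the cursor at the first key >= prefix. Sketch only.
func keysWithPrefix(txn *lmdb.Txn, dbi lmdb.DBI, prefix []byte) ([][]byte, error) {
    cur, err := txn.OpenCursor(dbi)
    if err != nil {
        return nil, err
    }
    defer cur.Close()

    var keys [][]byte
    for k, _, err := cur.Get(prefix, nil, lmdb.SetRange); ; k, _, err = cur.Get(nil, nil, lmdb.Next) {
        if lmdb.IsNotFound(err) {
            break // reached the end of the database
        }
        if err != nil {
            return nil, err
        }
        if !bytes.HasPrefix(k, prefix) {
            break // past the last key with this prefix
        }
        // Copy the key: the returned memory is only valid inside this transaction.
        keys = append(keys, append([]byte(nil), k...))
    }
    return keys, nil
}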
35 changes: 31 additions & 4 deletions syncer/sync_test.go
@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"

@@ -60,7 +61,7 @@ func doTest(t *testing.T, withHeader bool) {

    // Start syncer A with one key
    t.Log("Starting syncer A")
    go runSync(ctxA, syncerA)
    goRunSync(ctxA, syncerA)

    t.Log("----------")

@@ -71,7 +72,7 @@
    // Starting with an empty LMDB is a special case that will not trigger any
    // local snapshot.
    t.Log("Starting syncer B")
    go runSync(ctxB, syncerB)
    goRunSync(ctxB, syncerB)

    t.Log("----------")

@@ -98,7 +99,7 @@
    cancelA()
    ctxA, cancelA = context.WithCancel(ctx)
    t.Log("----------")
    go runSync(ctxA, syncerA)
    goRunSync(ctxA, syncerA)

    t.Log("----------")

@@ -119,7 +120,7 @@
t.Log("----------")
t.Log("Starting syncer A again")
ctxA, cancelA = context.WithCancel(ctx)
go runSync(ctxA, syncerA)
goRunSync(ctxA, syncerA)
t.Log("----------")

// New value in A should get synced to B
@@ -191,6 +192,32 @@ func requireSnapshotsLenWait(t *testing.T, st simpleblob.Interface, expLen int,
    require.Len(t, list, expLen, instance)
}

// Ensure that there are never two Sync goroutines running at the same time,
// because this can cause a data race.
// Protected by a mutex, just in case the test is ever run in parallel mode.
var (
    runningSyncersMu sync.Mutex
    runningSyncers   = map[*Syncer]*sync.WaitGroup{}
)

func goRunSync(ctx context.Context, syncer *Syncer) {
    runningSyncersMu.Lock()
    wg, exists := runningSyncers[syncer]
    if !exists {
        wg = &sync.WaitGroup{}
        runningSyncers[syncer] = wg
    }
    runningSyncersMu.Unlock()
    go func() {
        logrus.Info("Wait for any previous Syncer instance to exit")
        wg.Wait()
        logrus.Info("Wait for any previous Syncer done")
        wg.Add(1)
        defer wg.Done()
        runSync(ctx, syncer)
    }()
}

func runSync(ctx context.Context, syncer *Syncer) {
    err := syncer.Sync(ctx)
    if err != nil && err != context.Canceled {
3 changes: 2 additions & 1 deletion test.sh
@@ -15,7 +15,8 @@ go test -count=1 "$@" ./...
go test -count=1 "$@" github.com/PowerDNS/simpleblob/...

# Run again with race detector
go test -race -count=1 "$@" ./...
go test -race -count=5 "$@" ./...
GOMAXPROCS=1 go test -race -count=5 "$@" ./...

# This one used to be flaky, run a few more times
go test -count 20 -run TestSyncer_Sync_startup powerdns.com/platform/lightningstream/syncer
