Skip to content

Commit

Permalink
Merge pull request #925 from ecordell/sever-singleflight
Browse files Browse the repository at this point in the history
Sever namespace read context
  • Loading branch information
ecordell authored Oct 21, 2022
2 parents 3871326 + eb9cba0 commit ef2738c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
4 changes: 3 additions & 1 deletion internal/datastore/proxy/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func (r *nsCachingReader) ReadNamespace(
// We couldn't use the cached entry, load one
var err error
loadedRaw, err, _ = r.p.readNsGroup.Do(nsRevisionKey, func() (any, error) {
loaded, updatedRev, err := r.Reader.ReadNamespace(ctx, nsName)
// sever the context so that another branch doesn't cancel the
// single-flighted namespace read
loaded, updatedRev, err := r.Reader.ReadNamespace(datastore.SeparateContextWithTracing(ctx), nsName)
if err != nil && !errors.Is(err, &datastore.ErrNamespaceNotFound{}) {
// Propagate this error to the caller
return nil, err
Expand Down
49 changes: 48 additions & 1 deletion internal/datastore/proxy/caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -135,6 +138,7 @@ func TestRWTNamespaceCaching(t *testing.T) {
func TestSingleFlight(t *testing.T) {
dsMock := &proxy_test.MockDatastore{}

ctx := context.Background()
oneReader := &proxy_test.MockReader{}
dsMock.On("SnapshotReader", one).Return(oneReader)
oneReader.
Expand All @@ -144,7 +148,6 @@ func TestSingleFlight(t *testing.T) {
Once()

require := require.New(t)
ctx := context.Background()

ds := NewCachingDatastoreProxy(dsMock, nil)

Expand Down Expand Up @@ -220,3 +223,47 @@ func TestSnapshotNamespaceCachingRealDatastore(t *testing.T) {
})
}
}

type reader struct {
proxy_test.MockReader
}

func (r *reader) ReadNamespace(ctx context.Context, namespace string) (ns *core.NamespaceDefinition, lastWritten datastore.Revision, err error) {
time.Sleep(10 * time.Millisecond)
if errors.Is(ctx.Err(), context.Canceled) {
return nil, old, fmt.Errorf("error")
}
return &core.NamespaceDefinition{Name: namespace}, old, nil
}

func TestSingleFlightCancelled(t *testing.T) {
dsMock := &proxy_test.MockDatastore{}
ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
defer cancel1()

dsMock.On("SnapshotReader", one).Return(&reader{MockReader: proxy_test.MockReader{}})

ds := NewCachingDatastoreProxy(dsMock, nil)

g := sync.WaitGroup{}
var ns2 *core.NamespaceDefinition
g.Add(2)
go func() {
_, _, _ = ds.SnapshotReader(one).ReadNamespace(ctx1, nsA)
g.Done()
}()
go func() {
time.Sleep(5 * time.Millisecond)
ns2, _, _ = ds.SnapshotReader(one).ReadNamespace(ctx2, nsA)
g.Done()
}()
cancel1()

g.Wait()
require.NotNil(t, ns2)
require.Equal(t, nsA, ns2.Name)

dsMock.AssertExpectations(t)
}

0 comments on commit ef2738c

Please sign in to comment.