fix: engine test TestTaggingUseful (#297)
dirkmc authored Mar 12, 2020
1 parent 12021fa commit 5c18cf5
Showing 2 changed files with 52 additions and 22 deletions.
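In short: TestTaggingUseful used to sleep for multiples of peerSampleInterval and assume the engine's score worker had sampled peer usefulness in the meantime, which left the test at the mercy of scheduler timing. This commit threads an optional sampleCh channel through the engine: the score worker signals it after every sample, so the test can wait for an exact number of samples before each assertion. A minimal, self-contained sketch of that pattern (illustrative names only, not code from this repo):

package main

import (
    "fmt"
    "time"
)

// worker takes a "sample" on every tick; when sampleCh is non-nil
// (tests only), it signals after each sample so a test can count
// samples deterministically instead of sleeping.
func worker(interval time.Duration, sampleCh chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for range ticker.C {
        // ... take a sample ...
        if sampleCh != nil {
            sampleCh <- struct{}{}
        }
    }
}

func main() {
    sampleCh := make(chan struct{})
    go worker(time.Millisecond, sampleCh)

    // Wait for exactly three samples rather than guessing a sleep.
    for i := 0; i < 3; i++ {
        <-sampleCh
    }
    fmt.Println("observed 3 samples")
}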
12 changes: 10 additions & 2 deletions internal/decision/engine.go

@@ -159,6 +159,8 @@ type Engine struct {
 
     // how frequently the engine should sample peer usefulness
     peerSampleInterval time.Duration
+    // used by the tests to detect when a sample is taken
+    sampleCh chan struct{}
 
     sendDontHaves bool
 
@@ -167,12 +169,12 @@ type Engine struct {
 
 // NewEngine creates a new block sending engine for the given block store
 func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
-    return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm)
+    return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil)
 }
 
 // This constructor is used by the tests
 func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
-    maxReplaceSize int, peerSampleInterval time.Duration) *Engine {
+    maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine {
 
     e := &Engine{
         ledgerMap: make(map[peer.ID]*ledger),
@@ -183,6 +185,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
         ticker: time.NewTicker(time.Millisecond * 100),
         maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
         peerSampleInterval: peerSampleInterval,
+        sampleCh: sampleCh,
         taskWorkerCount: taskWorkerCount,
         sendDontHaves: true,
         self: self,
@@ -315,6 +318,11 @@ func (e *Engine) scoreWorker(ctx context.Context) {
         }
         // Keep the memory. It's not much and it saves us from having to allocate.
         updates = updates[:0]
+
+        // Used by the tests
+        if e.sampleCh != nil {
+            e.sampleCh <- struct{}{}
+        }
     }
 }
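Two details in the engine change are worth calling out. First, a send on a nil channel blocks forever in Go, so the nil check around e.sampleCh is what keeps the production path safe: NewEngine passes nil, and the branch is simply skipped. Second, because the test creates sampleCh unbuffered, the score worker blocks after each sample until the test receives, keeping sampling and the test's assertions in lockstep; a non-blocking send (a select with a default case) would decouple the worker from the test's pace, but could silently drop signals the test is counting on.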
62 changes: 42 additions & 20 deletions internal/decision/engine_test.go

@@ -91,10 +91,14 @@ type engineSet struct {
     Blockstore blockstore.Blockstore
 }
 
-func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Duration) engineSet {
+func newTestEngine(ctx context.Context, idStr string) engineSet {
+    return newTestEngineWithSampling(ctx, idStr, shortTerm, nil)
+}
+
+func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
     fpt := &fakePeerTagger{}
     bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
-    e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval)
+    e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval, sampleCh)
     e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
     return engineSet{
         Peer: peer.ID(idStr),
@@ -108,8 +112,8 @@ func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Duration) engineSet {
 func TestConsistentAccounting(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    sender := newTestEngine(ctx, "Ernie", shortTerm)
-    receiver := newTestEngine(ctx, "Bert", shortTerm)
+    sender := newTestEngine(ctx, "Ernie")
+    receiver := newTestEngine(ctx, "Bert")
 
     // Send messages from Ernie to Bert
     for i := 0; i < 1000; i++ {
@@ -143,8 +147,8 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
 
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
-    seattle := newTestEngine(ctx, "sea", shortTerm)
+    sanfrancisco := newTestEngine(ctx, "sf")
+    seattle := newTestEngine(ctx, "sea")
 
     m := message.New(true)
 
@@ -181,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
 func TestOutboxClosedWhenEngineClosed(t *testing.T) {
     ctx := context.Background()
     t.SkipNow() // TODO implement *Engine.Close
-    e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm)
+    e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
     e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
     var wg sync.WaitGroup
     wg.Add(1)
@@ -509,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
         testCases = onlyTestCases
     }
 
-    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
+    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
     e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
     for i, testCase := range testCases {
         t.Logf("Test case %d:", i)
@@ -665,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
         testCases = onlyTestCases
     }
 
-    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
+    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
     e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
     var next envChan
@@ -850,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
     ctx := context.Background()
     for i := 0; i < numRounds; i++ {
         expected := make([][]string, 0, len(testcases))
-        e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
+        e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
         e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
         for _, testcase := range testcases {
             set := testcase[0]
@@ -875,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
     partner := libp2ptest.RandPeerIDFatal(t)
     otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
+    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
     e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
     blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -919,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
     partner := libp2ptest.RandPeerIDFatal(t)
     otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
+    e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
     e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
     blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -981,8 +985,8 @@ func TestSendDontHave(t *testing.T) {
 func TestTaggingPeers(t *testing.T) {
     ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
     defer cancel()
-    sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
-    seattle := newTestEngine(ctx, "sea", shortTerm)
+    sanfrancisco := newTestEngine(ctx, "sf")
+    seattle := newTestEngine(ctx, "sea")
 
     keys := []string{"a", "b", "c", "d", "e"}
     for _, letter := range keys {
@@ -1007,11 +1011,13 @@
 }
 
 func TestTaggingUseful(t *testing.T) {
-    peerSampleInterval := 10 * time.Millisecond
+    peerSampleInterval := 1 * time.Millisecond
 
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
     defer cancel()
-    me := newTestEngine(ctx, "engine", peerSampleInterval)
+
+    sampleCh := make(chan struct{})
+    me := newTestEngineWithSampling(ctx, "engine", peerSampleInterval, sampleCh)
     friend := peer.ID("friend")
 
     block := blocks.NewBlock([]byte("foobar"))
@@ -1022,22 +1028,38 @@
         if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
             t.Fatal("Peers should be untagged but weren't")
         }
+
         me.Engine.MessageSent(friend, msg)
-        time.Sleep(15 * time.Millisecond)
+
+        for j := 0; j < 3; j++ {
+            <-sampleCh
+        }
+
         if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
             t.Fatal("Peers should be tagged but weren't")
         }
-        time.Sleep(peerSampleInterval * 10)
+
+        for j := 0; j < longTermRatio; j++ {
+            <-sampleCh
+        }
     }
 
     if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
         t.Fatal("peers should still be tagged due to long-term usefulness")
     }
-    time.Sleep(peerSampleInterval * 2)
+
+    for j := 0; j < longTermRatio; j++ {
+        <-sampleCh
+    }
+
     if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
         t.Fatal("peers should still be tagged due to long-term usefulness")
    }
-    time.Sleep(peerSampleInterval * 30)
+
+    for j := 0; j < longTermRatio; j++ {
+        <-sampleCh
+    }
+
     if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
         t.Fatal("peers should finally be untagged")
     }
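With this synchronization in place, TestTaggingUseful waits for three samples after sending a message before asserting that the peer is tagged, then drains longTermRatio samples at a time (longTermRatio is the engine package's constant relating short-term samples to the long-term usefulness window). One caveat: a bare <-sampleCh receive hangs if the score worker ever stops sampling, since it is not tied to the test's 5-second context. A hypothetical hardening under that assumption (not part of this commit; waitSamples is an illustrative name, and it reuses the test file's existing testing and time imports):

// waitSamples receives n samples but fails fast instead of hanging
// if the score worker stops signaling.
func waitSamples(t *testing.T, sampleCh <-chan struct{}, n int) {
    t.Helper()
    for i := 0; i < n; i++ {
        select {
        case <-sampleCh:
        case <-time.After(5 * time.Second):
            t.Fatalf("timed out waiting for sample %d of %d", i+1, n)
        }
    }
}

Each bare receive loop in the test would then become waitSamples(t, sampleCh, 3) or waitSamples(t, sampleCh, longTermRatio).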
