diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 15e6ad8c..5c7da903 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -159,6 +159,8 @@ type Engine struct { // how frequently the engine should sample peer usefulness peerSampleInterval time.Duration + // used by the tests to detect when a sample is taken + sampleCh chan struct{} sendDontHaves bool @@ -167,12 +169,12 @@ type Engine struct { // NewEngine creates a new block sending engine for the given block store func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine { - return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm) + return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil) } // This constructor is used by the tests func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, - maxReplaceSize int, peerSampleInterval time.Duration) *Engine { + maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine { e := &Engine{ ledgerMap: make(map[peer.ID]*ledger), @@ -183,6 +185,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, ticker: time.NewTicker(time.Millisecond * 100), maxBlockSizeReplaceHasWithBlock: maxReplaceSize, peerSampleInterval: peerSampleInterval, + sampleCh: sampleCh, taskWorkerCount: taskWorkerCount, sendDontHaves: true, self: self, @@ -315,6 +318,11 @@ func (e *Engine) scoreWorker(ctx context.Context) { } // Keep the memory. It's not much and it saves us from having to allocate. updates = updates[:0] + + // Used by the tests + if e.sampleCh != nil { + e.sampleCh <- struct{}{} + } } } diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 6313ee16..0ac01107 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -91,10 +91,14 @@ type engineSet struct { Blockstore blockstore.Blockstore } -func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Duration) engineSet { +func newTestEngine(ctx context.Context, idStr string) engineSet { + return newTestEngineWithSampling(ctx, idStr, shortTerm, nil) +} + +func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet { fpt := &fakePeerTagger{} bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval) + e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval, sampleCh) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) return engineSet{ Peer: peer.ID(idStr), @@ -108,8 +112,8 @@ func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Du func TestConsistentAccounting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sender := newTestEngine(ctx, "Ernie", shortTerm) - receiver := newTestEngine(ctx, "Bert", shortTerm) + sender := newTestEngine(ctx, "Ernie") + receiver := newTestEngine(ctx, "Bert") // Send messages from Ernie to Bert for i := 0; i < 1000; i++ { @@ -143,8 +147,8 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sanfrancisco := newTestEngine(ctx, "sf", shortTerm) - seattle := newTestEngine(ctx, "sea", shortTerm) + sanfrancisco := newTestEngine(ctx, "sf") + seattle := newTestEngine(ctx, "sea") m := message.New(true) @@ -181,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool { func TestOutboxClosedWhenEngineClosed(t *testing.T) { ctx := context.Background() t.SkipNow() // TODO implement *Engine.Close - e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm) + e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm, nil) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) var wg sync.WaitGroup wg.Add(1) @@ -509,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) { testCases = onlyTestCases } - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm) + e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) for i, testCase := range testCases { t.Logf("Test case %d:", i) @@ -665,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) { testCases = onlyTestCases } - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm) + e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) var next envChan @@ -850,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) { ctx := context.Background() for i := 0; i < numRounds; i++ { expected := make([][]string, 0, len(testcases)) - e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm) + e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) for _, testcase := range testcases { set := testcase[0] @@ -875,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm) + e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) blks := testutil.GenerateBlocksOfSize(4, 8*1024) @@ -919,7 +923,7 @@ func TestSendDontHave(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm) + e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) blks := testutil.GenerateBlocksOfSize(4, 8*1024) @@ -981,8 +985,8 @@ func TestSendDontHave(t *testing.T) { func TestTaggingPeers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - sanfrancisco := newTestEngine(ctx, "sf", shortTerm) - seattle := newTestEngine(ctx, "sea", shortTerm) + sanfrancisco := newTestEngine(ctx, "sf") + seattle := newTestEngine(ctx, "sea") keys := []string{"a", "b", "c", "d", "e"} for _, letter := range keys { @@ -1007,11 +1011,13 @@ func TestTaggingPeers(t *testing.T) { } func TestTaggingUseful(t *testing.T) { - peerSampleInterval := 10 * time.Millisecond + peerSampleInterval := 1 * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - me := newTestEngine(ctx, "engine", peerSampleInterval) + + sampleCh := make(chan struct{}) + me := newTestEngineWithSampling(ctx, "engine", peerSampleInterval, sampleCh) friend := peer.ID("friend") block := blocks.NewBlock([]byte("foobar")) @@ -1022,22 +1028,38 @@ func TestTaggingUseful(t *testing.T) { if me.PeerTagger.count(me.Engine.tagUseful) != 0 { t.Fatal("Peers should be untagged but weren't") } + me.Engine.MessageSent(friend, msg) - time.Sleep(15 * time.Millisecond) + + for j := 0; j < 3; j++ { + <-sampleCh + } + if me.PeerTagger.count(me.Engine.tagUseful) != 1 { t.Fatal("Peers should be tagged but weren't") } - time.Sleep(peerSampleInterval * 10) + + for j := 0; j < longTermRatio; j++ { + <-sampleCh + } } if me.PeerTagger.count(me.Engine.tagUseful) == 0 { t.Fatal("peers should still be tagged due to long-term usefulness") } - time.Sleep(peerSampleInterval * 2) + + for j := 0; j < longTermRatio; j++ { + <-sampleCh + } + if me.PeerTagger.count(me.Engine.tagUseful) == 0 { t.Fatal("peers should still be tagged due to long-term usefulness") } - time.Sleep(peerSampleInterval * 30) + + for j := 0; j < longTermRatio; j++ { + <-sampleCh + } + if me.PeerTagger.count(me.Engine.tagUseful) != 0 { t.Fatal("peers should finally be untagged") }