diff --git a/bitswap.go b/bitswap.go
index c7875307..73ca266e 100644
--- a/bitswap.go
+++ b/bitswap.go
@@ -154,8 +154,15 @@ func WithTargetMessageSize(tms int) Option {
 	}
 }
 
+func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
+	return func(bs *Bitswap) {
+		bs.peerBlockRequestFilter = pbrf
+	}
+}
+
 type TaskInfo = decision.TaskInfo
 type TaskComparator = decision.TaskComparator
+type PeerBlockRequestFilter = decision.PeerBlockRequestFilter
 
 // WithTaskComparator configures custom task prioritization logic.
 func WithTaskComparator(comparator TaskComparator) Option {
@@ -291,6 +298,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 		activeBlocksGauge,
 		decision.WithTaskComparator(bs.taskComparator),
 		decision.WithTargetMessageSize(bs.engineTargetMessageSize),
+		decision.WithPeerBlockRequestFilter(bs.peerBlockRequestFilter),
 	)
 	bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)
 
@@ -399,6 +407,9 @@ type Bitswap struct {
 	simulateDontHavesOnTimeout bool
 
 	taskComparator TaskComparator
+
+	// an optional feature to accept / deny requests for blocks
+	peerBlockRequestFilter PeerBlockRequestFilter
 }
 
 type counters struct {
diff --git a/internal/decision/engine.go b/internal/decision/engine.go
index 24e45f16..c8c33097 100644
--- a/internal/decision/engine.go
+++ b/internal/decision/engine.go
@@ -180,6 +180,8 @@ type Engine struct {
 	metricUpdateCounter int
 
 	taskComparator TaskComparator
+
+	peerBlockRequestFilter PeerBlockRequestFilter
 }
 
 // TaskInfo represents the details of a request from a peer.
@@ -201,6 +203,10 @@ type TaskInfo struct {
 // It should return true if task 'ta' has higher priority than task 'tb'
 type TaskComparator func(ta, tb *TaskInfo) bool
 
+// PeerBlockRequestFilter is used to accept / deny requests for a CID coming from a PeerID
+// It should return true if the request should be fulfilled.
+type PeerBlockRequestFilter func(p peer.ID, c cid.Cid) bool
+
 type Option func(*Engine)
 
 func WithTaskComparator(comparator TaskComparator) Option {
@@ -209,6 +215,12 @@ func WithTaskComparator(comparator TaskComparator) Option {
 	}
 }
 
+func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
+	return func(e *Engine) {
+		e.peerBlockRequestFilter = pbrf
+	}
+}
+
 func WithTargetMessageSize(size int) Option {
 	return func(e *Engine) {
 		e.targetMessageSize = size
@@ -598,8 +610,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 		}
 	}()
 
-	// Get block sizes
+	// Split the entries into wants, cancels, and denials
 	wants, cancels := e.splitWantsCancels(entries)
+	wants, denials := e.splitWantsDenials(p, wants)
+
+	// Get block sizes
 	wantKs := cid.NewSet()
 	for _, entry := range wants {
 		wantKs.Add(entry.Cid)
@@ -639,6 +654,38 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 		}
 	}
 
+	// Queue a DONT_HAVE response for the given entry
+	sendDontHave := func(entry bsmsg.Entry) {
+		// Only add the task to the queue if the requester wants a DONT_HAVE
+		if e.sendDontHaves && entry.SendDontHave {
+			c := entry.Cid
+
+			newWorkExists = true
+			isWantBlock := false
+			if entry.WantType == pb.Message_Wantlist_Block {
+				isWantBlock = true
+			}
+
+			activeEntries = append(activeEntries, peertask.Task{
+				Topic:    c,
+				Priority: int(entry.Priority),
+				Work:     bsmsg.BlockPresenceSize(c),
+				Data: &taskData{
+					BlockSize:    0,
+					HaveBlock:    false,
+					IsWantBlock:  isWantBlock,
+					SendDontHave: entry.SendDontHave,
+				},
+			})
+		}
+	}
+
+	// Deny access to blocks
+	for _, entry := range denials {
+		log.Debugw("Bitswap engine: block denied access", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)
+		sendDontHave(entry)
+	}
+
 	// For each want-have / want-block
 	for _, entry := range wants {
 		c := entry.Cid
@@ -650,27 +697,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 		// If the block was not found
 		if !found {
 			log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)
-
-			// Only add the task to the queue if the requester wants a DONT_HAVE
-			if e.sendDontHaves && entry.SendDontHave {
-				newWorkExists = true
-				isWantBlock := false
-				if entry.WantType == pb.Message_Wantlist_Block {
-					isWantBlock = true
-				}
-
-				activeEntries = append(activeEntries, peertask.Task{
-					Topic:    c,
-					Priority: int(entry.Priority),
-					Work:     bsmsg.BlockPresenceSize(c),
-					Data: &taskData{
-						BlockSize:    0,
-						HaveBlock:    false,
-						IsWantBlock:  isWantBlock,
-						SendDontHave: entry.SendDontHave,
-					},
-				})
-			}
+			sendDontHave(entry)
 		} else {
 			// The block was found, add it to the queue
 			newWorkExists = true
@@ -722,6 +749,26 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent
 	return wants, cancels
 }
 
+// Split the want-have / want-block entries from the ones that will be denied access
+func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
+	if e.peerBlockRequestFilter == nil {
+		return allWants, nil
+	}
+
+	wants := make([]bsmsg.Entry, 0, len(allWants))
+	denied := make([]bsmsg.Entry, 0, len(allWants))
+
+	for _, et := range allWants {
+		if e.peerBlockRequestFilter(p, et.Cid) {
+			wants = append(wants, et)
+		} else {
+			denied = append(denied, et)
+		}
+	}
+
+	return wants, denied
+}
+
 // ReceiveFrom is called when new blocks are received and added to the block
 // store, meaning there may be peers who want those blocks, so we should send
 // the blocks to them.
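Side note (illustrative, not part of the diff): a PeerBlockRequestFilter is just a predicate over the requesting peer and the CID, so any policy can be plugged in. A minimal sketch, assuming the go-cid and go-libp2p-core packages and a hypothetical allowedPeers set:

	import (
		cid "github.com/ipfs/go-cid"
		peer "github.com/libp2p/go-libp2p-core/peer"
	)

	// allowlistFilter builds a filter that fulfills requests only from peers in
	// allowedPeers (a hypothetical set); all other peers receive DONT_HAVE responses.
	func allowlistFilter(allowedPeers map[peer.ID]struct{}) func(p peer.ID, c cid.Cid) bool {
		return func(p peer.ID, c cid.Cid) bool {
			_, ok := allowedPeers[p]
			return ok // true fulfills the request; false denies it
		}
	}

Because the filter runs on every want-have / want-block entry, it should be cheap and non-blocking; a map lookup like the one above is the intended scale.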
diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go
index 315604aa..c4dc5348 100644
--- a/internal/decision/engine_test.go
+++ b/internal/decision/engine_test.go
@@ -1112,6 +1112,334 @@ func TestTaskComparator(t *testing.T) {
 	}
 }
 
+func TestPeerBlockFilter(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+
+	// Generate a few keys
+	keys := []string{"a", "b", "c", "d"}
+	blks := make([]blocks.Block, 0, len(keys))
+	for _, letter := range keys {
+		block := blocks.NewBlock([]byte(letter))
+		blks = append(blks, block)
+	}
+
+	// Generate a few partner peers
+	peerIDs := make([]peer.ID, 3)
+	peerIDs[0] = libp2ptest.RandPeerIDFatal(t)
+	peerIDs[1] = libp2ptest.RandPeerIDFatal(t)
+	peerIDs[2] = libp2ptest.RandPeerIDFatal(t)
+
+	// Setup the main peer
+	fpt := &fakePeerTagger{}
+	sl := NewTestScoreLedger(shortTerm, nil, clock.New())
+	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
+	if err := bs.PutMany(ctx, blks); err != nil {
+		t.Fatal(err)
+	}
+
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl,
+		WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool {
+			// peer 0 has access to everything
+			if p == peerIDs[0] {
+				return true
+			}
+			// peer 1 can only access keys c and d
+			if p == peerIDs[1] {
+				return blks[2].Cid().Equals(c) || blks[3].Cid().Equals(c)
+			}
+			// peer 2 and others can only access key d
+			return blks[3].Cid().Equals(c)
+		}),
+	)
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
+
+	// Setup the test
+	type testCaseEntry struct {
+		peerIndex int
+		wantBlks  string
+		wantHaves string
+	}
+
+	type testCaseExp struct {
+		blks      string
+		haves     string
+		dontHaves string
+	}
+
+	type testCase struct {
+		only bool
+		wl   testCaseEntry
+		exp  testCaseExp
+	}
+
+	testCases := []testCase{
+		// Peer 0 has access to everything: want-block `a` succeeds.
+		{
+			wl: testCaseEntry{
+				peerIndex: 0,
+				wantBlks:  "a",
+			},
+			exp: testCaseExp{
+				blks: "a",
+			},
+		},
+		// Peer 0 has access to everything: want-have `b` succeeds; the unknown key `1` returns a dont-have.
+		{
+			wl: testCaseEntry{
+				peerIndex: 0,
+				wantHaves: "b1",
+			},
+			exp: testCaseExp{
+				haves:     "b",
+				dontHaves: "1",
+			},
+		},
+		// Peer 1 has access to [c, d]: want-have `a` results in a dont-have.
+		{
+			wl: testCaseEntry{
+				peerIndex: 1,
+				wantHaves: "ac",
+			},
+			exp: testCaseExp{
+				haves:     "c",
+				dontHaves: "a",
+			},
+		},
+		// Peer 1 has access to [c, d]: want-block `b` results in a dont-have.
+		{
+			wl: testCaseEntry{
+				peerIndex: 1,
+				wantBlks:  "bd",
+			},
+			exp: testCaseExp{
+				blks:      "d",
+				dontHaves: "b",
+			},
+		},
+		// Peer 2 has access to [d]: want-have `a` and want-block `b` result in dont-haves.
+		{
+			wl: testCaseEntry{
+				peerIndex: 2,
+				wantHaves: "a",
+				wantBlks:  "bcd1",
+			},
+			exp: testCaseExp{
+				haves:     "",
+				blks:      "d",
+				dontHaves: "abc1",
+			},
+		},
+	}
+
+	var onlyTestCases []testCase
+	for _, testCase := range testCases {
+		if testCase.only {
+			onlyTestCases = append(onlyTestCases, testCase)
+		}
+	}
+	if len(onlyTestCases) > 0 {
+		testCases = onlyTestCases
+	}
+
+	for i, testCase := range testCases {
+		// Create want requests
+		wl := testCase.wl
+
+		t.Logf("test case %v: Peer%v / want-blocks '%s' / want-haves '%s'",
+			i, wl.peerIndex, wl.wantBlks, wl.wantHaves)
+
+		wantBlks := strings.Split(wl.wantBlks, "")
+		wantHaves := strings.Split(wl.wantHaves, "")
+
+		partnerWantBlocksHaves(e, wantBlks, wantHaves, true, peerIDs[wl.peerIndex])
+
+		// Check result
+		exp := testCase.exp
+
+		next := <-e.Outbox()
+		envelope := <-next
+
+		expBlks := strings.Split(exp.blks, "")
+		expHaves := strings.Split(exp.haves, "")
+		expDontHaves := strings.Split(exp.dontHaves, "")
+
+		err := checkOutput(t, e, envelope, expBlks, expHaves, expDontHaves)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+func TestPeerBlockFilterMutability(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+
+	// Generate a few keys
+	keys := []string{"a", "b", "c", "d"}
+	blks := make([]blocks.Block, 0, len(keys))
+	for _, letter := range keys {
+		block := blocks.NewBlock([]byte(letter))
+		blks = append(blks, block)
+	}
+
+	partnerID := libp2ptest.RandPeerIDFatal(t)
+
+	// Setup the main peer
+	fpt := &fakePeerTagger{}
+	sl := NewTestScoreLedger(shortTerm, nil, clock.New())
+	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
+	if err := bs.PutMany(ctx, blks); err != nil {
+		t.Fatal(err)
+	}
+
+	filterAllowList := make(map[cid.Cid]bool)
+
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl,
+		WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool {
+			return filterAllowList[c]
+		}),
+	)
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
+
+	// Setup the test
+	type testCaseEntry struct {
+		allowList string
+		wantBlks  string
+		wantHaves string
+	}
+
+	type testCaseExp struct {
+		blks      string
+		haves     string
+		dontHaves string
+	}
+
+	type testCase struct {
+		only bool
+		wls  []testCaseEntry
+		exps []testCaseExp
+	}
+
+	testCases := []testCase{
+		{
+			wls: []testCaseEntry{
+				{
+					// Peer has no access & requests a want-block
+					allowList: "",
+					wantBlks:  "a",
+				},
+				{
+					// Then the peer is allowed access to a
+					allowList: "a",
+					wantBlks:  "a",
+				},
+			},
+			exps: []testCaseExp{
+				{
+					dontHaves: "a",
+				},
+				{
+					blks: "a",
+				},
+			},
+		},
+		{
+			wls: []testCaseEntry{
+				{
+					// Peer has access to bc
+					allowList: "bc",
+					wantHaves: "bc",
+				},
+				{
+					// Then the peer loses access to b
+					allowList: "c",
+					wantBlks:  "bc", // Note: we request blocks here to force a response from the node
+				},
+			},
+			exps: []testCaseExp{
+				{
+					haves: "bc",
+				},
+				{
+					blks:      "c",
+					dontHaves: "b",
+				},
+			},
+		},
+		{
+			wls: []testCaseEntry{
+				{
+					// Peer has no access & requests a want-have
+					allowList: "",
+					wantHaves: "d",
+				},
+				{
+					// Then the peer gains access to d
+					allowList: "d",
+					wantHaves: "d",
+				},
+			},
+			exps: []testCaseExp{
+				{
+					dontHaves: "d",
+				},
+				{
+					haves: "d",
+				},
+			},
+		},
+	}
+
+	var onlyTestCases []testCase
+	for _, testCase := range testCases {
+		if testCase.only {
+			onlyTestCases = append(onlyTestCases, testCase)
+		}
+	}
+	if len(onlyTestCases) > 0 {
+		testCases = onlyTestCases
+	}
+
+	for i, testCase := range testCases {
+		for j := range testCase.wls {
+			wl := testCase.wls[j]
+			exp := testCase.exps[j]
+
+			// Create want requests
+			t.Logf("test case %v, %v: allow-list '%s' / want-blocks '%s' / want-haves '%s'",
+				i, j, wl.allowList, wl.wantBlks, wl.wantHaves)
+
+			allowList := strings.Split(wl.allowList, "")
+			wantBlks := strings.Split(wl.wantBlks, "")
+			wantHaves := strings.Split(wl.wantHaves, "")
+
+			// Update the allow list
+			filterAllowList = make(map[cid.Cid]bool)
+			for _, letter := range allowList {
+				block := blocks.NewBlock([]byte(letter))
+				filterAllowList[block.Cid()] = true
+			}
+
+			// Send the request
+			partnerWantBlocksHaves(e, wantBlks, wantHaves, true, partnerID)
+
+			// Check result
+			next := <-e.Outbox()
+			envelope := <-next
+
+			expBlks := strings.Split(exp.blks, "")
+			expHaves := strings.Split(exp.haves, "")
+			expDontHaves := strings.Split(exp.dontHaves, "")
+
+			err := checkOutput(t, e, envelope, expBlks, expHaves, expDontHaves)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+}
+
 func TestTaggingPeers(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 	defer cancel()
@@ -1199,24 +1527,24 @@ func TestTaggingUseful(t *testing.T) {
 	}
 }
 
-func partnerWantBlocks(e *Engine, keys []string, partner peer.ID) {
+func partnerWantBlocks(e *Engine, wantBlocks []string, partner peer.ID) {
 	add := message.New(false)
-	for i, letter := range keys {
+	for i, letter := range wantBlocks {
 		block := blocks.NewBlock([]byte(letter))
-		add.AddEntry(block.Cid(), int32(len(keys)-i), pb.Message_Wantlist_Block, true)
+		add.AddEntry(block.Cid(), int32(len(wantBlocks)-i), pb.Message_Wantlist_Block, true)
 	}
 	e.MessageReceived(context.Background(), partner, add)
 }
 
-func partnerWantBlocksHaves(e *Engine, keys []string, wantHaves []string, sendDontHave bool, partner peer.ID) {
+func partnerWantBlocksHaves(e *Engine, wantBlocks []string, wantHaves []string, sendDontHave bool, partner peer.ID) {
 	add := message.New(false)
-	priority := int32(len(wantHaves) + len(keys))
+	priority := int32(len(wantHaves) + len(wantBlocks))
 	for _, letter := range wantHaves {
 		block := blocks.NewBlock([]byte(letter))
 		add.AddEntry(block.Cid(), priority, pb.Message_Wantlist_Have, sendDontHave)
 		priority--
 	}
-	for _, letter := range keys {
+	for _, letter := range wantBlocks {
 		block := blocks.NewBlock([]byte(letter))
 		add.AddEntry(block.Cid(), priority, pb.Message_Wantlist_Block, sendDontHave)
 		priority--
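Usage note (a sketch, not part of the diff): the exported option is passed to bitswap.New alongside the other options. Assuming the usual ctx/network/bstore setup from the surrounding code, and a hypothetical privateCID to protect, wiring it up could look like:

	// Deny one specific CID to every peer; everything else is served normally.
	// privateCID is a placeholder for whatever block should stay private.
	bswap := bitswap.New(ctx, network, bstore,
		bitswap.WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool {
			// Return true to serve the request, false to answer with DONT_HAVE.
			return !c.Equals(privateCID)
		}),
	)

Denied entries behave exactly like missing blocks from the requester's point of view: if the peer asked for DONT_HAVE responses it gets one, otherwise the request is silently dropped, so the filter does not leak whether the block exists locally.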