diff --git a/go.mod b/go.mod index f3877b2b..e961fce2 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/libp2p/go-libp2p-testing v0.1.1 github.com/libp2p/go-msgio v0.0.4 github.com/multiformats/go-multiaddr v0.2.1 + gopkg.in/src-d/go-log.v1 v1.0.1 ) go 1.12 diff --git a/go.sum b/go.sum index 498858a5..d8e85ec0 100644 --- a/go.sum +++ b/go.sum @@ -165,6 +165,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jvAOgROwZ3H+Y3hIDk4tbbmIjcYQ= github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk= @@ -309,6 +310,7 @@ github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcncea github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= @@ -406,6 +408,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smola/gocompat v0.2.0 h1:6b1oIMlUXIpz//VKEDzPVBK8KG7beVwmHIUEBIs/Pns= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= @@ -421,6 +424,7 @@ github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tL github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/src-d/envconfig v1.0.0 h1:/AJi6DtjFhZKNx3OB2qMsq7y4yT5//AeSZIe7rk+PX8= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -440,6 +444,7 @@ 
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvX github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= +github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -538,6 +543,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= +gopkg.in/src-d/go-log.v1 v1.0.1 h1:heWvX7J6qbGWbeFS/aRmiy1eYaT+QMV6wNvHDyMjQV4= gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 5c7da903..4b2dea49 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -418,7 +418,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { // Create a new message msg := bsmsg.New(true) - // log.Debugf(" %s got %d tasks", lu.P(e.self), len(nextTasks)) + log.Debugw("Bitswap process tasks", "local", e.self, "taskCount", len(nextTasks)) // Amount of data in the request queue still waiting to be popped msg.SetPendingBytes(int32(pendingBytes)) @@ -456,12 +456,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { if blk == nil { // If the client requested DONT_HAVE, add DONT_HAVE to the message if t.SendDontHave { - // log.Debugf(" make evlp %s->%s DONT_HAVE (expected block) %s", lu.P(e.self), lu.P(p), lu.C(c)) msg.AddDontHave(c) } } else { // Add the block to the message - // log.Debugf(" make evlp %s->%s block: %s (%d bytes)", lu.P(e.self), lu.P(p), lu.C(c), len(blk.RawData())) + // log.Debugf(" make evlp %s->%s block: %s (%d bytes)", e.self, p, c, len(blk.RawData())) msg.AddBlock(blk) } } @@ -472,7 +471,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { continue } - // log.Debugf(" sending message %s->%s (%d blks / %d presences / %d bytes)\n", lu.P(e.self), lu.P(p), blkCount, presenceCount, msg.Size()) + log.Debugw("Bitswap engine -> msg", "local", e.self, "to", p, "blockCount", len(msg.Blocks()), "presenceCount", len(msg.BlockPresences()), "size", msg.Size()) return &Envelope{ Peer: p, Message: msg, @@ -512,21 +511,21 @@ func (e *Engine) Peers() []peer.ID { func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) { entries := m.Wantlist() - // if len(entries) > 0 { - // log.Debugf("engine-%s received message from %s with %d entries\n", lu.P(e.self), lu.P(p), len(entries)) - // for 
_, et := range entries { - // if !et.Cancel { - // if et.WantType == pb.Message_Wantlist_Have { - // log.Debugf(" recv %s<-%s: want-have %s\n", lu.P(e.self), lu.P(p), lu.C(et.Cid)) - // } else { - // log.Debugf(" recv %s<-%s: want-block %s\n", lu.P(e.self), lu.P(p), lu.C(et.Cid)) - // } - // } - // } - // } + if len(entries) > 0 { + log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries)) + for _, et := range entries { + if !et.Cancel { + if et.WantType == pb.Message_Wantlist_Have { + log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid) + } else { + log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid) + } + } + } + } if m.Empty() { - log.Debugf("received empty message from %s", p) + log.Infof("received empty message from %s", p) } newWorkExists := false @@ -556,7 +555,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // Record how many bytes were received in the ledger blks := m.Blocks() for _, block := range blks { - log.Debugf("got block %s %d bytes", block, len(block.RawData())) + log.Debugw("Bitswap engine <- block", "local", e.self, "from", p, "cid", block.Cid(), "size", len(block.RawData())) l.ReceivedBytes(len(block.RawData())) } @@ -569,7 +568,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // Remove cancelled blocks from the queue for _, entry := range cancels { - // log.Debugf("%s<-%s cancel %s", lu.P(e.self), lu.P(p), lu.C(entry.Cid)) + log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid) if l.CancelWant(entry.Cid) { e.peerRequestQueue.Remove(entry.Cid, p) } @@ -585,6 +584,8 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // If the block was not found if !found { + log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) + // Only add the task to the queue if the requester wants a DONT_HAVE if e.sendDontHaves && entry.SendDontHave { newWorkExists = true @@ -593,12 +594,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap isWantBlock = true } - // if isWantBlock { - // log.Debugf(" put rq %s->%s %s as want-block (not found)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid)) - // } else { - // log.Debugf(" put rq %s->%s %s as want-have (not found)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid)) - // } - activeEntries = append(activeEntries, peertask.Task{ Topic: c, Priority: entry.Priority, @@ -611,18 +606,13 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap }, }) } - // log.Debugf(" not putting rq %s->%s %s (not found, SendDontHave false)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid)) } else { // The block was found, add it to the queue newWorkExists = true isWantBlock := e.sendAsBlock(entry.WantType, blockSize) - // if isWantBlock { - // log.Debugf(" put rq %s->%s %s as want-block (%d bytes)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid), blockSize) - // } else { - // log.Debugf(" put rq %s->%s %s as want-have (%d bytes)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid), blockSize) - // } + log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock) // entrySize is the amount of space the entry takes up in the // message we send to the recipient. 
If we're sending a block, the @@ -695,12 +685,6 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) blockSize := blockSizes[k] isWantBlock := e.sendAsBlock(entry.WantType, blockSize) - // if isWantBlock { - // log.Debugf(" add-block put rq %s->%s %s as want-block (%d bytes)\n", lu.P(e.self), lu.P(l.Partner), lu.C(k), blockSize) - // } else { - // log.Debugf(" add-block put rq %s->%s %s as want-have (%d bytes)\n", lu.P(e.self), lu.P(l.Partner), lu.C(k), blockSize) - // } - entrySize := blockSize if !isWantBlock { entrySize = bsmsg.BlockPresenceSize(k) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 0ac01107..6f5a193b 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - lu "github.com/ipfs/go-bitswap/internal/logutil" "github.com/ipfs/go-bitswap/internal/testutil" message "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" @@ -780,12 +779,12 @@ func formatBlocksDiff(blks []blocks.Block, expBlks []string) string { var out bytes.Buffer out.WriteString(fmt.Sprintf("Blocks (%d):\n", len(blks))) for _, b := range blks { - out.WriteString(fmt.Sprintf(" %s: %s\n", lu.C(b.Cid()), b.RawData())) + out.WriteString(fmt.Sprintf(" %s: %s\n", b.Cid(), b.RawData())) } out.WriteString(fmt.Sprintf("Expected (%d):\n", len(expBlks))) for _, k := range expBlks { expected := blocks.NewBlock([]byte(k)) - out.WriteString(fmt.Sprintf(" %s: %s\n", lu.C(expected.Cid()), k)) + out.WriteString(fmt.Sprintf(" %s: %s\n", expected.Cid(), k)) } return out.String() } @@ -798,16 +797,16 @@ func formatPresencesDiff(presences []message.BlockPresence, expHaves []string, e if p.Type == pb.Message_DontHave { t = "DONT_HAVE" } - out.WriteString(fmt.Sprintf(" %s - %s\n", lu.C(p.Cid), t)) + out.WriteString(fmt.Sprintf(" %s - %s\n", p.Cid, t)) } out.WriteString(fmt.Sprintf("Expected (%d):\n", len(expHaves)+len(expDontHaves))) for _, k := range expHaves { expected := blocks.NewBlock([]byte(k)) - out.WriteString(fmt.Sprintf(" %s: %s - HAVE\n", lu.C(expected.Cid()), k)) + out.WriteString(fmt.Sprintf(" %s: %s - HAVE\n", expected.Cid(), k)) } for _, k := range expDontHaves { expected := blocks.NewBlock([]byte(k)) - out.WriteString(fmt.Sprintf(" %s: %s - DONT_HAVE\n", lu.C(expected.Cid()), k)) + out.WriteString(fmt.Sprintf(" %s: %s - DONT_HAVE\n", expected.Cid(), k)) } return out.String() } diff --git a/internal/logutil/logutil.go b/internal/logutil/logutil.go deleted file mode 100644 index 8cba2a47..00000000 --- a/internal/logutil/logutil.go +++ /dev/null @@ -1,26 +0,0 @@ -package logutil - -import ( - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" -) - -func C(c cid.Cid) string { - if c.Defined() { - str := c.String() - return str[len(str)-6:] - } - return "" -} - -func P(p peer.ID) string { - if p != "" { - str := p.String() - limit := 6 - if len(str) < limit { - limit = len(str) - } - return str[len(str)-limit:] - } - return "" -} diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 922ab633..b3eb5384 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -149,6 +149,7 @@ type DontHaveTimeoutManager interface { // New creates a new MessageQueue. 
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { onTimeout := func(ks []cid.Cid) { + log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p) onDontHaveTimeout(p, ks) } dhTimeoutMgr := newDontHaveTimeoutMgr(ctx, newPeerConnection(p, network), onTimeout) @@ -401,7 +402,7 @@ func (mq *MessageQueue) sendMessage() { return } - // mq.logOutgoingMessage(message) + mq.logOutgoingMessage(message) // Try to send this message repeatedly for i := 0; i < maxRetries; i++ { @@ -450,24 +451,25 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) { mq.dhTimeoutMgr.AddPending(wants) } -// func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) { -// entries := msg.Wantlist() -// for _, e := range entries { -// if e.Cancel { -// if e.WantType == pb.Message_Wantlist_Have { -// log.Debugf("send %s->%s: cancel-have %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid)) -// } else { -// log.Debugf("send %s->%s: cancel-block %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid)) -// } -// } else { -// if e.WantType == pb.Message_Wantlist_Have { -// log.Debugf("send %s->%s: want-have %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid)) -// } else { -// log.Debugf("send %s->%s: want-block %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid)) -// } -// } -// } -// } +func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) { + self := mq.network.Self() + entries := msg.Wantlist() + for _, e := range entries { + if e.Cancel { + if e.WantType == pb.Message_Wantlist_Have { + log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid) + } else { + log.Debugw("Bitswap -> cancel-block", "local", self, "to", mq.p, "cid", e.Cid) + } + } else { + if e.WantType == pb.Message_Wantlist_Have { + log.Debugw("Bitswap -> want-have", "local", self, "to", mq.p, "cid", e.Cid) + } else { + log.Debugw("Bitswap -> want-block", "local", self, "to", mq.p, "cid", e.Cid) + } + } + } +} func (mq *MessageQueue) hasPendingWork() bool { return mq.pendingWorkCount() > 0 diff --git a/internal/peermanager/peerwantmanager.go b/internal/peermanager/peerwantmanager.go index 9833b3e8..2e8658bc 100644 --- a/internal/peermanager/peerwantmanager.go +++ b/internal/peermanager/peerwantmanager.go @@ -4,8 +4,6 @@ import ( "bytes" "fmt" - lu "github.com/ipfs/go-bitswap/internal/logutil" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) @@ -194,12 +192,12 @@ func (pwm *peerWantManager) GetWantHaves() []cid.Cid { func (pwm *peerWantManager) String() string { var b bytes.Buffer for p, ws := range pwm.peerWants { - b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", lu.P(p), ws.wantHaves.Len(), ws.wantBlocks.Len())) + b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len())) for _, c := range ws.wantHaves.Keys() { - b.WriteString(fmt.Sprintf(" want-have %s\n", lu.C(c))) + b.WriteString(fmt.Sprintf(" want-have %s\n", c)) } for _, c := range ws.wantBlocks.Keys() { - b.WriteString(fmt.Sprintf(" want-block %s\n", lu.C(c))) + b.WriteString(fmt.Sprintf(" want-block %s\n", c)) } } return b.String() diff --git a/internal/session/peerresponsetracker.go b/internal/session/peerresponsetracker.go index fb3c111b..63e90461 100644 --- a/internal/session/peerresponsetracker.go +++ b/internal/session/peerresponsetracker.go @@ -18,10 +18,14 @@ func newPeerResponseTracker() *peerResponseTracker { } } +// 
receivedBlockFrom is called when a block is received from a peer +// (only called first time block is received) func (prt *peerResponseTracker) receivedBlockFrom(from peer.ID) { prt.firstResponder[from]++ } +// choose picks a peer from the list of candidate peers, favouring those peers +// that were first to send us previous blocks func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID { if len(peers) == 0 { return "" @@ -41,8 +45,6 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID { for _, p := range peers { counted += float64(prt.getPeerCount(p)) / float64(total) if counted > rnd { - // log.Warnf(" chose %s from %s (%d) / %s (%d) with pivot %.2f", - // lu.P(p), lu.P(peers[0]), prt.firstResponder[peers[0]], lu.P(peers[1]), prt.firstResponder[peers[1]], rnd) return p } } @@ -51,11 +53,11 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID { // math that doesn't quite cover the whole range of peers in the for loop // so just choose the last peer. index := len(peers) - 1 - // log.Warnf(" chose last (indx %d) %s from %s (%d) / %s (%d) with pivot %.2f", - // index, lu.P(peers[index]), lu.P(peers[0]), prt.firstResponder[peers[0]], lu.P(peers[1]), prt.firstResponder[peers[1]], rnd) return peers[index] } +// getPeerCount returns the number of times the peer was first to send us a +// block func (prt *peerResponseTracker) getPeerCount(p peer.ID) int { count, ok := prt.firstResponder[p] if ok { diff --git a/internal/session/session.go b/internal/session/session.go index faf01cb7..079a4f19 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -6,7 +6,6 @@ import ( bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" bsgetter "github.com/ipfs/go-bitswap/internal/getter" - lu "github.com/ipfs/go-bitswap/internal/logutil" notifications "github.com/ipfs/go-bitswap/internal/notifications" bspm "github.com/ipfs/go-bitswap/internal/peermanager" bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" @@ -178,7 +177,7 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontH ks = interestedRes[0] haves = interestedRes[1] dontHaves = interestedRes[2] - // s.logReceiveFrom(from, ks, haves, dontHaves) + s.logReceiveFrom(from, ks, haves, dontHaves) // Inform the session want sender that a message has been received s.sws.Update(from, ks, haves, dontHaves) @@ -194,19 +193,19 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontH } } -// func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { -// // log.Infof("Ses%d<-%s: %d blocks, %d haves, %d dont haves\n", -// // s.id, from, len(interestedKs), len(wantedHaves), len(wantedDontHaves)) -// for _, c := range interestedKs { -// log.Warnf("Ses%d %s<-%s: block %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c)) -// } -// for _, c := range haves { -// log.Warnf("Ses%d %s<-%s: HAVE %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c)) -// } -// for _, c := range dontHaves { -// log.Warnf("Ses%d %s<-%s: DONT_HAVE %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c)) -// } -// } +func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { + // log.Debugf("Ses%d<-%s: %d blocks, %d haves, %d dont haves\n", + // s.id, from, len(interestedKs), len(wantedHaves), len(wantedDontHaves)) + for _, c := range interestedKs { + log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id) + } + for _, c := range 
haves { + log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id) + } + for _, c := range dontHaves { + log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id) + } +} // GetBlock fetches a single block. func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) { @@ -328,9 +327,6 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { wants = s.sw.PrepareBroadcast() } - // log.Warnf("\n\n\n\n\nSes%d: broadcast %d keys\n\n\n\n\n", s.id, len(live)) - // log.Infof("Ses%d: broadcast %d keys\n", s.id, len(live)) - // Broadcast a want-have for the live wants to everyone we're connected to s.wm.BroadcastWantHaves(ctx, s.id, wants) @@ -340,7 +336,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { // Search for providers who have the first want in the list. // Typically if the provider has the first block they will have // the rest of the blocks also. - log.Infof("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, lu.C(wants[0]), len(wants)) + log.Infof("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants)) s.findMorePeers(ctx, wants[0]) } s.resetIdleTick() @@ -453,7 +449,6 @@ func (s *Session) resetIdleTick() { tickDelay = s.initialSearchDelay } else { avLat := s.latencyTrkr.averageLatency() - // log.Warnf("averageLatency %s", avLat) tickDelay = s.baseTickDelay + (3 * avLat) } tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks) diff --git a/internal/session/sessionwantsender.go b/internal/session/sessionwantsender.go index df963f9e..7af7b32a 100644 --- a/internal/session/sessionwantsender.go +++ b/internal/session/sessionwantsender.go @@ -4,7 +4,6 @@ import ( "context" bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" - lu "github.com/ipfs/go-bitswap/internal/logutil" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -135,7 +134,6 @@ func (sws *sessionWantSender) Add(ks []cid.Cid) { // Update is called when the session receives a message with incoming blocks // or HAVE / DONT_HAVE func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { - // fmt.Printf("Update(%s, %d, %d, %d, %t)\n", lu.P(from), len(ks), len(haves), len(dontHaves)) hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0 if !hasUpdate { return @@ -149,7 +147,6 @@ func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid // SignalAvailability is called by the PeerManager to signal that a peer has // connected / disconnected func (sws *sessionWantSender) SignalAvailability(p peer.ID, isAvailable bool) { - // fmt.Printf("SignalAvailability(%s, %t)\n", lu.P(p), isAvailable) availability := peerAvailability{p, isAvailable} sws.addChange(change{availability: availability}) } @@ -236,9 +233,7 @@ func (sws *sessionWantSender) onChange(changes []change) { // If there are some connected peers, send any pending wants if sws.spm.HasPeers() { - // fmt.Printf("sendNextWants()\n") sws.sendNextWants(newlyAvailable) - // fmt.Println(sws) } } @@ -280,7 +275,6 @@ func (sws *sessionWantSender) processAvailability(availability map[peer.ID]bool) // trackWant creates a new entry in the map of CID -> want info func (sws *sessionWantSender) trackWant(c cid.Cid) { - // fmt.Printf("trackWant %s\n", lu.C(c)) if _, ok := sws.wants[c]; ok { return } @@ -304,7 +298,7 @@ func (sws *sessionWantSender) processUpdates(updates []update) 
[]cid.Cid { for _, upd := range updates { for _, c := range upd.ks { blkCids.Add(c) - log.Warnf("received block %s", lu.C(c)) + // Remove the want removed := sws.removeWant(c) if removed != nil { @@ -382,7 +376,7 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { go func() { for p := range prunePeers { // Peer doesn't have anything we want, so remove it - log.Infof("peer %s sent too many dont haves", lu.P(p)) + log.Infof("peer %s sent too many dont haves, removing from session %d", p, sws.ID()) sws.SignalAvailability(p, false) } }() @@ -469,7 +463,6 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { // We already sent a want-block to a peer and haven't yet received a // response yet if wi.sentTo != "" { - // fmt.Printf(" q - already sent want-block %s to %s\n", lu.C(c), lu.P(wi.sentTo)) continue } @@ -477,12 +470,9 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { // corresponding to this want, so we must wait to discover more peers if wi.bestPeer == "" { // TODO: work this out in real time instead of using bestP? - // fmt.Printf(" q - no best peer for %s\n", lu.C(c)) continue } - // fmt.Printf(" q - send best: %s: %s\n", lu.C(c), lu.P(wi.bestPeer)) - // Record that we are sending a want-block for this want to the peer sws.setWantSentTo(c, wi.bestPeer) @@ -503,12 +493,8 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { // sendWants sends want-have and want-blocks to the appropriate peers func (sws *sessionWantSender) sendWants(sends allWants) { - // fmt.Printf(" send wants to %d peers\n", len(sends)) - // For each peer we're sending a request to for p, snd := range sends { - // fmt.Printf(" send %d wants to %s\n", snd.wantBlocks.Len(), lu.P(p)) - // Piggyback some other want-haves onto the request to the peer for _, c := range sws.getPiggybackWantHaves(p, snd.wantBlocks) { snd.wantHaves.Add(c) diff --git a/internal/sessionpeermanager/sessionpeermanager.go b/internal/sessionpeermanager/sessionpeermanager.go index 90233c72..499aa830 100644 --- a/internal/sessionpeermanager/sessionpeermanager.go +++ b/internal/sessionpeermanager/sessionpeermanager.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - lu "github.com/ipfs/go-bitswap/internal/logutil" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-core/peer" @@ -30,6 +29,7 @@ type SessionPeerManager struct { tagger PeerTagger tag string + id uint64 plk sync.RWMutex peers map[peer.ID]struct{} peersDiscovered bool @@ -38,6 +38,7 @@ type SessionPeerManager struct { // New creates a new SessionPeerManager func New(id uint64, tagger PeerTagger) *SessionPeerManager { return &SessionPeerManager{ + id: id, tag: fmt.Sprint("bs-ses-", id), tagger: tagger, peers: make(map[peer.ID]struct{}), @@ -62,7 +63,7 @@ func (spm *SessionPeerManager) AddPeer(p peer.ID) bool { // connection spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue) - log.Debugf("Added peer %s to session (%d peers)\n", p, len(spm.peers)) + log.Debugw("Bitswap: Added peer to session", "session", spm.id, "peer", p, "peerCount", len(spm.peers)) return true } @@ -79,7 +80,7 @@ func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool { delete(spm.peers, p) spm.tagger.UntagPeer(p, spm.tag) - log.Debugf("Removed peer %s from session (%d peers)", lu.P(p), len(spm.peers)) + log.Debugw("Bitswap: removed peer from session", "session", spm.id, "peer", p, "peerCount", len(spm.peers)) return true } diff --git a/internal/wantmanager/wantmanager.go b/internal/wantmanager/wantmanager.go index 
254ea979..0301356d 100644 --- a/internal/wantmanager/wantmanager.go +++ b/internal/wantmanager/wantmanager.go @@ -7,6 +7,7 @@ import ( bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" "github.com/ipfs/go-bitswap/internal/sessionmanager" bsswl "github.com/ipfs/go-bitswap/internal/sessionwantlist" + "gopkg.in/src-d/go-log.v1" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -75,7 +76,7 @@ func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Ci // BroadcastWantHaves is called when want-haves should be broadcast to all // connected peers (as part of session discovery) func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantHaves []cid.Cid) { - // log.Warnf("BroadcastWantHaves session%d: %s", ses, wantHaves) + log.Infof("BroadcastWantHaves session%d: %s", ses, wantHaves) // Record broadcast wants wm.bcwl.Add(wantHaves, ses)
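
The changes above replace commented-out Debugf calls (which abbreviated peer IDs and CIDs through the now-deleted internal/logutil helpers) with structured Debugw/Infow calls that attach key/value pairs to a fixed message. Below is a minimal, self-contained sketch of that logging pattern; it assumes the zap-backed go-log logger that the bitswap packages declare (written against the canonical github.com/ipfs/go-log import path; the github.com paths in the diff are mirrors of the same modules), and the peer and CID values are placeholder strings rather than real peer.ID and cid.Cid values.

package main

import (
	logging "github.com/ipfs/go-log"
)

// A subsystem logger in the style the bitswap packages use (the exact
// subsystem names vary per package). Debugw/Infow take a constant message
// followed by alternating key/value pairs.
var log = logging.Logger("bitswap")

func main() {
	// Output is filtered per subsystem; enable debug for "bitswap" so the
	// Debugw line below is actually emitted.
	if err := logging.SetLogLevel("bitswap", "debug"); err != nil {
		panic(err)
	}

	// Placeholder values; in the engine these are peer.ID and cid.Cid values.
	local, remote, c := "QmLocal...", "QmRemote...", "QmBlockCid..."

	log.Debugw("Bitswap engine <- want-block", "local", local, "from", remote, "cid", c)
	log.Infow("Bitswap: timeout waiting for blocks", "cids", []string{c}, "peer", remote)
}

Compared with the old format strings, the fixed-message-plus-key/value form keeps log lines grep-able while carrying full, unabbreviated peer IDs and CIDs, which is why the logutil truncation helpers could be deleted.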
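
The new comments on peerResponseTracker describe how a session picks which peer to send a want-block to: each candidate is chosen with probability proportional to how often it was first to return a block. A simplified standalone sketch of that weighted selection follows; it uses plain strings for peer IDs and a hypothetical chooseWeighted helper rather than the tracker's real types, and it assumes (as getPeerCount suggests) that peers with no recorded responses keep a baseline weight of 1 so newly discovered peers can still be picked.

package main

import (
	"fmt"
	"math/rand"
)

// chooseWeighted returns one of peers at random, weighted by firstResponder
// counts. Unknown peers get a baseline weight of 1.
func chooseWeighted(peers []string, firstResponder map[string]int) string {
	if len(peers) == 0 {
		return ""
	}
	weight := func(p string) int {
		if c, ok := firstResponder[p]; ok {
			return c
		}
		return 1
	}

	total := 0
	for _, p := range peers {
		total += weight(p)
	}

	// Walk the cumulative distribution until it passes a random pivot in [0,1).
	rnd := rand.Float64()
	counted := 0.0
	for _, p := range peers {
		counted += float64(weight(p)) / float64(total)
		if counted > rnd {
			return p
		}
	}
	// Floating-point rounding can leave the pivot just above the final sum;
	// fall back to the last candidate, as peerresponsetracker.go does.
	return peers[len(peers)-1]
}

func main() {
	counts := map[string]int{"peerA": 8, "peerB": 2}
	picks := map[string]int{}
	for i := 0; i < 1000; i++ {
		picks[chooseWeighted([]string{"peerA", "peerB", "peerC"}, counts)]++
	}
	fmt.Println(picks) // peerA dominates, peerB occasional, peerC rare
}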