Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: add basic tracing (#562)
Browse files Browse the repository at this point in the history
This adds tracing spans to the costly Bitswap entry points. It doesn't
instrument the bitswap internals, which will take some time. In
go-ipfs, this will at least let us know the contribution of Bitswap to
the overall request handling time.

This also plumbs contexts through internally so that they reach the
content routing APIs, so that traces are propagated through and we can
start instrumenting e.g. the DHT.
  • Loading branch information
guseggert authored Jun 14, 2022
1 parent f8443ed commit b18a91d
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 14 deletions.
15 changes: 13 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"time"

delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

deciface "github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/internal"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
"github.com/ipfs/go-bitswap/internal/decision"
"github.com/ipfs/go-bitswap/internal/defaults"
Expand Down Expand Up @@ -425,8 +428,10 @@ type counters struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
func (bs *Bitswap) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String())))
defer span.End()
return bsgetter.SyncGetBlock(ctx, k, bs.GetBlocks)
}

// WantlistForPeer returns the currently understood list of blocks requested by a
Expand All @@ -453,13 +458,17 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}

// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String())))
defer span.End()
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

Expand Down Expand Up @@ -696,5 +705,7 @@ func (bs *Bitswap) IsOnline() bool {
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
ctx, span := internal.StartSpan(ctx, "NewSession")
defer span.End()
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ require (
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multistream v0.2.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/zap v1.16.0
)

require (
github.com/btcsuite/btcd v0.21.0-beta // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/huin/goupnp v1.0.0 // indirect
Expand Down
14 changes: 13 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgO
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
Expand Down Expand Up @@ -179,6 +184,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -817,8 +824,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -851,6 +859,10 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
3 changes: 1 addition & 2 deletions internal/blockpresencemanager/blockpresencemanager_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockpresencemanager

import (
"fmt"
"testing"

"github.com/ipfs/go-bitswap/internal/testutil"
Expand Down Expand Up @@ -233,7 +232,7 @@ func TestAllPeersDoNotHaveBlock(t *testing.T) {
bpm.AllPeersDoNotHaveBlock(tc.peers, tc.ks),
tc.exp,
) {
t.Fatal(fmt.Sprintf("test case %d failed: expected matching keys", i))
t.Fatalf("test case %d failed: expected matching keys", i)
}
}
}
6 changes: 6 additions & 0 deletions internal/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/ipfs/go-bitswap/internal"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
logging "github.com/ipfs/go-log"

Expand All @@ -22,6 +23,9 @@ type GetBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)
// blocks that returns a channel, and uses that function to return the
// block syncronously.
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
p, span := internal.StartSpan(p, "Getter.SyncGetBlock")
defer span.End()

if !k.Defined() {
log.Error("undefined cid in GetBlock")
return nil, ipld.ErrNotFound{Cid: k}
Expand Down Expand Up @@ -65,6 +69,8 @@ type WantFunc func(context.Context, []cid.Cid)
// incoming blocks.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Getter.AsyncGetBlocks")
defer span.End()

// If there are no keys supplied, just return a closed channel
if len(keys) == 0 {
Expand Down
19 changes: 13 additions & 6 deletions internal/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ type providerQueryMessage interface {
}

type receivedProviderMessage struct {
k cid.Cid
p peer.ID
ctx context.Context
k cid.Cid
p peer.ID
}

type finishedProviderQueryMessage struct {
k cid.Cid
ctx context.Context
k cid.Cid
}

type newProvideQueryMessage struct {
ctx context.Context
k cid.Cid
inProgressRequestChan chan<- inProgressRequest
}
Expand Down Expand Up @@ -120,6 +123,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
ctx: sessionCtx,
k: k,
inProgressRequestChan: inProgressRequestChan,
}:
Expand Down Expand Up @@ -244,8 +248,9 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
}
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
k: k,
p: p,
ctx: findProviderCtx,
k: k,
p: p,
}:
case <-pqm.ctx.Done():
return
Expand All @@ -256,7 +261,8 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
cancel()
select {
case pqm.providerQueryMessages <- &finishedProviderQueryMessage{
k: k,
ctx: findProviderCtx,
k: k,
}:
case <-pqm.ctx.Done():
}
Expand Down Expand Up @@ -372,6 +378,7 @@ func (npqm *newProvideQueryMessage) debugMessage() string {
func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k]
if !ok {

ctx, cancelFn := context.WithCancel(pqm.ctx)
requestStatus = &inProgressRequestStatus{
listeners: make(map[chan peer.ID]struct{}),
Expand Down
10 changes: 8 additions & 2 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/ipfs/go-bitswap/internal"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
Expand Down Expand Up @@ -228,14 +229,19 @@ func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []c
}

// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
func (s *Session) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock")
defer span.End()
return bsgetter.SyncGetBlock(ctx, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

ctx = logging.ContextWithLoggable(ctx, s.uuid)

return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
Expand Down
7 changes: 7 additions & 0 deletions internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package sessionmanager

import (
"context"
"strconv"
"sync"
"time"

cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-bitswap/internal"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
bssession "github.com/ipfs/go-bitswap/internal/session"
Expand Down Expand Up @@ -87,6 +91,9 @@ func (sm *SessionManager) NewSession(ctx context.Context,
rebroadcastDelay delay.D) exchange.Fetcher {
id := sm.GetNextSessionID()

ctx, span := internal.StartSpan(ctx, "SessionManager.NewSession", trace.WithAttributes(attribute.String("ID", strconv.FormatUint(id, 10))))
defer span.End()

pm := sm.peerManagerFactory(ctx, id)
session := sm.sessionFactory(ctx, sm, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)

Expand Down
13 changes: 13 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-bitswap").Start(ctx, fmt.Sprintf("Bitswap.%s", name), opts...)
}

0 comments on commit b18a91d

Please sign in to comment.