Skip to content

Commit

Permalink
feat: add block based read timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Aug 1, 2024
1 parent 4737f6f commit 84c4aa3
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion setup_bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
)

const perBlockTimeout = time.Second * 10

func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
Expand Down Expand Up @@ -78,7 +80,13 @@ func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routi
bsclient.WithoutDuplicatedBlockStats(),
)
bn.Start(bswap)
return bswap

wrapExch := &timeoutBlockExchange{
inner: bswap,
perBlockTimeout: perBlockTimeout,
}

return wrapExch
}

type noopPeerLedger struct{}
Expand Down Expand Up @@ -120,3 +128,56 @@ func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}

type timeoutBlockExchange struct {
inner exchange.Interface
perBlockTimeout time.Duration
}

func (t *timeoutBlockExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, cancel := context.WithTimeout(ctx, t.perBlockTimeout)
defer cancel()
return t.inner.GetBlock(ctx, c)
}

func (t *timeoutBlockExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
ctx, cancel := context.WithCancel(ctx)
blocksCh, err := t.inner.GetBlocks(ctx, cids)
if err != nil {
cancel()
return nil, err

Check warning on line 148 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L143-L148

Added lines #L143 - L148 were not covered by tests
}

retCh := make(chan blocks.Block)
go func() {
defer close(retCh)
defer cancel()
timer := time.NewTimer(t.perBlockTimeout)
for b := range blocksCh {
select {
case retCh <- b:
if !timer.Stop() {
<-timer.C

Check warning on line 160 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L151-L160

Added lines #L151 - L160 were not covered by tests
}
timer.Reset(t.perBlockTimeout)
case <-timer.C:
return

Check warning on line 164 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L162-L164

Added lines #L162 - L164 were not covered by tests
}
}
if !timer.Stop() {
<-timer.C

Check warning on line 168 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L167-L168

Added lines #L167 - L168 were not covered by tests
}
}()

return retCh, nil

Check warning on line 172 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L172

Added line #L172 was not covered by tests
}

func (t *timeoutBlockExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
return t.inner.NotifyNewBlocks(ctx, blocks...)
}

func (t *timeoutBlockExchange) Close() error {
return t.inner.Close()

Check warning on line 180 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L179-L180

Added lines #L179 - L180 were not covered by tests
}

var _ exchange.Interface = (*timeoutBlockExchange)(nil)

0 comments on commit 84c4aa3

Please sign in to comment.