Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AddCat Bitswap Integration Tests #444

Merged
merged 12 commits into from
Dec 13, 2014
1 change: 1 addition & 0 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var ValueTypeMismatch = errors.New("The retrieved value is not a Block")

// Blockstore wraps a ThreadSafeDatastore
type Blockstore interface {
DeleteBlock(u.Key) error
Has(u.Key) (bool, error)
Expand Down
21 changes: 2 additions & 19 deletions blockservice/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
offline "github.com/jbenet/go-ipfs/exchange/offline"
"github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
)

Expand Down Expand Up @@ -63,23 +60,9 @@ func TestBlocks(t *testing.T) {
}

func TestGetBlocksSequential(t *testing.T) {
net := tn.VirtualNetwork()
rs := mock.VirtualRoutingServer()
sg := bitswap.NewSessionGenerator(net, rs)
var servs = Mocks(t, 4)
bg := blocksutil.NewBlockGenerator()

instances := sg.Instances(4)
blks := bg.Blocks(50)
// TODO: verify no duplicates

var servs []*BlockService
for _, i := range instances {
bserv, err := New(i.Blockstore, i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bserv)
}

var keys []u.Key
for _, blk := range blks {
Expand All @@ -89,7 +72,7 @@ func TestGetBlocksSequential(t *testing.T) {

t.Log("one instance at a time, get blocks concurrently")

for i := 1; i < len(instances); i++ {
for i := 1; i < len(servs); i++ {
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
out := servs[i].GetBlocks(ctx, keys)
gotten := make(map[u.Key]*blocks.Block)
Expand Down
29 changes: 29 additions & 0 deletions blockservice/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockservice

import (
"testing"

bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
delay "github.com/jbenet/go-ipfs/util/delay"
)

// Mocks returns |n| connected mock Blockservices
func Mocks(t *testing.T, n int) []*BlockService {
net := tn.VirtualNetwork(delay.Fixed(0))
rs := mockrouting.NewServer()
sg := bitswap.NewSessionGenerator(net, rs)

instances := sg.Instances(n)

var servs []*BlockService
for _, i := range instances {
bserv, err := New(i.Blockstore(), i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bserv)
}
return servs
}
6 changes: 3 additions & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
pin "github.com/jbenet/go-ipfs/pin"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
"github.com/jbenet/go-ipfs/util/eventlog"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)

const IpnsValidatorTag = "ipns"
Expand All @@ -52,7 +52,7 @@ type IpfsNode struct {
Peerstore peer.Peerstore

// the local datastore
Datastore u.ThreadSafeDatastoreCloser
Datastore ds2.ThreadSafeDatastoreCloser

// the network message stream
Network inet.Network
Expand Down
9 changes: 5 additions & 4 deletions core/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (

config "github.com/jbenet/go-ipfs/config"
u "github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
"github.com/jbenet/go-ipfs/util/debugerror"
)

func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
func makeDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) {
if len(cfg.Type) == 0 {
return nil, debugerror.Errorf("config datastore.type required")
}
Expand All @@ -23,7 +24,7 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
return makeLevelDBDatastore(cfg)

case "memory":
return u.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil
return ds2.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil

case "fs":
log.Warning("using fs.Datastore at .datastore for testing.")
Expand All @@ -32,13 +33,13 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
return nil, err
}
ktd := ktds.Wrap(d, u.B58KeyConverter)
return u.CloserWrap(syncds.MutexWrap(ktd)), nil
return ds2.CloserWrap(syncds.MutexWrap(ktd)), nil
}

return nil, debugerror.Errorf("Unknown datastore type: %s", cfg.Type)
}

func makeLevelDBDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
func makeLevelDBDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) {
if len(cfg.Path) == 0 {
return nil, debugerror.Errorf("config datastore.path required for leveldb")
}
Expand Down
6 changes: 3 additions & 3 deletions core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
path "github.com/jbenet/go-ipfs/path"
peer "github.com/jbenet/go-ipfs/peer"
mdht "github.com/jbenet/go-ipfs/routing/mock"
"github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
)

// NewMockNode constructs an IpfsNode for use in tests.
Expand All @@ -39,10 +39,10 @@ func NewMockNode() (*IpfsNode, error) {

// Temp Datastore
dstore := ds.NewMapDatastore()
nd.Datastore = util.CloserWrap(syncds.MutexWrap(dstore))
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))

// Routing
dht := mdht.NewMockRouter(nd.Identity, nd.Datastore)
dht := mdht.NewServer().ClientWithDatastore(nd.Identity, nd.Datastore)
nd.Routing = dht

// Bitswap
Expand Down
160 changes: 160 additions & 0 deletions epictest/addcat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package epictest

import (
"bytes"
randcrypto "crypto/rand"
"fmt"
"io"
"os"
"testing"
"time"

blockservice "github.com/jbenet/go-ipfs/blockservice"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
path "github.com/jbenet/go-ipfs/path"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
delay "github.com/jbenet/go-ipfs/util/delay"
)

func Test100MBInstantaneous(t *testing.T) {
t.Log("a sanity check")

t.Parallel()

conf := Config{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
DataAmountBytes: 100 * 1024 * 1024,
}

AddCatBytes(conf)
}

func TestDegenerateSlowBlockstore(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{BlockstoreLatency: 50 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func TestDegenerateSlowNetwork(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{NetworkLatency: 400 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func TestDegenerateSlowRouting(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{RoutingLatency: 400 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func Test100MBMacbookCoastToCoast(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{
DataAmountBytes: 100 * 1024 * 1024,
}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()

if err := AddCatBytes(conf); err != nil {
t.Fatal(err)
}
}

func AddCatPowers(conf Config, megabytesMax int64) error {
var i int64
for i = 1; i < megabytesMax; i = i * 2 {
fmt.Printf("%d MB\n", i)
conf.DataAmountBytes = i * 1024 * 1024
if err := AddCatBytes(conf); err != nil {
return err
}
}
return nil
}

func AddCatBytes(conf Config) error {

sessionGenerator := bitswap.NewSessionGenerator(
tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork
mockrouting.NewServerWithDelay(delay.Fixed(conf.RoutingLatency)),
)

adder := sessionGenerator.Next()
catter := sessionGenerator.Next()
catter.SetBlockstoreLatency(conf.BlockstoreLatency)

adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation
var data bytes.Buffer
// FIXME replace with a random data generator that reproduces data given a seed value
io.Copy(&data, &io.LimitedReader{R: randcrypto.Reader, N: conf.DataAmountBytes})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use go-random for a Reader with much faster randomness generation. dont really need cryptographic strength for these tests, and randcrypto is slow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Wonderful. I was looking for a way to get determinism.

71d6e5c

The icing on the cake would be for random to provide an io.Reader. It would cut down peak memory usage and make it possible to create larger tests. Not critical yet. The blockstore will limit test size before that will.

random.NewPseudoRandomReader(count, seed)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh, i thought it did. oh well

On Sat, Dec 13, 2014 at 7:41 AM, Brian Tiger Chow [email protected]
wrote:

In epictest/addcat_test.go
#444 (diff):

+func AddCatBytes(conf Config) error {
+

  • sessionGenerator := bitswap.NewSessionGenerator(
  •   tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork
    
  •   mockrouting.NewServerWithDelay(delay.Fixed(conf.RoutingLatency)),
    
  • )
  • adder := sessionGenerator.Next()
  • catter := sessionGenerator.Next()
  • catter.SetBlockstoreLatency(conf.BlockstoreLatency)
  • adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation
  • var data bytes.Buffer
  • // FIXME replace with a random data generator that reproduces data given a seed value
  • io.Copy(&data, &io.LimitedReader{R: randcrypto.Reader, N: conf.DataAmountBytes})

Ah. Wonderful. I was looking for a way to get determinism.

71d6e5c
71d6e5c

The icing on the cake would be for random to provide an io.Reader

random.NewPseudoRandomReader(count, seed)


Reply to this email directly or view it on GitHub
https://github.com/jbenet/go-ipfs/pull/444/files#r21788868.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before

BenchmarkInstantaneousAddCat1MB       20      91798436 ns/op      11.42 MB/s
BenchmarkInstantaneousAddCat2MB       10     180849588 ns/op      11.60 MB/s
BenchmarkInstantaneousAddCat4MB        5     360574235 ns/op      11.63 MB/s
BenchmarkInstantaneousAddCat8MB        2     719927342 ns/op      11.65 MB/s

after

BenchmarkInstantaneousAddCat1MB       50      32412242 ns/op      32.35 MB/s
BenchmarkInstantaneousAddCat2MB       50      64622451 ns/op      32.45 MB/s
BenchmarkInstantaneousAddCat4MB       20     199377593 ns/op      21.04 MB/s
BenchmarkInstantaneousAddCat8MB        5     419447024 ns/op      20.00 MB/s

keyAdded, err := add(adder, bytes.NewReader(data.Bytes()))
if err != nil {
return err
}
adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait

readerCatted, err := cat(catter, keyAdded)
if err != nil {
return err
}

// verify
var bufout bytes.Buffer
io.Copy(&bufout, readerCatted)
if 0 != bytes.Compare(bufout.Bytes(), data.Bytes()) {
return errors.New("catted data does not match added data")
}
return nil
}

func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) {
catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange})
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil {
return nil, err
}
return uio.NewDagReader(nodeCatted, catterdag)
}

func add(adder bitswap.Instance, r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader(
r,
merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}),
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return nodeAdded.Key()
}

func SkipUnlessEpic(t *testing.T) {
if os.Getenv("IPFS_EPIC_TEST") == "" {
t.SkipNow()
}
}
57 changes: 57 additions & 0 deletions epictest/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package epictest

import "testing"

func benchmarkAddCat(conf Config, b *testing.B) {
b.SetBytes(conf.DataAmountBytes)
for n := 0; n < b.N; n++ {
if err := AddCatBytes(conf); err != nil {
b.Fatal(err)
}
}
}

var instant = Config{}.All_Instantaneous()

func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(1), b) }
func BenchmarkInstantaneousAddCat2MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(2), b) }
func BenchmarkInstantaneousAddCat4MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(4), b) }
func BenchmarkInstantaneousAddCat8MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(8), b) }
func BenchmarkInstantaneousAddCat16MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(16), b) }
func BenchmarkInstantaneousAddCat32MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(32), b) }
func BenchmarkInstantaneousAddCat64MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(64), b) }
func BenchmarkInstantaneousAddCat128MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(128), b) }
func BenchmarkInstantaneousAddCat256MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(256), b) }

var routing = Config{}.Routing_Slow()

func BenchmarkRoutingSlowAddCat1MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(1), b) }
func BenchmarkRoutingSlowAddCat2MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(2), b) }
func BenchmarkRoutingSlowAddCat4MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(4), b) }
func BenchmarkRoutingSlowAddCat8MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(8), b) }
func BenchmarkRoutingSlowAddCat16MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(16), b) }
func BenchmarkRoutingSlowAddCat32MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(32), b) }

var network = Config{}.Network_NYtoSF()

func BenchmarkNetworkSlowAddCat1MB(b *testing.B) { benchmarkAddCat(network.Megabytes(1), b) }
func BenchmarkNetworkSlowAddCat2MB(b *testing.B) { benchmarkAddCat(network.Megabytes(2), b) }
func BenchmarkNetworkSlowAddCat4MB(b *testing.B) { benchmarkAddCat(network.Megabytes(4), b) }
func BenchmarkNetworkSlowAddCat8MB(b *testing.B) { benchmarkAddCat(network.Megabytes(8), b) }
func BenchmarkNetworkSlowAddCat16MB(b *testing.B) { benchmarkAddCat(network.Megabytes(16), b) }
func BenchmarkNetworkSlowAddCat32MB(b *testing.B) { benchmarkAddCat(network.Megabytes(32), b) }
func BenchmarkNetworkSlowAddCat64MB(b *testing.B) { benchmarkAddCat(network.Megabytes(64), b) }
func BenchmarkNetworkSlowAddCat128MB(b *testing.B) { benchmarkAddCat(network.Megabytes(128), b) }
func BenchmarkNetworkSlowAddCat256MB(b *testing.B) { benchmarkAddCat(network.Megabytes(256), b) }

var blockstore = Config{}.Blockstore_7200RPM()

func BenchmarkBlockstoreSlowAddCat1MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(1), b) }
func BenchmarkBlockstoreSlowAddCat2MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(2), b) }
func BenchmarkBlockstoreSlowAddCat4MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(4), b) }
func BenchmarkBlockstoreSlowAddCat8MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(8), b) }
func BenchmarkBlockstoreSlowAddCat16MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(16), b) }
func BenchmarkBlockstoreSlowAddCat32MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(32), b) }
func BenchmarkBlockstoreSlowAddCat64MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(64), b) }
func BenchmarkBlockstoreSlowAddCat128MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(128), b) }
func BenchmarkBlockstoreSlowAddCat256MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(256), b) }
Loading