Skip to content

Commit

Permalink
Merge pull request #444 from jbenet/test/mocks-on-mocks-on-mocks
Browse files Browse the repository at this point in the history
AddCat Bitswap Integration Tests
  • Loading branch information
Brian Tiger Chow committed Dec 13, 2014
2 parents 4a5a742 + 71d6e5c commit 19894c4
Show file tree
Hide file tree
Showing 28 changed files with 707 additions and 297 deletions.
1 change: 1 addition & 0 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var ValueTypeMismatch = errors.New("The retrieved value is not a Block")

// Blockstore wraps a ThreadSafeDatastore
type Blockstore interface {
DeleteBlock(u.Key) error
Has(u.Key) (bool, error)
Expand Down
21 changes: 2 additions & 19 deletions blockservice/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
offline "github.com/jbenet/go-ipfs/exchange/offline"
"github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
)

Expand Down Expand Up @@ -63,23 +60,9 @@ func TestBlocks(t *testing.T) {
}

func TestGetBlocksSequential(t *testing.T) {
net := tn.VirtualNetwork()
rs := mock.VirtualRoutingServer()
sg := bitswap.NewSessionGenerator(net, rs)
var servs = Mocks(t, 4)
bg := blocksutil.NewBlockGenerator()

instances := sg.Instances(4)
blks := bg.Blocks(50)
// TODO: verify no duplicates

var servs []*BlockService
for _, i := range instances {
bserv, err := New(i.Blockstore, i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bserv)
}

var keys []u.Key
for _, blk := range blks {
Expand All @@ -89,7 +72,7 @@ func TestGetBlocksSequential(t *testing.T) {

t.Log("one instance at a time, get blocks concurrently")

for i := 1; i < len(instances); i++ {
for i := 1; i < len(servs); i++ {
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
out := servs[i].GetBlocks(ctx, keys)
gotten := make(map[u.Key]*blocks.Block)
Expand Down
29 changes: 29 additions & 0 deletions blockservice/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockservice

import (
"testing"

bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
delay "github.com/jbenet/go-ipfs/util/delay"
)

// Mocks returns |n| connected mock Blockservices
func Mocks(t *testing.T, n int) []*BlockService {
net := tn.VirtualNetwork(delay.Fixed(0))
rs := mockrouting.NewServer()
sg := bitswap.NewSessionGenerator(net, rs)

instances := sg.Instances(n)

var servs []*BlockService
for _, i := range instances {
bserv, err := New(i.Blockstore(), i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bserv)
}
return servs
}
6 changes: 3 additions & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
pin "github.com/jbenet/go-ipfs/pin"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
"github.com/jbenet/go-ipfs/util/eventlog"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)

const IpnsValidatorTag = "ipns"
Expand All @@ -52,7 +52,7 @@ type IpfsNode struct {
Peerstore peer.Peerstore

// the local datastore
Datastore u.ThreadSafeDatastoreCloser
Datastore ds2.ThreadSafeDatastoreCloser

// the network message stream
Network inet.Network
Expand Down
9 changes: 5 additions & 4 deletions core/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (

config "github.com/jbenet/go-ipfs/config"
u "github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
"github.com/jbenet/go-ipfs/util/debugerror"
)

func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
func makeDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) {
if len(cfg.Type) == 0 {
return nil, debugerror.Errorf("config datastore.type required")
}
Expand All @@ -23,7 +24,7 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
return makeLevelDBDatastore(cfg)

case "memory":
return u.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil
return ds2.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil

case "fs":
log.Warning("using fs.Datastore at .datastore for testing.")
Expand All @@ -32,13 +33,13 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
return nil, err
}
ktd := ktds.Wrap(d, u.B58KeyConverter)
return u.CloserWrap(syncds.MutexWrap(ktd)), nil
return ds2.CloserWrap(syncds.MutexWrap(ktd)), nil
}

return nil, debugerror.Errorf("Unknown datastore type: %s", cfg.Type)
}

func makeLevelDBDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
func makeLevelDBDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) {
if len(cfg.Path) == 0 {
return nil, debugerror.Errorf("config datastore.path required for leveldb")
}
Expand Down
6 changes: 3 additions & 3 deletions core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
path "github.com/jbenet/go-ipfs/path"
peer "github.com/jbenet/go-ipfs/peer"
mdht "github.com/jbenet/go-ipfs/routing/mock"
"github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
)

// NewMockNode constructs an IpfsNode for use in tests.
Expand All @@ -39,10 +39,10 @@ func NewMockNode() (*IpfsNode, error) {

// Temp Datastore
dstore := ds.NewMapDatastore()
nd.Datastore = util.CloserWrap(syncds.MutexWrap(dstore))
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))

// Routing
dht := mdht.NewMockRouter(nd.Identity, nd.Datastore)
dht := mdht.NewServer().ClientWithDatastore(nd.Identity, nd.Datastore)
nd.Routing = dht

// Bitswap
Expand Down
161 changes: 161 additions & 0 deletions epictest/addcat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package epictest

import (
"bytes"
"fmt"
"io"
"os"
"testing"
"time"

blockservice "github.com/jbenet/go-ipfs/blockservice"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
path "github.com/jbenet/go-ipfs/path"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
)

const kSeed = 1

func Test100MBInstantaneous(t *testing.T) {
t.Log("a sanity check")

t.Parallel()

conf := Config{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
DataAmountBytes: 100 * 1024 * 1024,
}

AddCatBytes(conf)
}

func TestDegenerateSlowBlockstore(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{BlockstoreLatency: 50 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func TestDegenerateSlowNetwork(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{NetworkLatency: 400 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func TestDegenerateSlowRouting(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{RoutingLatency: 400 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func Test100MBMacbookCoastToCoast(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{
DataAmountBytes: 100 * 1024 * 1024,
}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()

if err := AddCatBytes(conf); err != nil {
t.Fatal(err)
}
}

func AddCatPowers(conf Config, megabytesMax int64) error {
var i int64
for i = 1; i < megabytesMax; i = i * 2 {
fmt.Printf("%d MB\n", i)
conf.DataAmountBytes = i * 1024 * 1024
if err := AddCatBytes(conf); err != nil {
return err
}
}
return nil
}

func AddCatBytes(conf Config) error {

sessionGenerator := bitswap.NewSessionGenerator(
tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork
mockrouting.NewServerWithDelay(delay.Fixed(conf.RoutingLatency)),
)

adder := sessionGenerator.Next()
catter := sessionGenerator.Next()
catter.SetBlockstoreLatency(conf.BlockstoreLatency)

adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation
var data bytes.Buffer
random.WritePseudoRandomBytes(conf.DataAmountBytes, &data, kSeed) // FIXME get a lazy reader
keyAdded, err := add(adder, bytes.NewReader(data.Bytes()))
if err != nil {
return err
}
adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait

readerCatted, err := cat(catter, keyAdded)
if err != nil {
return err
}

// verify
var bufout bytes.Buffer
io.Copy(&bufout, readerCatted)
if 0 != bytes.Compare(bufout.Bytes(), data.Bytes()) {
return errors.New("catted data does not match added data")
}
return nil
}

func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) {
catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange})
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil {
return nil, err
}
return uio.NewDagReader(nodeCatted, catterdag)
}

func add(adder bitswap.Instance, r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader(
r,
merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}),
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return nodeAdded.Key()
}

func SkipUnlessEpic(t *testing.T) {
if os.Getenv("IPFS_EPIC_TEST") == "" {
t.SkipNow()
}
}
57 changes: 57 additions & 0 deletions epictest/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package epictest

import "testing"

func benchmarkAddCat(conf Config, b *testing.B) {
b.SetBytes(conf.DataAmountBytes)
for n := 0; n < b.N; n++ {
if err := AddCatBytes(conf); err != nil {
b.Fatal(err)
}
}
}

var instant = Config{}.All_Instantaneous()

func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(1), b) }
func BenchmarkInstantaneousAddCat2MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(2), b) }
func BenchmarkInstantaneousAddCat4MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(4), b) }
func BenchmarkInstantaneousAddCat8MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(8), b) }
func BenchmarkInstantaneousAddCat16MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(16), b) }
func BenchmarkInstantaneousAddCat32MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(32), b) }
func BenchmarkInstantaneousAddCat64MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(64), b) }
func BenchmarkInstantaneousAddCat128MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(128), b) }
func BenchmarkInstantaneousAddCat256MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(256), b) }

var routing = Config{}.Routing_Slow()

func BenchmarkRoutingSlowAddCat1MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(1), b) }
func BenchmarkRoutingSlowAddCat2MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(2), b) }
func BenchmarkRoutingSlowAddCat4MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(4), b) }
func BenchmarkRoutingSlowAddCat8MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(8), b) }
func BenchmarkRoutingSlowAddCat16MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(16), b) }
func BenchmarkRoutingSlowAddCat32MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(32), b) }

var network = Config{}.Network_NYtoSF()

func BenchmarkNetworkSlowAddCat1MB(b *testing.B) { benchmarkAddCat(network.Megabytes(1), b) }
func BenchmarkNetworkSlowAddCat2MB(b *testing.B) { benchmarkAddCat(network.Megabytes(2), b) }
func BenchmarkNetworkSlowAddCat4MB(b *testing.B) { benchmarkAddCat(network.Megabytes(4), b) }
func BenchmarkNetworkSlowAddCat8MB(b *testing.B) { benchmarkAddCat(network.Megabytes(8), b) }
func BenchmarkNetworkSlowAddCat16MB(b *testing.B) { benchmarkAddCat(network.Megabytes(16), b) }
func BenchmarkNetworkSlowAddCat32MB(b *testing.B) { benchmarkAddCat(network.Megabytes(32), b) }
func BenchmarkNetworkSlowAddCat64MB(b *testing.B) { benchmarkAddCat(network.Megabytes(64), b) }
func BenchmarkNetworkSlowAddCat128MB(b *testing.B) { benchmarkAddCat(network.Megabytes(128), b) }
func BenchmarkNetworkSlowAddCat256MB(b *testing.B) { benchmarkAddCat(network.Megabytes(256), b) }

var blockstore = Config{}.Blockstore_7200RPM()

func BenchmarkBlockstoreSlowAddCat1MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(1), b) }
func BenchmarkBlockstoreSlowAddCat2MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(2), b) }
func BenchmarkBlockstoreSlowAddCat4MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(4), b) }
func BenchmarkBlockstoreSlowAddCat8MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(8), b) }
func BenchmarkBlockstoreSlowAddCat16MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(16), b) }
func BenchmarkBlockstoreSlowAddCat32MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(32), b) }
func BenchmarkBlockstoreSlowAddCat64MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(64), b) }
func BenchmarkBlockstoreSlowAddCat128MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(128), b) }
func BenchmarkBlockstoreSlowAddCat256MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(256), b) }
Loading

0 comments on commit 19894c4

Please sign in to comment.