Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AddCat Bitswap Integration Tests #444

Merged
merged 12 commits into from
Dec 13, 2014
1 change: 1 addition & 0 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var ValueTypeMismatch = errors.New("The retrieved value is not a Block")

// Blockstore wraps a ThreadSafeDatastore
type Blockstore interface {
DeleteBlock(u.Key) error
Has(u.Key) (bool, error)
Expand Down
21 changes: 2 additions & 19 deletions blockservice/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
offline "github.com/jbenet/go-ipfs/exchange/offline"
"github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
)

Expand Down Expand Up @@ -63,23 +60,9 @@ func TestBlocks(t *testing.T) {
}

func TestGetBlocksSequential(t *testing.T) {
net := tn.VirtualNetwork()
rs := mock.VirtualRoutingServer()
sg := bitswap.NewSessionGenerator(net, rs)
var servs = Mocks(t, 4)
bg := blocksutil.NewBlockGenerator()

instances := sg.Instances(4)
blks := bg.Blocks(50)
// TODO: verify no duplicates

var servs []*BlockService
for _, i := range instances {
bserv, err := New(i.Blockstore, i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bserv)
}

var keys []u.Key
for _, blk := range blks {
Expand All @@ -89,7 +72,7 @@ func TestGetBlocksSequential(t *testing.T) {

t.Log("one instance at a time, get blocks concurrently")

for i := 1; i < len(instances); i++ {
for i := 1; i < len(servs); i++ {
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
out := servs[i].GetBlocks(ctx, keys)
gotten := make(map[u.Key]*blocks.Block)
Expand Down
29 changes: 29 additions & 0 deletions blockservice/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockservice

import (
"testing"

bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
delay "github.com/jbenet/go-ipfs/util/delay"
)

// Mocks returns |n| connected mock Blockservices
func Mocks(t *testing.T, n int) []*BlockService {
net := tn.VirtualNetwork(delay.Fixed(0))
rs := mockrouting.NewServer()
sg := bitswap.NewSessionGenerator(net, rs)

instances := sg.Instances(n)

var servs []*BlockService
for _, i := range instances {
bserv, err := New(i.Blockstore(), i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bserv)
}
return servs
}
6 changes: 3 additions & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
pin "github.com/jbenet/go-ipfs/pin"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
"github.com/jbenet/go-ipfs/util/eventlog"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)

const IpnsValidatorTag = "ipns"
Expand All @@ -52,7 +52,7 @@ type IpfsNode struct {
Peerstore peer.Peerstore

// the local datastore
Datastore u.ThreadSafeDatastoreCloser
Datastore ds2.ThreadSafeDatastoreCloser

// the network message stream
Network inet.Network
Expand Down
9 changes: 5 additions & 4 deletions core/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (

config "github.com/jbenet/go-ipfs/config"
u "github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
"github.com/jbenet/go-ipfs/util/debugerror"
)

func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
func makeDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) {
if len(cfg.Type) == 0 {
return nil, debugerror.Errorf("config datastore.type required")
}
Expand All @@ -23,7 +24,7 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
return makeLevelDBDatastore(cfg)

case "memory":
return u.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil
return ds2.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil

case "fs":
log.Warning("using fs.Datastore at .datastore for testing.")
Expand All @@ -32,13 +33,13 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
return nil, err
}
ktd := ktds.Wrap(d, u.B58KeyConverter)
return u.CloserWrap(syncds.MutexWrap(ktd)), nil
return ds2.CloserWrap(syncds.MutexWrap(ktd)), nil
}

return nil, debugerror.Errorf("Unknown datastore type: %s", cfg.Type)
}

func makeLevelDBDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) {
func makeLevelDBDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) {
if len(cfg.Path) == 0 {
return nil, debugerror.Errorf("config datastore.path required for leveldb")
}
Expand Down
6 changes: 3 additions & 3 deletions core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
path "github.com/jbenet/go-ipfs/path"
peer "github.com/jbenet/go-ipfs/peer"
mdht "github.com/jbenet/go-ipfs/routing/mock"
"github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
)

// NewMockNode constructs an IpfsNode for use in tests.
Expand All @@ -39,10 +39,10 @@ func NewMockNode() (*IpfsNode, error) {

// Temp Datastore
dstore := ds.NewMapDatastore()
nd.Datastore = util.CloserWrap(syncds.MutexWrap(dstore))
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))

// Routing
dht := mdht.NewMockRouter(nd.Identity, nd.Datastore)
dht := mdht.NewServer().ClientWithDatastore(nd.Identity, nd.Datastore)
nd.Routing = dht

// Bitswap
Expand Down
161 changes: 161 additions & 0 deletions epictest/addcat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package epictest

import (
"bytes"
"fmt"
"io"
"os"
"testing"
"time"

blockservice "github.com/jbenet/go-ipfs/blockservice"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
path "github.com/jbenet/go-ipfs/path"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
)

const kSeed = 1

func Test100MBInstantaneous(t *testing.T) {
t.Log("a sanity check")

t.Parallel()

conf := Config{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
DataAmountBytes: 100 * 1024 * 1024,
}

AddCatBytes(conf)
}

func TestDegenerateSlowBlockstore(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{BlockstoreLatency: 50 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func TestDegenerateSlowNetwork(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{NetworkLatency: 400 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func TestDegenerateSlowRouting(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{RoutingLatency: 400 * time.Millisecond}

if err := AddCatPowers(conf, 128); err != nil {
t.Fatal(err)
}
}

func Test100MBMacbookCoastToCoast(t *testing.T) {
SkipUnlessEpic(t)
t.Parallel()

conf := Config{
DataAmountBytes: 100 * 1024 * 1024,
}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()

if err := AddCatBytes(conf); err != nil {
t.Fatal(err)
}
}

func AddCatPowers(conf Config, megabytesMax int64) error {
var i int64
for i = 1; i < megabytesMax; i = i * 2 {
fmt.Printf("%d MB\n", i)
conf.DataAmountBytes = i * 1024 * 1024
if err := AddCatBytes(conf); err != nil {
return err
}
}
return nil
}

func AddCatBytes(conf Config) error {

sessionGenerator := bitswap.NewSessionGenerator(
tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork
mockrouting.NewServerWithDelay(delay.Fixed(conf.RoutingLatency)),
)

adder := sessionGenerator.Next()
catter := sessionGenerator.Next()
catter.SetBlockstoreLatency(conf.BlockstoreLatency)

adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation
var data bytes.Buffer
random.WritePseudoRandomBytes(conf.DataAmountBytes, &data, kSeed) // FIXME get a lazy reader
keyAdded, err := add(adder, bytes.NewReader(data.Bytes()))
if err != nil {
return err
}
adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait

readerCatted, err := cat(catter, keyAdded)
if err != nil {
return err
}

// verify
var bufout bytes.Buffer
io.Copy(&bufout, readerCatted)
if 0 != bytes.Compare(bufout.Bytes(), data.Bytes()) {
return errors.New("catted data does not match added data")
}
return nil
}

func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) {
catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange})
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil {
return nil, err
}
return uio.NewDagReader(nodeCatted, catterdag)
}

func add(adder bitswap.Instance, r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader(
r,
merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}),
nil,
chunk.DefaultSplitter,
)
if err != nil {
return "", err
}
return nodeAdded.Key()
}

func SkipUnlessEpic(t *testing.T) {
if os.Getenv("IPFS_EPIC_TEST") == "" {
t.SkipNow()
}
}
57 changes: 57 additions & 0 deletions epictest/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package epictest

import "testing"

func benchmarkAddCat(conf Config, b *testing.B) {
b.SetBytes(conf.DataAmountBytes)
for n := 0; n < b.N; n++ {
if err := AddCatBytes(conf); err != nil {
b.Fatal(err)
}
}
}

var instant = Config{}.All_Instantaneous()

func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(1), b) }
func BenchmarkInstantaneousAddCat2MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(2), b) }
func BenchmarkInstantaneousAddCat4MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(4), b) }
func BenchmarkInstantaneousAddCat8MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(8), b) }
func BenchmarkInstantaneousAddCat16MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(16), b) }
func BenchmarkInstantaneousAddCat32MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(32), b) }
func BenchmarkInstantaneousAddCat64MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(64), b) }
func BenchmarkInstantaneousAddCat128MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(128), b) }
func BenchmarkInstantaneousAddCat256MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(256), b) }

var routing = Config{}.Routing_Slow()

func BenchmarkRoutingSlowAddCat1MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(1), b) }
func BenchmarkRoutingSlowAddCat2MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(2), b) }
func BenchmarkRoutingSlowAddCat4MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(4), b) }
func BenchmarkRoutingSlowAddCat8MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(8), b) }
func BenchmarkRoutingSlowAddCat16MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(16), b) }
func BenchmarkRoutingSlowAddCat32MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(32), b) }

var network = Config{}.Network_NYtoSF()

func BenchmarkNetworkSlowAddCat1MB(b *testing.B) { benchmarkAddCat(network.Megabytes(1), b) }
func BenchmarkNetworkSlowAddCat2MB(b *testing.B) { benchmarkAddCat(network.Megabytes(2), b) }
func BenchmarkNetworkSlowAddCat4MB(b *testing.B) { benchmarkAddCat(network.Megabytes(4), b) }
func BenchmarkNetworkSlowAddCat8MB(b *testing.B) { benchmarkAddCat(network.Megabytes(8), b) }
func BenchmarkNetworkSlowAddCat16MB(b *testing.B) { benchmarkAddCat(network.Megabytes(16), b) }
func BenchmarkNetworkSlowAddCat32MB(b *testing.B) { benchmarkAddCat(network.Megabytes(32), b) }
func BenchmarkNetworkSlowAddCat64MB(b *testing.B) { benchmarkAddCat(network.Megabytes(64), b) }
func BenchmarkNetworkSlowAddCat128MB(b *testing.B) { benchmarkAddCat(network.Megabytes(128), b) }
func BenchmarkNetworkSlowAddCat256MB(b *testing.B) { benchmarkAddCat(network.Megabytes(256), b) }

var blockstore = Config{}.Blockstore_7200RPM()

func BenchmarkBlockstoreSlowAddCat1MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(1), b) }
func BenchmarkBlockstoreSlowAddCat2MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(2), b) }
func BenchmarkBlockstoreSlowAddCat4MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(4), b) }
func BenchmarkBlockstoreSlowAddCat8MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(8), b) }
func BenchmarkBlockstoreSlowAddCat16MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(16), b) }
func BenchmarkBlockstoreSlowAddCat32MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(32), b) }
func BenchmarkBlockstoreSlowAddCat64MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(64), b) }
func BenchmarkBlockstoreSlowAddCat128MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(128), b) }
func BenchmarkBlockstoreSlowAddCat256MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(256), b) }
Loading