diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 3fe742ef8c5..d4849cb43bd 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -14,6 +14,7 @@ import ( var ValueTypeMismatch = errors.New("The retrieved value is not a Block") +// Blockstore wraps a ThreadSafeDatastore type Blockstore interface { DeleteBlock(u.Key) error Has(u.Key) (bool, error) diff --git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index 2645b202401..e7966729f55 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -11,10 +11,7 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" - bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" - tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" offline "github.com/jbenet/go-ipfs/exchange/offline" - "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" ) @@ -63,23 +60,9 @@ func TestBlocks(t *testing.T) { } func TestGetBlocksSequential(t *testing.T) { - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() - sg := bitswap.NewSessionGenerator(net, rs) + var servs = Mocks(t, 4) bg := blocksutil.NewBlockGenerator() - - instances := sg.Instances(4) blks := bg.Blocks(50) - // TODO: verify no duplicates - - var servs []*BlockService - for _, i := range instances { - bserv, err := New(i.Blockstore, i.Exchange) - if err != nil { - t.Fatal(err) - } - servs = append(servs, bserv) - } var keys []u.Key for _, blk := range blks { @@ -89,7 +72,7 @@ func TestGetBlocksSequential(t *testing.T) { t.Log("one instance at a time, get blocks concurrently") - for i := 1; i < len(instances); i++ { + for i := 1; i < len(servs); i++ { ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) out := servs[i].GetBlocks(ctx, keys) gotten := make(map[u.Key]*blocks.Block) diff --git a/blockservice/mock.go b/blockservice/mock.go new file mode 100644 index 00000000000..27751974624 --- /dev/null +++ b/blockservice/mock.go @@ -0,0 +1,29 @@ +package blockservice + +import ( + "testing" + + bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" + tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" + delay "github.com/jbenet/go-ipfs/util/delay" +) + +// Mocks returns |n| connected mock Blockservices +func Mocks(t *testing.T, n int) []*BlockService { + net := tn.VirtualNetwork(delay.Fixed(0)) + rs := mockrouting.NewServer() + sg := bitswap.NewSessionGenerator(net, rs) + + instances := sg.Instances(n) + + var servs []*BlockService + for _, i := range instances { + bserv, err := New(i.Blockstore(), i.Exchange) + if err != nil { + t.Fatal(err) + } + servs = append(servs, bserv) + } + return servs +} diff --git a/core/core.go b/core/core.go index 969811f237f..95712e6c125 100644 --- a/core/core.go +++ b/core/core.go @@ -28,10 +28,10 @@ import ( pin "github.com/jbenet/go-ipfs/pin" routing "github.com/jbenet/go-ipfs/routing" dht "github.com/jbenet/go-ipfs/routing/dht" - u "github.com/jbenet/go-ipfs/util" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" + ds2 "github.com/jbenet/go-ipfs/util/datastore2" debugerror "github.com/jbenet/go-ipfs/util/debugerror" - "github.com/jbenet/go-ipfs/util/eventlog" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) const IpnsValidatorTag = "ipns" @@ -52,7 +52,7 @@ type IpfsNode struct { Peerstore peer.Peerstore // the local datastore - Datastore u.ThreadSafeDatastoreCloser + Datastore ds2.ThreadSafeDatastoreCloser // the network message stream Network inet.Network diff --git a/core/datastore.go b/core/datastore.go index 3bca8ec0e51..32e51e8e0fc 100644 --- a/core/datastore.go +++ b/core/datastore.go @@ -10,10 +10,11 @@ import ( config "github.com/jbenet/go-ipfs/config" u "github.com/jbenet/go-ipfs/util" + ds2 "github.com/jbenet/go-ipfs/util/datastore2" "github.com/jbenet/go-ipfs/util/debugerror" ) -func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) { +func makeDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) { if len(cfg.Type) == 0 { return nil, debugerror.Errorf("config datastore.type required") } @@ -23,7 +24,7 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) { return makeLevelDBDatastore(cfg) case "memory": - return u.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil + return ds2.CloserWrap(syncds.MutexWrap(ds.NewMapDatastore())), nil case "fs": log.Warning("using fs.Datastore at .datastore for testing.") @@ -32,13 +33,13 @@ func makeDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) { return nil, err } ktd := ktds.Wrap(d, u.B58KeyConverter) - return u.CloserWrap(syncds.MutexWrap(ktd)), nil + return ds2.CloserWrap(syncds.MutexWrap(ktd)), nil } return nil, debugerror.Errorf("Unknown datastore type: %s", cfg.Type) } -func makeLevelDBDatastore(cfg config.Datastore) (u.ThreadSafeDatastoreCloser, error) { +func makeLevelDBDatastore(cfg config.Datastore) (ds2.ThreadSafeDatastoreCloser, error) { if len(cfg.Path) == 0 { return nil, debugerror.Errorf("config datastore.path required for leveldb") } diff --git a/core/mock.go b/core/mock.go index 7f6faa0b40f..9c893a6dc19 100644 --- a/core/mock.go +++ b/core/mock.go @@ -12,7 +12,7 @@ import ( path "github.com/jbenet/go-ipfs/path" peer "github.com/jbenet/go-ipfs/peer" mdht "github.com/jbenet/go-ipfs/routing/mock" - "github.com/jbenet/go-ipfs/util" + ds2 "github.com/jbenet/go-ipfs/util/datastore2" ) // NewMockNode constructs an IpfsNode for use in tests. @@ -39,10 +39,10 @@ func NewMockNode() (*IpfsNode, error) { // Temp Datastore dstore := ds.NewMapDatastore() - nd.Datastore = util.CloserWrap(syncds.MutexWrap(dstore)) + nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore)) // Routing - dht := mdht.NewMockRouter(nd.Identity, nd.Datastore) + dht := mdht.NewServer().ClientWithDatastore(nd.Identity, nd.Datastore) nd.Routing = dht // Bitswap diff --git a/epictest/addcat_test.go b/epictest/addcat_test.go new file mode 100644 index 00000000000..6e7fc8177c9 --- /dev/null +++ b/epictest/addcat_test.go @@ -0,0 +1,161 @@ +package epictest + +import ( + "bytes" + "fmt" + "io" + "os" + "testing" + "time" + + blockservice "github.com/jbenet/go-ipfs/blockservice" + bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" + tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + importer "github.com/jbenet/go-ipfs/importer" + chunk "github.com/jbenet/go-ipfs/importer/chunk" + merkledag "github.com/jbenet/go-ipfs/merkledag" + path "github.com/jbenet/go-ipfs/path" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" + uio "github.com/jbenet/go-ipfs/unixfs/io" + util "github.com/jbenet/go-ipfs/util" + errors "github.com/jbenet/go-ipfs/util/debugerror" + delay "github.com/jbenet/go-ipfs/util/delay" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" +) + +const kSeed = 1 + +func Test100MBInstantaneous(t *testing.T) { + t.Log("a sanity check") + + t.Parallel() + + conf := Config{ + NetworkLatency: 0, + RoutingLatency: 0, + BlockstoreLatency: 0, + DataAmountBytes: 100 * 1024 * 1024, + } + + AddCatBytes(conf) +} + +func TestDegenerateSlowBlockstore(t *testing.T) { + SkipUnlessEpic(t) + t.Parallel() + + conf := Config{BlockstoreLatency: 50 * time.Millisecond} + + if err := AddCatPowers(conf, 128); err != nil { + t.Fatal(err) + } +} + +func TestDegenerateSlowNetwork(t *testing.T) { + SkipUnlessEpic(t) + t.Parallel() + + conf := Config{NetworkLatency: 400 * time.Millisecond} + + if err := AddCatPowers(conf, 128); err != nil { + t.Fatal(err) + } +} + +func TestDegenerateSlowRouting(t *testing.T) { + SkipUnlessEpic(t) + t.Parallel() + + conf := Config{RoutingLatency: 400 * time.Millisecond} + + if err := AddCatPowers(conf, 128); err != nil { + t.Fatal(err) + } +} + +func Test100MBMacbookCoastToCoast(t *testing.T) { + SkipUnlessEpic(t) + t.Parallel() + + conf := Config{ + DataAmountBytes: 100 * 1024 * 1024, + }.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow() + + if err := AddCatBytes(conf); err != nil { + t.Fatal(err) + } +} + +func AddCatPowers(conf Config, megabytesMax int64) error { + var i int64 + for i = 1; i < megabytesMax; i = i * 2 { + fmt.Printf("%d MB\n", i) + conf.DataAmountBytes = i * 1024 * 1024 + if err := AddCatBytes(conf); err != nil { + return err + } + } + return nil +} + +func AddCatBytes(conf Config) error { + + sessionGenerator := bitswap.NewSessionGenerator( + tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork + mockrouting.NewServerWithDelay(delay.Fixed(conf.RoutingLatency)), + ) + + adder := sessionGenerator.Next() + catter := sessionGenerator.Next() + catter.SetBlockstoreLatency(conf.BlockstoreLatency) + + adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation + var data bytes.Buffer + random.WritePseudoRandomBytes(conf.DataAmountBytes, &data, kSeed) // FIXME get a lazy reader + keyAdded, err := add(adder, bytes.NewReader(data.Bytes())) + if err != nil { + return err + } + adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait + + readerCatted, err := cat(catter, keyAdded) + if err != nil { + return err + } + + // verify + var bufout bytes.Buffer + io.Copy(&bufout, readerCatted) + if 0 != bytes.Compare(bufout.Bytes(), data.Bytes()) { + return errors.New("catted data does not match added data") + } + return nil +} + +func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) { + catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange}) + nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String()) + if err != nil { + return nil, err + } + return uio.NewDagReader(nodeCatted, catterdag) +} + +func add(adder bitswap.Instance, r io.Reader) (util.Key, error) { + nodeAdded, err := importer.BuildDagFromReader( + r, + merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}), + nil, + chunk.DefaultSplitter, + ) + if err != nil { + return "", err + } + return nodeAdded.Key() +} + +func SkipUnlessEpic(t *testing.T) { + if os.Getenv("IPFS_EPIC_TEST") == "" { + t.SkipNow() + } +} diff --git a/epictest/bench_test.go b/epictest/bench_test.go new file mode 100644 index 00000000000..8821830f52b --- /dev/null +++ b/epictest/bench_test.go @@ -0,0 +1,57 @@ +package epictest + +import "testing" + +func benchmarkAddCat(conf Config, b *testing.B) { + b.SetBytes(conf.DataAmountBytes) + for n := 0; n < b.N; n++ { + if err := AddCatBytes(conf); err != nil { + b.Fatal(err) + } + } +} + +var instant = Config{}.All_Instantaneous() + +func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(1), b) } +func BenchmarkInstantaneousAddCat2MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(2), b) } +func BenchmarkInstantaneousAddCat4MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(4), b) } +func BenchmarkInstantaneousAddCat8MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(8), b) } +func BenchmarkInstantaneousAddCat16MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(16), b) } +func BenchmarkInstantaneousAddCat32MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(32), b) } +func BenchmarkInstantaneousAddCat64MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(64), b) } +func BenchmarkInstantaneousAddCat128MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(128), b) } +func BenchmarkInstantaneousAddCat256MB(b *testing.B) { benchmarkAddCat(instant.Megabytes(256), b) } + +var routing = Config{}.Routing_Slow() + +func BenchmarkRoutingSlowAddCat1MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(1), b) } +func BenchmarkRoutingSlowAddCat2MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(2), b) } +func BenchmarkRoutingSlowAddCat4MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(4), b) } +func BenchmarkRoutingSlowAddCat8MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(8), b) } +func BenchmarkRoutingSlowAddCat16MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(16), b) } +func BenchmarkRoutingSlowAddCat32MB(b *testing.B) { benchmarkAddCat(routing.Megabytes(32), b) } + +var network = Config{}.Network_NYtoSF() + +func BenchmarkNetworkSlowAddCat1MB(b *testing.B) { benchmarkAddCat(network.Megabytes(1), b) } +func BenchmarkNetworkSlowAddCat2MB(b *testing.B) { benchmarkAddCat(network.Megabytes(2), b) } +func BenchmarkNetworkSlowAddCat4MB(b *testing.B) { benchmarkAddCat(network.Megabytes(4), b) } +func BenchmarkNetworkSlowAddCat8MB(b *testing.B) { benchmarkAddCat(network.Megabytes(8), b) } +func BenchmarkNetworkSlowAddCat16MB(b *testing.B) { benchmarkAddCat(network.Megabytes(16), b) } +func BenchmarkNetworkSlowAddCat32MB(b *testing.B) { benchmarkAddCat(network.Megabytes(32), b) } +func BenchmarkNetworkSlowAddCat64MB(b *testing.B) { benchmarkAddCat(network.Megabytes(64), b) } +func BenchmarkNetworkSlowAddCat128MB(b *testing.B) { benchmarkAddCat(network.Megabytes(128), b) } +func BenchmarkNetworkSlowAddCat256MB(b *testing.B) { benchmarkAddCat(network.Megabytes(256), b) } + +var blockstore = Config{}.Blockstore_7200RPM() + +func BenchmarkBlockstoreSlowAddCat1MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(1), b) } +func BenchmarkBlockstoreSlowAddCat2MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(2), b) } +func BenchmarkBlockstoreSlowAddCat4MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(4), b) } +func BenchmarkBlockstoreSlowAddCat8MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(8), b) } +func BenchmarkBlockstoreSlowAddCat16MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(16), b) } +func BenchmarkBlockstoreSlowAddCat32MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(32), b) } +func BenchmarkBlockstoreSlowAddCat64MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(64), b) } +func BenchmarkBlockstoreSlowAddCat128MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(128), b) } +func BenchmarkBlockstoreSlowAddCat256MB(b *testing.B) { benchmarkAddCat(blockstore.Megabytes(256), b) } diff --git a/epictest/test_config.go b/epictest/test_config.go new file mode 100644 index 00000000000..7c31727b582 --- /dev/null +++ b/epictest/test_config.go @@ -0,0 +1,55 @@ +package epictest + +import "time" + +type Config struct { + BlockstoreLatency time.Duration + NetworkLatency time.Duration + RoutingLatency time.Duration + DataAmountBytes int64 +} + +func (c Config) All_Instantaneous() Config { + // Could use a zero value but whatever. Consistency of interface + c.NetworkLatency = 0 + c.RoutingLatency = 0 + c.BlockstoreLatency = 0 + return c +} + +func (c Config) Network_NYtoSF() Config { + c.NetworkLatency = 20 * time.Millisecond + return c +} + +func (c Config) Network_IntraDatacenter2014() Config { + c.NetworkLatency = 250 * time.Microsecond + return c +} + +func (c Config) Blockstore_FastSSD2014() Config { + const iops = 100000 + c.BlockstoreLatency = (1 / iops) * time.Second + return c +} + +func (c Config) Blockstore_SlowSSD2014() Config { + c.BlockstoreLatency = 150 * time.Microsecond + return c +} + +func (c Config) Blockstore_7200RPM() Config { + c.BlockstoreLatency = 8 * time.Millisecond + return c +} + +func (c Config) Routing_Slow() Config { + c.BlockstoreLatency = 200 * time.Millisecond + return c +} + +// Megabytes is a convenience method to set DataAmountBytes +func (c Config) Megabytes(mb int64) Config { + c.DataAmountBytes = mb * 1024 * 1024 + return c +} diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 4d0b5e59d1d..d58ff596a1e 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -10,15 +10,20 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" - mock "github.com/jbenet/go-ipfs/routing/mock" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" + delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) +// FIXME the tests are really sensitive to the network delay. fix them to work +// well under varying conditions +const kNetworkDelay = 0 * time.Millisecond + func TestClose(t *testing.T) { // TODO t.Skip("TODO Bitswap's Close implementation is a WIP") - vnet := tn.VirtualNetwork() - rout := mock.VirtualRoutingServer() + vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rout := mockrouting.NewServer() sesgen := NewSessionGenerator(vnet, rout) bgen := blocksutil.NewBlockGenerator() @@ -31,8 +36,8 @@ func TestClose(t *testing.T) { func TestGetBlockTimeout(t *testing.T) { - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) self := g.Next() @@ -48,12 +53,12 @@ func TestGetBlockTimeout(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) { - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) block := blocks.NewBlock([]byte("block")) - rs.Announce(testutil.NewPeerWithIDString("testing"), block.Key()) // but not on network + rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() @@ -69,14 +74,14 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() block := blocks.NewBlock([]byte("block")) g := NewSessionGenerator(net, rs) hasBlock := g.Next() - if err := hasBlock.Blockstore.Put(block); err != nil { + if err := hasBlock.Blockstore().Put(block); err != nil { t.Fatal(err) } if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { @@ -121,8 +126,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() } - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) bg := blocksutil.NewBlockGenerator() @@ -135,9 +140,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { first := instances[0] for _, b := range blocks { - first.Blockstore.Put(b) + first.Blockstore().Put(b) first.Exchange.HasBlock(context.Background(), b) - rs.Announce(first.Peer, b.Key()) + rs.Client(first.Peer).Provide(context.Background(), b.Key()) } t.Log("Distribute!") @@ -158,7 +163,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { for _, inst := range instances { for _, b := range blocks { - if _, err := inst.Blockstore.Get(b.Key()); err != nil { + if _, err := inst.Blockstore().Get(b.Key()); err != nil { t.Fatal(err) } } @@ -166,7 +171,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) { - if _, err := bitswap.Blockstore.Get(b.Key()); err != nil { + if _, err := bitswap.Blockstore().Get(b.Key()); err != nil { _, err := bitswap.Exchange.GetBlock(context.Background(), b.Key()) if err != nil { t.Fatal(err) @@ -181,8 +186,8 @@ func TestSendToWantingPeer(t *testing.T) { t.SkipNow() } - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) bg := blocksutil.NewBlockGenerator() @@ -208,7 +213,7 @@ func TestSendToWantingPeer(t *testing.T) { beta := bg.Next() t.Logf("Peer %v announes availability of %v\n", w.Peer, beta.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := w.Blockstore.Put(beta); err != nil { + if err := w.Blockstore().Put(beta); err != nil { t.Fatal(err) } w.Exchange.HasBlock(ctx, beta) @@ -221,7 +226,7 @@ func TestSendToWantingPeer(t *testing.T) { t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := o.Blockstore.Put(alpha); err != nil { + if err := o.Blockstore().Put(alpha); err != nil { t.Fatal(err) } o.Exchange.HasBlock(ctx, alpha) @@ -233,7 +238,7 @@ func TestSendToWantingPeer(t *testing.T) { } t.Logf("%v should now have %v\n", w.Peer, alpha.Key()) - block, err := w.Blockstore.Get(alpha.Key()) + block, err := w.Blockstore().Get(alpha.Key()) if err != nil { t.Fatalf("Should not have received an error: %s", err) } diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index 3993eba0573..fe7414caa73 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -71,7 +71,6 @@ func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { // Dont resend blocks within a certain time period t, ok := ledger.sentToPeer[k] if ok && t.Add(resendTimeoutPeriod).After(time.Now()) { - log.Error("Prevented block resend!") return false } diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index 691b7cb4259..b8f61b41346 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -10,6 +10,7 @@ import ( bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" peer "github.com/jbenet/go-ipfs/peer" "github.com/jbenet/go-ipfs/util" + delay "github.com/jbenet/go-ipfs/util/delay" ) type Network interface { @@ -33,14 +34,16 @@ type Network interface { // network impl -func VirtualNetwork() Network { +func VirtualNetwork(d delay.D) Network { return &network{ clients: make(map[util.Key]bsnet.Receiver), + delay: d, } } type network struct { clients map[util.Key]bsnet.Receiver + delay delay.D } func (n *network) Adapter(p peer.Peer) bsnet.BitSwapNetwork { @@ -84,13 +87,15 @@ func (n *network) deliver( return errors.New("Invalid input") } + n.delay.Wait() + nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message) if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) { return errors.New("Malformed client request") } - if nextPeer == nil && nextMsg == nil { + if nextPeer == nil && nextMsg == nil { // no response to send return nil } @@ -102,8 +107,6 @@ func (n *network) deliver( return nil } -var NoResponse = errors.New("No response received from the receiver") - // TODO func (n *network) SendRequest( ctx context.Context, diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 0bfb0cb1ee4..7a9f48e2d23 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -9,11 +9,12 @@ import ( bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" peer "github.com/jbenet/go-ipfs/peer" + delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) func TestSendRequestToCooperativePeer(t *testing.T) { - net := VirtualNetwork() + net := VirtualNetwork(delay.Fixed(0)) idOfRecipient := []byte("recipient") @@ -60,7 +61,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { } func TestSendMessageAsyncButWaitForResponse(t *testing.T) { - net := VirtualNetwork() + net := VirtualNetwork(delay.Fixed(0)) idOfResponder := []byte("responder") waiter := net.Adapter(testutil.NewPeerWithIDString("waiter")) responder := net.Adapter(testutil.NewPeerWithID(idOfResponder)) diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 7f8ef85469d..9e9b80230f1 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -1,18 +1,22 @@ package bitswap import ( - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - "github.com/jbenet/go-ipfs/blocks/blockstore" - "github.com/jbenet/go-ipfs/exchange" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" + exchange "github.com/jbenet/go-ipfs/exchange" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" - "github.com/jbenet/go-ipfs/peer" - "github.com/jbenet/go-ipfs/routing/mock" + peer "github.com/jbenet/go-ipfs/peer" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" + datastore2 "github.com/jbenet/go-ipfs/util/datastore2" + delay "github.com/jbenet/go-ipfs/util/delay" ) func NewSessionGenerator( - net tn.Network, rs mock.RoutingServer) SessionGenerator { + net tn.Network, rs mockrouting.Server) SessionGenerator { return SessionGenerator{ net: net, rs: rs, @@ -24,7 +28,7 @@ func NewSessionGenerator( type SessionGenerator struct { seq int net tn.Network - rs mock.RoutingServer + rs mockrouting.Server ps peer.Peerstore } @@ -45,7 +49,17 @@ func (g *SessionGenerator) Instances(n int) []Instance { type Instance struct { Peer peer.Peer Exchange exchange.Interface - Blockstore blockstore.Blockstore + blockstore blockstore.Blockstore + + blockstoreDelay delay.D +} + +func (i *Instance) Blockstore() blockstore.Blockstore { + return i.blockstore +} + +func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { + return i.blockstoreDelay.Set(t) } // session creates a test bitswap session. @@ -53,12 +67,19 @@ type Instance struct { // NB: It's easy make mistakes by providing the same peer ID to two different // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. -func session(net tn.Network, rs mock.RoutingServer, ps peer.Peerstore, id peer.ID) Instance { +func session(net tn.Network, rs mockrouting.Server, ps peer.Peerstore, id peer.ID) Instance { p := ps.WithID(id) adapter := net.Adapter(p) htc := rs.Client(p) - bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + bsdelay := delay.Fixed(0) + const kWriteCacheElems = 100 + bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))), kWriteCacheElems) + if err != nil { + // FIXME perhaps change signature and return error. + panic(err.Error()) + } const alwaysSendToPeer = true ctx := context.TODO() @@ -66,8 +87,9 @@ func session(net tn.Network, rs mock.RoutingServer, ps peer.Peerstore, id peer.I bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer) return Instance{ - Peer: p, - Exchange: bs, - Blockstore: bstore, + Peer: p, + Exchange: bs, + blockstore: bstore, + blockstoreDelay: bsdelay, } } diff --git a/importer/importer_test.go b/importer/importer_test.go index fca8e3d61c9..84607229633 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -9,10 +9,10 @@ import ( "os" "testing" - "github.com/jbenet/go-ipfs/importer/chunk" + chunk "github.com/jbenet/go-ipfs/importer/chunk" + merkledag "github.com/jbenet/go-ipfs/merkledag" uio "github.com/jbenet/go-ipfs/unixfs/io" u "github.com/jbenet/go-ipfs/util" - testutil "github.com/jbenet/go-ipfs/util/testutil" ) // NOTE: @@ -91,7 +91,7 @@ func TestBuilderConsistency(t *testing.T) { buf := new(bytes.Buffer) io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) should := dup(buf.Bytes()) - dagserv := testutil.GetDAGServ(t) + dagserv := merkledag.Mock(t) nd, err := BuildDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter) if err != nil { t.Fatal(err) diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 0f628e6c102..b5f170c241c 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -7,13 +7,10 @@ import ( "io/ioutil" "testing" - bserv "github.com/jbenet/go-ipfs/blockservice" - bs "github.com/jbenet/go-ipfs/exchange/bitswap" - tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + blockservice "github.com/jbenet/go-ipfs/blockservice" imp "github.com/jbenet/go-ipfs/importer" chunk "github.com/jbenet/go-ipfs/importer/chunk" . "github.com/jbenet/go-ipfs/merkledag" - "github.com/jbenet/go-ipfs/routing/mock" uio "github.com/jbenet/go-ipfs/unixfs/io" u "github.com/jbenet/go-ipfs/util" ) @@ -79,20 +76,8 @@ func makeTestDag(t *testing.T) *Node { } func TestBatchFetch(t *testing.T) { - net := tn.VirtualNetwork() - rs := mock.VirtualRoutingServer() - sg := bs.NewSessionGenerator(net, rs) - - instances := sg.Instances(5) - - var servs []*bserv.BlockService var dagservs []DAGService - for _, i := range instances { - bsi, err := bserv.New(i.Blockstore, i.Exchange) - if err != nil { - t.Fatal(err) - } - servs = append(servs, bsi) + for _, bsi := range blockservice.Mocks(t, 5) { dagservs = append(dagservs, NewDAGService(bsi)) } t.Log("finished setup.") diff --git a/merkledag/mock.go b/merkledag/mock.go new file mode 100644 index 00000000000..ea3737f5807 --- /dev/null +++ b/merkledag/mock.go @@ -0,0 +1,20 @@ +package merkledag + +import ( + "testing" + + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" + bsrv "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/exchange/offline" +) + +func Mock(t testing.TB) DAGService { + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bserv, err := bsrv.New(bstore, offline.Exchange(bstore)) + if err != nil { + t.Fatal(err) + } + return NewDAGService(bserv) +} diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index eef5e68258c..1d487f9a77d 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -3,17 +3,15 @@ package namesys import ( "testing" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ci "github.com/jbenet/go-ipfs/crypto" - mock "github.com/jbenet/go-ipfs/routing/mock" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" ) func TestRoutingResolve(t *testing.T) { local := testutil.NewPeerWithIDString("testID") - lds := ds.NewMapDatastore() - d := mock.NewMockRouter(local, lds) + d := mockrouting.NewServer().Client(local) resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) diff --git a/routing/mock/client.go b/routing/mock/client.go new file mode 100644 index 00000000000..f4702aae66c --- /dev/null +++ b/routing/mock/client.go @@ -0,0 +1,74 @@ +package mockrouting + +import ( + "errors" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + peer "github.com/jbenet/go-ipfs/peer" + routing "github.com/jbenet/go-ipfs/routing" + u "github.com/jbenet/go-ipfs/util" +) + +var log = u.Logger("mockrouter") + +type client struct { + datastore ds.Datastore + server server + peer peer.Peer +} + +// FIXME(brian): is this method meant to simulate putting a value into the network? +func (c *client) PutValue(ctx context.Context, key u.Key, val []byte) error { + log.Debugf("PutValue: %s", key) + return c.datastore.Put(key.DsKey(), val) +} + +// FIXME(brian): is this method meant to simulate getting a value from the network? +func (c *client) GetValue(ctx context.Context, key u.Key) ([]byte, error) { + log.Debugf("GetValue: %s", key) + v, err := c.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + + data, ok := v.([]byte) + if !ok { + return nil, errors.New("could not cast value from datastore") + } + + return data, nil +} + +func (c *client) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) { + return c.server.Providers(key), nil +} + +func (c *client) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { + log.Debugf("FindPeer: %s", pid) + return nil, nil +} + +func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer { + out := make(chan peer.Peer) + go func() { + defer close(out) + for i, p := range c.server.Providers(k) { + if max <= i { + return + } + select { + case out <- p: + case <-ctx.Done(): + return + } + } + }() + return out +} + +func (c *client) Provide(_ context.Context, key u.Key) error { + return c.server.Announce(c.peer, key) +} + +var _ routing.IpfsRouting = &client{} diff --git a/routing/mock/interface.go b/routing/mock/interface.go new file mode 100644 index 00000000000..e84a9ba5a13 --- /dev/null +++ b/routing/mock/interface.go @@ -0,0 +1,40 @@ +// Package mock provides a virtual routing server. To use it, create a virtual +// routing server and use the Client() method to get a routing client +// (IpfsRouting). The server quacks like a DHT but is really a local in-memory +// hash table. +package mockrouting + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + peer "github.com/jbenet/go-ipfs/peer" + routing "github.com/jbenet/go-ipfs/routing" + u "github.com/jbenet/go-ipfs/util" + delay "github.com/jbenet/go-ipfs/util/delay" +) + +// Server provides mockrouting Clients +type Server interface { + Client(p peer.Peer) Client + ClientWithDatastore(peer.Peer, ds.Datastore) Client +} + +// Client implements IpfsRouting +type Client interface { + FindProviders(context.Context, u.Key) ([]peer.Peer, error) + + routing.IpfsRouting +} + +// NewServer returns a mockrouting Server +func NewServer() Server { + return NewServerWithDelay(delay.Fixed(0)) +} + +// NewServerWithDelay returns a mockrouting Server with a delay! +func NewServerWithDelay(d delay.D) Server { + return &s{ + providers: make(map[u.Key]peer.Map), + delay: d, + } +} diff --git a/routing/mock/routing_test.go b/routing/mock/mockrouting_test.go similarity index 74% rename from routing/mock/routing_test.go rename to routing/mock/mockrouting_test.go index 536d7b01891..3f9bfab6c1f 100644 --- a/routing/mock/routing_test.go +++ b/routing/mock/mockrouting_test.go @@ -1,4 +1,4 @@ -package mock +package mockrouting import ( "bytes" @@ -12,37 +12,21 @@ import ( func TestKeyNotFound(t *testing.T) { - vrs := VirtualRoutingServer() - empty := vrs.Providers(u.Key("not there")) - if len(empty) != 0 { - t.Fatal("should be empty") - } -} + var peer = testutil.NewPeerWithID(peer.ID([]byte("the peer id"))) + var key = u.Key("mock key") + var ctx = context.Background() -func TestSetAndGet(t *testing.T) { - pid := peer.ID([]byte("the peer id")) - p := testutil.NewPeerWithID(pid) - k := u.Key("42") - rs := VirtualRoutingServer() - err := rs.Announce(p, k) - if err != nil { - t.Fatal(err) - } - providers := rs.Providers(k) - if len(providers) != 1 { - t.Fatal("should be one") + rs := NewServer() + providers := rs.Client(peer).FindProvidersAsync(ctx, key, 10) + _, ok := <-providers + if ok { + t.Fatal("should be closed") } - for _, elem := range providers { - if bytes.Equal(elem.ID(), pid) { - return - } - } - t.Fatal("ID should have matched") } func TestClientFindProviders(t *testing.T) { peer := testutil.NewPeerWithIDString("42") - rs := VirtualRoutingServer() + rs := NewServer() client := rs.Client(peer) k := u.Key("hello") @@ -52,7 +36,10 @@ func TestClientFindProviders(t *testing.T) { } max := 100 - providersFromHashTable := rs.Providers(k) + providersFromHashTable, err := rs.Client(peer).FindProviders(context.Background(), k) + if err != nil { + t.Fatal(err) + } isInHT := false for _, p := range providersFromHashTable { @@ -76,21 +63,16 @@ func TestClientFindProviders(t *testing.T) { } func TestClientOverMax(t *testing.T) { - rs := VirtualRoutingServer() + rs := NewServer() k := u.Key("hello") numProvidersForHelloKey := 100 for i := 0; i < numProvidersForHelloKey; i++ { peer := testutil.NewPeerWithIDString(string(i)) - err := rs.Announce(peer, k) + err := rs.Client(peer).Provide(context.Background(), k) if err != nil { t.Fatal(err) } } - providersFromHashTable := rs.Providers(k) - if len(providersFromHashTable) != numProvidersForHelloKey { - t.Log(1 == len(providersFromHashTable)) - t.Fatal("not all providers were returned") - } max := 10 peer := testutil.NewPeerWithIDString("TODO") @@ -108,7 +90,7 @@ func TestClientOverMax(t *testing.T) { // TODO does dht ensure won't receive self as a provider? probably not. func TestCanceledContext(t *testing.T) { - rs := VirtualRoutingServer() + rs := NewServer() k := u.Key("hello") t.Log("async'ly announce infinite stream of providers for key") @@ -116,7 +98,7 @@ func TestCanceledContext(t *testing.T) { go func() { // infinite stream for { peer := testutil.NewPeerWithIDString(string(i)) - err := rs.Announce(peer, k) + err := rs.Client(peer).Provide(context.Background(), k) if err != nil { t.Fatal(err) } diff --git a/routing/mock/routing.go b/routing/mock/routing.go deleted file mode 100644 index ff83ddca310..00000000000 --- a/routing/mock/routing.go +++ /dev/null @@ -1,144 +0,0 @@ -package mock - -import ( - "errors" - "math/rand" - "sync" - - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - peer "github.com/jbenet/go-ipfs/peer" - routing "github.com/jbenet/go-ipfs/routing" - u "github.com/jbenet/go-ipfs/util" -) - -var log = u.Logger("mockrouter") - -var _ routing.IpfsRouting = &MockRouter{} - -type MockRouter struct { - datastore ds.Datastore - hashTable RoutingServer - peer peer.Peer -} - -func NewMockRouter(local peer.Peer, dstore ds.Datastore) routing.IpfsRouting { - return &MockRouter{ - datastore: dstore, - peer: local, - hashTable: VirtualRoutingServer(), - } -} - -func (mr *MockRouter) SetRoutingServer(rs RoutingServer) { - mr.hashTable = rs -} - -func (mr *MockRouter) PutValue(ctx context.Context, key u.Key, val []byte) error { - log.Debugf("PutValue: %s", key) - return mr.datastore.Put(key.DsKey(), val) -} - -func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) { - log.Debugf("GetValue: %s", key) - v, err := mr.datastore.Get(key.DsKey()) - if err != nil { - return nil, err - } - - data, ok := v.([]byte) - if !ok { - return nil, errors.New("could not cast value from datastore") - } - - return data, nil -} - -func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) { - return nil, nil -} - -func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { - log.Debugf("FindPeer: %s", pid) - return nil, nil -} - -func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer { - out := make(chan peer.Peer) - go func() { - defer close(out) - for i, p := range mr.hashTable.Providers(k) { - if max <= i { - return - } - select { - case out <- p: - case <-ctx.Done(): - return - } - } - }() - return out -} - -func (mr *MockRouter) Provide(_ context.Context, key u.Key) error { - return mr.hashTable.Announce(mr.peer, key) -} - -type RoutingServer interface { - Announce(peer.Peer, u.Key) error - - Providers(u.Key) []peer.Peer - - Client(p peer.Peer) routing.IpfsRouting -} - -func VirtualRoutingServer() RoutingServer { - return &hashTable{ - providers: make(map[u.Key]peer.Map), - } -} - -type hashTable struct { - lock sync.RWMutex - providers map[u.Key]peer.Map -} - -func (rs *hashTable) Announce(p peer.Peer, k u.Key) error { - rs.lock.Lock() - defer rs.lock.Unlock() - - _, ok := rs.providers[k] - if !ok { - rs.providers[k] = make(peer.Map) - } - rs.providers[k][p.Key()] = p - return nil -} - -func (rs *hashTable) Providers(k u.Key) []peer.Peer { - rs.lock.RLock() - defer rs.lock.RUnlock() - ret := make([]peer.Peer, 0) - peerset, ok := rs.providers[k] - if !ok { - return ret - } - for _, peer := range peerset { - ret = append(ret, peer) - } - - for i := range ret { - j := rand.Intn(i + 1) - ret[i], ret[j] = ret[j], ret[i] - } - - return ret -} - -func (rs *hashTable) Client(p peer.Peer) routing.IpfsRouting { - return &MockRouter{ - peer: p, - hashTable: rs, - } -} diff --git a/routing/mock/server.go b/routing/mock/server.go new file mode 100644 index 00000000000..3e189d95447 --- /dev/null +++ b/routing/mock/server.go @@ -0,0 +1,76 @@ +package mockrouting + +import ( + "math/rand" + "sync" + + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + delay "github.com/jbenet/go-ipfs/util/delay" +) + +// server is the mockrouting.Client's private interface to the routing server +type server interface { + Announce(peer.Peer, u.Key) error + Providers(u.Key) []peer.Peer + + Server +} + +// s is an implementation of the private server interface +type s struct { + delay delay.D + + lock sync.RWMutex + providers map[u.Key]peer.Map +} + +func (rs *s) Announce(p peer.Peer, k u.Key) error { + rs.delay.Wait() // before locking + + rs.lock.Lock() + defer rs.lock.Unlock() + + _, ok := rs.providers[k] + if !ok { + rs.providers[k] = make(peer.Map) + } + rs.providers[k][p.Key()] = p + return nil +} + +func (rs *s) Providers(k u.Key) []peer.Peer { + rs.delay.Wait() // before locking + + rs.lock.RLock() + defer rs.lock.RUnlock() + + var ret []peer.Peer + peerset, ok := rs.providers[k] + if !ok { + return ret + } + for _, peer := range peerset { + ret = append(ret, peer) + } + + for i := range ret { + j := rand.Intn(i + 1) + ret[i], ret[j] = ret[j], ret[i] + } + + return ret +} + +func (rs *s) Client(p peer.Peer) Client { + return rs.ClientWithDatastore(p, ds.NewMapDatastore()) +} + +func (rs *s) ClientWithDatastore(p peer.Peer, datastore ds.Datastore) Client { + return &client{ + peer: p, + datastore: ds.NewMapDatastore(), + server: rs, + } +} diff --git a/util/datastore_closer.go b/util/datastore2/datastore_closer.go similarity index 95% rename from util/datastore_closer.go rename to util/datastore2/datastore_closer.go index 7330ddb2dec..40bd44e4358 100644 --- a/util/datastore_closer.go +++ b/util/datastore2/datastore_closer.go @@ -1,4 +1,4 @@ -package util +package datastore2 import ( "io" diff --git a/util/datastore2/delayed.go b/util/datastore2/delayed.go new file mode 100644 index 00000000000..b8670b2523d --- /dev/null +++ b/util/datastore2/delayed.go @@ -0,0 +1,42 @@ +package datastore2 + +import ( + datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + delay "github.com/jbenet/go-ipfs/util/delay" +) + +func WithDelay(ds datastore.Datastore, delay delay.D) datastore.Datastore { + return &delayed{ds: ds, delay: delay} +} + +type delayed struct { + ds datastore.Datastore + delay delay.D +} + +func (dds *delayed) Put(key datastore.Key, value interface{}) (err error) { + dds.delay.Wait() + return dds.ds.Put(key, value) +} + +func (dds *delayed) Get(key datastore.Key) (value interface{}, err error) { + dds.delay.Wait() + return dds.ds.Get(key) +} + +func (dds *delayed) Has(key datastore.Key) (exists bool, err error) { + dds.delay.Wait() + return dds.ds.Has(key) +} + +func (dds *delayed) Delete(key datastore.Key) (err error) { + dds.delay.Wait() + return dds.ds.Delete(key) +} + +func (dds *delayed) KeyList() ([]datastore.Key, error) { + dds.delay.Wait() + return dds.ds.KeyList() +} + +var _ datastore.Datastore = &delayed{} diff --git a/util/delay/delay.go b/util/delay/delay.go new file mode 100644 index 00000000000..e7fb28091b4 --- /dev/null +++ b/util/delay/delay.go @@ -0,0 +1,39 @@ +package delay + +import ( + "sync" + "time" +) + +// Delay makes it easy to add (threadsafe) configurable delays to other +// objects. +type D interface { + Set(time.Duration) time.Duration + Wait() +} + +// Fixed returns a delay with fixed latency +func Fixed(t time.Duration) D { + return &delay{t: t} +} + +type delay struct { + l sync.RWMutex + t time.Duration +} + +// TODO func Variable(time.Duration) D returns a delay with probablistic latency + +func (d *delay) Set(t time.Duration) time.Duration { + d.l.Lock() + defer d.l.Unlock() + prev := d.t + d.t = t + return prev +} + +func (d *delay) Wait() { + d.l.RLock() + defer d.l.RUnlock() + time.Sleep(d.t) +} diff --git a/util/testutil/gen.go b/util/testutil/gen.go index b652d40d3be..542aaa87580 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -2,28 +2,10 @@ package testutil import ( crand "crypto/rand" - "testing" - - dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - "github.com/jbenet/go-ipfs/exchange/offline" - "github.com/jbenet/go-ipfs/peer" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - "github.com/jbenet/go-ipfs/blocks/blockstore" - bsrv "github.com/jbenet/go-ipfs/blockservice" - dag "github.com/jbenet/go-ipfs/merkledag" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) -func GetDAGServ(t testing.TB) dag.DAGService { - bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - bserv, err := bsrv.New(bstore, offline.Exchange(bstore)) - if err != nil { - t.Fatal(err) - } - return dag.NewDAGService(bserv) -} - func RandPeer() peer.Peer { id := make([]byte, 16) crand.Read(id) diff --git a/util/testutil/mock.go b/util/testutil/mock.go index 488a26044dd..1c5d4ded274 100644 --- a/util/testutil/mock.go +++ b/util/testutil/mock.go @@ -1,9 +1,8 @@ package testutil import ( - "github.com/jbenet/go-ipfs/peer" - ic "github.com/jbenet/go-ipfs/crypto" + peer "github.com/jbenet/go-ipfs/peer" ) func NewPeerWithKeyPair(sk ic.PrivKey, pk ic.PubKey) (peer.Peer, error) {