diff --git a/cmd/ipfs/ipfs.go b/cmd/ipfs/ipfs.go
index 5935b090f8d..ad153feee58 100644
--- a/cmd/ipfs/ipfs.go
+++ b/cmd/ipfs/ipfs.go
@@ -41,8 +41,7 @@ Advanced Commands:
     mount       Mount an ipfs read-only mountpoint.
     serve       Serve an interface to ipfs.
-
-    net-diag    Print network diagnostic
+    net-diag    Print network diagnostic
 
 Use "ipfs help <command>" for more information about a command.
 `,
diff --git a/core/commands/diag.go b/core/commands/diag.go
index c06499ec63b..fdb84ecf493 100644
--- a/core/commands/diag.go
+++ b/core/commands/diag.go
@@ -8,8 +8,22 @@ import (
 	"time"
 
 	"github.com/jbenet/go-ipfs/core"
+	diagn "github.com/jbenet/go-ipfs/diagnostics"
 )
 
+func PrintDiagnostics(info []*diagn.DiagInfo, out io.Writer) {
+	for _, i := range info {
+		fmt.Fprintf(out, "Peer: %s\n", i.ID)
+		fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
+		fmt.Fprintf(out, "\tConnected To:\n")
+		for _, c := range i.Connections {
+			fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
+		}
+		fmt.Fprintln(out)
+	}
+
+}
+
 func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.Writer) error {
 	if n.Diagnostics == nil {
 		return errors.New("Cannot run diagnostic in offline mode!")
@@ -29,15 +43,7 @@ func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.W
 			return err
 		}
 	} else {
-		for _, i := range info {
-			fmt.Fprintf(out, "Peer: %s\n", i.ID)
-			fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
-			fmt.Fprintf(out, "\tConnected To:\n")
-			for _, c := range i.Connections {
-				fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
-			}
-			fmt.Fprintln(out)
-		}
+		PrintDiagnostics(info, out)
 	}
 	return nil
 }
diff --git a/core/core.go b/core/core.go
index d22390d9296..3b9cc1fb2e8 100644
--- a/core/core.go
+++ b/core/core.go
@@ -108,6 +108,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
 		route           *dht.IpfsDHT
 		exchangeSession exchange.Interface
 		diagnostics     *diag.Diagnostics
+		network         inet.Network
 	)
 
 	if online {
@@ -135,6 +136,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
 		if err != nil {
 			return nil, err
 		}
+		network = net
 
 		diagnostics = diag.NewDiagnostics(local, net, diagService)
 		diagService.SetHandler(diagnostics)
@@ -173,6 +175,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
 		Routing:     route,
 		Namesys:     ns,
 		Diagnostics: diagnostics,
+		Network:     network,
 	}, nil
 }
diff --git a/diagnostics/diag.go b/diagnostics/diag.go
index 8a6c636b6a5..f347c79ed6c 100644
--- a/diagnostics/diag.go
+++ b/diagnostics/diag.go
@@ -1,4 +1,4 @@
-package diagnostic
+package diagnostics
 
 import (
 	"bytes"
@@ -48,15 +48,17 @@ type connDiagInfo struct {
 	ID      string
 }
 
-type diagInfo struct {
+type DiagInfo struct {
 	ID          string
 	Connections []connDiagInfo
 	Keys        []string
 	LifeSpan    time.Duration
+	BwIn        uint64
+	BwOut       uint64
 	CodeVersion string
 }
 
-func (di *diagInfo) Marshal() []byte {
+func (di *DiagInfo) Marshal() []byte {
 	b, err := json.Marshal(di)
 	if err != nil {
 		panic(err)
@@ -69,12 +71,13 @@ func (d *Diagnostics) getPeers() []*peer.Peer {
 	return d.network.GetPeerList()
 }
 
-func (d *Diagnostics) getDiagInfo() *diagInfo {
-	di := new(diagInfo)
+func (d *Diagnostics) getDiagInfo() *DiagInfo {
+	di := new(DiagInfo)
 	di.CodeVersion = "github.com/jbenet/go-ipfs"
 	di.ID = d.self.ID.Pretty()
 	di.LifeSpan = time.Since(d.birth)
 	di.Keys = nil // Currently no way to query datastore
+	di.BwIn, di.BwOut = d.network.GetBandwidthTotals()
 
 	for _, p := range d.getPeers() {
 		di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()})
@@ -88,7 +91,7 @@ func newID() string {
 	return string(id)
 }
 
-func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
+func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
 	log.Debug("Getting diagnostic.")
 	ctx, _ := context.WithTimeout(context.TODO(), timeout)
 
@@ -102,7 +105,7 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error)
 	peers := d.getPeers()
 	log.Debug("Sending diagnostic request to %d peers.", len(peers))
 
-	var out []*diagInfo
+	var out []*DiagInfo
 	di := d.getDiagInfo()
 	out = append(out, di)
 
@@ -134,15 +137,15 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error)
 	return out, nil
 }
 
-func AppendDiagnostics(data []byte, cur []*diagInfo) []*diagInfo {
+func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
 	buf := bytes.NewBuffer(data)
 	dec := json.NewDecoder(buf)
 	for {
-		di := new(diagInfo)
+		di := new(DiagInfo)
 		err := dec.Decode(di)
 		if err != nil {
 			if err != io.EOF {
-				log.Error("error decoding diagInfo: %v", err)
+				log.Error("error decoding DiagInfo: %v", err)
 			}
 			break
 		}
@@ -216,6 +219,7 @@ func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, e
 	sendcount := 0
 	for _, p := range d.getPeers() {
 		log.Debug("Sending diagnostic request to peer: %s", p)
+		sendcount++
 		go func(p *peer.Peer) {
 			out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
 			if err != nil {
diff --git a/diagnostics/message.pb.go b/diagnostics/message.pb.go
index a3ef994efbb..5132e1e2421 100644
--- a/diagnostics/message.pb.go
+++ b/diagnostics/message.pb.go
@@ -3,7 +3,7 @@
 // DO NOT EDIT!
 
 /*
-Package diagnostic is a generated protocol buffer package.
+Package diagnostics is a generated protocol buffer package.
 
 It is generated from these files:
 	message.proto
@@ -11,9 +11,9 @@
 It has these top-level messages:
 	Message
 */
-package diagnostic
+package diagnostics
 
-import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+import proto "code.google.com/p/goprotobuf/proto"
 import math "math"
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/diagnostics/message.proto b/diagnostics/message.proto
index 349afba257d..ca1e367f277 100644
--- a/diagnostics/message.proto
+++ b/diagnostics/message.proto
@@ -1,4 +1,4 @@
-package diagnostic;
+package diagnostics;
 
 message Message {
 	required string DiagID = 1;
diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go
index 7eb8870aa50..b93b1a9b85e 100644
--- a/exchange/bitswap/bitswap.go
+++ b/exchange/bitswap/bitswap.go
@@ -24,13 +24,12 @@ func NetMessageSession(parent context.Context, p *peer.Peer,
 	net inet.Network, srv inet.Service, directory bsnet.Routing,
 	d ds.Datastore, nice bool) exchange.Interface {
 
-	networkAdapter := bsnet.NetMessageAdapter(srv, nil)
+	networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
 	bs := &bitswap{
 		blockstore:    blockstore.NewBlockstore(d),
 		notifications: notifications.New(),
 		strategy:      strategy.New(nice),
 		routing:       directory,
-		network:       net,
 		sender:        networkAdapter,
 		wantlist:      u.NewKeySet(),
 	}
@@ -42,9 +41,6 @@ func NetMessageSession(parent context.Context, p *peer.Peer,
 
 // bitswap instances implement the bitswap protocol.
 type bitswap struct {
-	// network maintains connections to the outside world.
-	network inet.Network
-
 	// sender delivers messages on behalf of the session
 	sender bsnet.Adapter
 
@@ -85,11 +81,20 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
 				message.AppendWanted(wanted)
 			}
 			message.AppendWanted(k)
-			for iiiii := range peersToQuery {
-				log.Debug("bitswap got peersToQuery: %s", iiiii)
+			for peerToQuery := range peersToQuery {
+				log.Debug("bitswap got peersToQuery: %s", peerToQuery)
 				go func(p *peer.Peer) {
+
+					log.Debug("bitswap dialing peer: %s", p)
+					err := bs.sender.DialPeer(p)
+					if err != nil {
+						log.Error("Error sender.DialPeer(%s)", p)
+						return
+					}
+
 					response, err := bs.sender.SendRequest(ctx, p, message)
 					if err != nil {
+						log.Error("Error sender.SendRequest(%s)", p)
 						return
 					}
 					// FIXME ensure accounting is handled correctly when
@@ -101,7 +106,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
 						return
 					}
 					bs.ReceiveMessage(ctx, p, response)
-				}(iiiii)
+				}(peerToQuery)
 			}
 		}()
diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go
index 8985ecefc30..03d7d341561 100644
--- a/exchange/bitswap/network/interface.go
+++ b/exchange/bitswap/network/interface.go
@@ -11,6 +11,9 @@ import (
 )
 
 // Adapter provides network connectivity for BitSwap sessions
 type Adapter interface {
+	// DialPeer ensures there is a connection to peer.
+	DialPeer(*peer.Peer) error
+
 	// SendMessage sends a BitSwap message to a peer.
 	SendMessage(
 		context.Context,
diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go
index a95e566ccaa..52f42807688 100644
--- a/exchange/bitswap/network/net_message_adapter.go
+++ b/exchange/bitswap/network/net_message_adapter.go
@@ -10,9 +10,10 @@ import (
 )
 
 // NetMessageAdapter wraps a NetMessage network service
-func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
+func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter {
 	adapter := impl{
 		nms:      s,
+		net:      n,
 		receiver: r,
 	}
 	s.SetHandler(&adapter)
@@ -22,6 +23,7 @@ func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
 // implements an Adapter that integrates with a NetMessage network service
 type impl struct {
 	nms inet.Service
+	net inet.Network
 
 	// inbound messages from the network are forwarded to the receiver
 	receiver Receiver
@@ -58,6 +60,10 @@ func (adapter *impl) HandleMessage(
 	return outgoing
 }
 
+func (adapter *impl) DialPeer(p *peer.Peer) error {
+	return adapter.net.DialPeer(p)
+}
+
 func (adapter *impl) SendMessage(
 	ctx context.Context,
 	p *peer.Peer,
diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go
index 4d5f8c35ea4..c3081337df4 100644
--- a/exchange/bitswap/testnet/network.go
+++ b/exchange/bitswap/testnet/network.go
@@ -3,6 +3,7 @@ package bitswap
 import (
 	"bytes"
 	"errors"
+	"fmt"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
@@ -14,6 +15,8 @@ import (
 type Network interface {
 	Adapter(*peer.Peer) bsnet.Adapter
 
+	HasPeer(*peer.Peer) bool
+
 	SendMessage(
 		ctx context.Context,
 		from *peer.Peer,
@@ -49,6 +52,11 @@ func (n *network) Adapter(p *peer.Peer) bsnet.Adapter {
 	return client
 }
 
+func (n *network) HasPeer(p *peer.Peer) bool {
+	_, found := n.clients[p.Key()]
+	return found
+}
+
 // TODO should this be completely asynchronous?
 // TODO what does the network layer do with errors received from services?
 func (n *network) SendMessage(
@@ -155,6 +163,14 @@ func (nc *networkClient) SendRequest(
 	return nc.network.SendRequest(ctx, nc.local, to, message)
 }
 
+func (nc *networkClient) DialPeer(p *peer.Peer) error {
+	// no need to do anything because dialing isn't a thing in this test net.
+	if !nc.network.HasPeer(p) {
+		return fmt.Errorf("Peer not in network: %s", p)
+	}
+	return nil
+}
+
 func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
 	nc.Receiver = r
 }
diff --git a/msgproto/msgproto.go b/msgproto/msgproto.go
deleted file mode 100644
index bdd9f1ed51d..00000000000
--- a/msgproto/msgproto.go
+++ /dev/null
@@ -1 +0,0 @@
-package msgproto
diff --git a/net/conn/conn.go b/net/conn/conn.go
index dcf6c923116..c00c3f46e4c 100644
--- a/net/conn/conn.go
+++ b/net/conn/conn.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 
 	msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
-	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
 
 	spipe "github.com/jbenet/go-ipfs/crypto/spipe"
@@ -22,9 +21,9 @@ const MaxMessageSize = 1 << 20
 
 // Conn represents a connection to another Peer (IPFS Node).
 type Conn struct {
-	Peer *peer.Peer
-	Addr ma.Multiaddr
-	Conn manet.Conn
+	Local  *peer.Peer
+	Remote *peer.Peer
+	Conn   manet.Conn
 
 	Closed   chan bool
 	Outgoing *msgio.Chan
@@ -36,11 +35,11 @@ type Conn struct {
 type Map map[u.Key]*Conn
 
 // NewConn constructs a new connection
-func NewConn(peer *peer.Peer, addr ma.Multiaddr, mconn manet.Conn) (*Conn, error) {
+func NewConn(local, remote *peer.Peer, mconn manet.Conn) (*Conn, error) {
 	conn := &Conn{
-		Peer: peer,
-		Addr: addr,
-		Conn: mconn,
+		Local:  local,
+		Remote: remote,
+		Conn:   mconn,
 	}
 
 	if err := conn.newChans(); err != nil {
@@ -52,18 +51,28 @@ func NewConn(peer *peer.Peer, addr ma.Multiaddr, mconn manet.Conn) (*Conn, error
 
 // Dial connects to a particular peer, over a given network
 // Example: Dial("udp", peer)
-func Dial(network string, peer *peer.Peer) (*Conn, error) {
-	addr := peer.NetAddress(network)
-	if addr == nil {
-		return nil, fmt.Errorf("No address for network %s", network)
+func Dial(network string, local, remote *peer.Peer) (*Conn, error) {
+	laddr := local.NetAddress(network)
+	if laddr == nil {
+		return nil, fmt.Errorf("No local address for network %s", network)
 	}
 
-	nconn, err := manet.Dial(addr)
+	raddr := remote.NetAddress(network)
+	if raddr == nil {
+		return nil, fmt.Errorf("No remote address for network %s", network)
+	}
+
+	// TODO: try to get reusing addr/ports to work.
+	// dialer := manet.Dialer{LocalAddr: laddr}
+	dialer := manet.Dialer{}
+
+	log.Info("%s %s dialing %s %s", local, laddr, remote, raddr)
+	nconn, err := dialer.Dial(raddr)
 	if err != nil {
 		return nil, err
 	}
 
-	return NewConn(peer, addr, nconn)
+	return NewConn(local, remote, nconn)
 }
 
 // Construct new channels for given Conn.
@@ -84,7 +93,7 @@ func (c *Conn) newChans() error {
 
 // Close closes the connection, and associated channels.
 func (c *Conn) Close() error {
-	log.Debug("Closing Conn with %v", c.Peer)
+	log.Debug("%s closing Conn with %s", c.Local, c.Remote)
 	if c.Conn == nil {
 		return fmt.Errorf("Already closed") // already closed
 	}
diff --git a/net/conn/conn_test.go b/net/conn/conn_test.go
index 95d5833dfa2..a076edcdad2 100644
--- a/net/conn/conn_test.go
+++ b/net/conn/conn_test.go
@@ -65,12 +65,17 @@ func TestDial(t *testing.T) {
 	}
 	go echoListen(listener)
 
-	p, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234")
+	p1, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234")
 	if err != nil {
 		t.Fatal("error setting up peer", err)
 	}
 
-	c, err := Dial("tcp", p)
+	p2, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34", "/ip4/127.0.0.1/tcp/3456")
+	if err != nil {
+		t.Fatal("error setting up peer", err)
+	}
+
+	c, err := Dial("tcp", p2, p1)
 	if err != nil {
 		t.Fatal("error dialing peer", err)
 	}
diff --git a/net/interface.go b/net/interface.go
index dee1460fc81..379d0196805 100644
--- a/net/interface.go
+++ b/net/interface.go
@@ -29,6 +29,10 @@ type Network interface {
 	// GetPeerList returns the list of peers currently connected in this network.
 	GetPeerList() []*peer.Peer
 
+	// GetBandwidthTotals returns the total number of bytes passed through
+	// the network since it was instantiated
+	GetBandwidthTotals() (uint64, uint64)
+
 	// SendMessage sends given Message out
 	SendMessage(msg.NetMessage) error
diff --git a/net/mux/mux.go b/net/mux/mux.go
index 3138fe873f9..ab325ecd55d 100644
--- a/net/mux/mux.go
+++ b/net/mux/mux.go
@@ -36,6 +36,12 @@ type Muxer struct {
 	ctx    context.Context
 	wg     sync.WaitGroup
 
+	bwiLock sync.Mutex
+	bwIn    uint64
+
+	bwoLock sync.Mutex
+	bwOut   uint64
+
 	*msg.Pipe
 }
 
@@ -76,6 +82,17 @@ func (m *Muxer) Start(ctx context.Context) error {
 	return nil
 }
 
+func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
+	m.bwiLock.Lock()
+	in = m.bwIn
+	m.bwiLock.Unlock()
+
+	m.bwoLock.Lock()
+	out = m.bwOut
+	m.bwoLock.Unlock()
+	return
+}
+
 // Stop stops muxer activity.
 func (m *Muxer) Stop() {
 	if m.cancel == nil {
@@ -125,6 +142,11 @@ func (m *Muxer) handleIncomingMessages() {
 
 // handleIncomingMessage routes message to the appropriate protocol.
 func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
+	m.bwiLock.Lock()
+	// TODO: compensate for overhead
+	m.bwIn += uint64(len(m1.Data()))
+	m.bwiLock.Unlock()
+
 	data, pid, err := unwrapData(m1.Data())
 	if err != nil {
 		log.Error("muxer de-serializing error: %v", err)
@@ -173,6 +195,11 @@ func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) {
 		return
 	}
 
+	m.bwoLock.Lock()
+	// TODO: compensate for overhead
+	m.bwOut += uint64(len(data))
+	m.bwoLock.Unlock()
+
 	m2 := msg.New(m1.Peer(), data)
 	select {
 	case m.GetPipe().Outgoing <- m2:
diff --git a/net/net.go b/net/net.go
index b5864fe68a6..9ec7d2982d4 100644
--- a/net/net.go
+++ b/net/net.go
@@ -111,3 +111,7 @@ func (n *IpfsNetwork) Close() error {
 func (n *IpfsNetwork) GetPeerList() []*peer.Peer {
 	return n.swarm.GetPeerList()
 }
+
+func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) {
+	return n.muxer.GetBandwidthTotals()
+}
diff --git a/net/swarm/conn.go b/net/swarm/conn.go
index 7d5c47b5c9f..63fed2dfea6 100644
--- a/net/swarm/conn.go
+++ b/net/swarm/conn.go
@@ -42,6 +42,11 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
 		return err
 	}
 
+	// make sure port can be reused. TODO this doesn't work...
+	// if err := setSocketReuse(list); err != nil {
+	// 	return err
+	// }
+
 	// NOTE: this may require a lock around it later. currently, only run on setup
 	s.listeners = append(s.listeners, list)
 
@@ -69,11 +74,9 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
 
 // Handle getting ID from this peer, handshake, and adding it into the map
 func (s *Swarm) handleIncomingConn(nconn manet.Conn) {
-	addr := nconn.RemoteMultiaddr()
-
 	// Construct conn with nil peer for now, because we don't know its ID yet.
 	// connSetup will figure this out, and pull out / construct the peer.
-	c, err := conn.NewConn(nil, addr, nconn)
+	c, err := conn.NewConn(s.local, nil, nconn)
 	if err != nil {
 		s.errChan <- err
 		return
@@ -94,29 +97,32 @@ func (s *Swarm) connSetup(c *conn.Conn) error {
 		return errors.New("Tried to start nil connection.")
 	}
 
-	if c.Peer != nil {
-		log.Debug("Starting connection: %s", c.Peer)
+	if c.Remote != nil {
+		log.Debug("%s Starting connection: %s", c.Local, c.Remote)
 	} else {
-		log.Debug("Starting connection: [unknown peer]")
+		log.Debug("%s Starting connection: [unknown peer]", c.Local)
 	}
 
 	if err := s.connSecure(c); err != nil {
 		return fmt.Errorf("Conn securing error: %v", err)
 	}
 
-	log.Debug("Secured connection: %s", c.Peer)
+	log.Debug("%s secured connection: %s", c.Local, c.Remote)
 
 	// add address of connection to Peer. Maybe it should happen in connSecure.
-	c.Peer.AddAddress(c.Addr)
+	// NOT adding this address here, because the incoming address in TCP
+	// is an EPHEMERAL address, and not the address we want to keep around.
+	// addresses should be figured out through the DHT.
+	// c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
 
 	// add to conns
 	s.connsLock.Lock()
-	if _, ok := s.conns[c.Peer.Key()]; ok {
+	if _, ok := s.conns[c.Remote.Key()]; ok {
 		log.Debug("Conn already open!")
 		s.connsLock.Unlock()
 		return ErrAlreadyOpen
 	}
-	s.conns[c.Peer.Key()] = c
+	s.conns[c.Remote.Key()] = c
 	log.Debug("Added conn to map!")
 	s.connsLock.Unlock()
 
@@ -141,10 +147,10 @@ func (s *Swarm) connSecure(c *conn.Conn) error {
 		return err
 	}
 
-	if c.Peer == nil {
-		c.Peer = sp.RemotePeer()
+	if c.Remote == nil {
+		c.Remote = sp.RemotePeer()
 
-	} else if c.Peer != sp.RemotePeer() {
+	} else if c.Remote != sp.RemotePeer() {
 		panic("peers not being constructed correctly.")
 	}
 
@@ -198,20 +204,44 @@ func (s *Swarm) fanIn(c *conn.Conn) {
 
 		case data, ok := <-c.Secure.In:
 			if !ok {
-				e := fmt.Errorf("Error retrieving from conn: %v", c.Peer)
+				e := fmt.Errorf("Error retrieving from conn: %v", c.Remote)
 				s.errChan <- e
 				goto out
 			}
 
 			// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
-			msg := msg.New(c.Peer, data)
+			msg := msg.New(c.Remote, data)
 			s.Incoming <- msg
 		}
 	}
 
out:
 	s.connsLock.Lock()
-	delete(s.conns, c.Peer.Key())
+	delete(s.conns, c.Remote.Key())
 	s.connsLock.Unlock()
 }
+
+// Commenting out because it's platform specific
+// func setSocketReuse(l manet.Listener) error {
+// 	nl := l.NetListener()
+//
+// 	// for now only TCP. TODO change this when more networks.
+// 	file, err := nl.(*net.TCPListener).File()
+// 	if err != nil {
+// 		return err
+// 	}
+//
+// 	fd := file.Fd()
+// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
+// 	if err != nil {
+// 		return err
+// 	}
+//
+// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
+// 	if err != nil {
+// 		return err
+// 	}
+//
+// 	return nil
+// }
diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go
index 057e4ad2609..e8bb8bdbc1b 100644
--- a/net/swarm/swarm.go
+++ b/net/swarm/swarm.go
@@ -11,7 +11,6 @@ import (
 	u "github.com/jbenet/go-ipfs/util"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
 )
 
@@ -129,7 +128,7 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
 	}
 
 	// open connection to peer
-	c, err = conn.Dial("tcp", peer)
+	c, err = conn.Dial("tcp", s.local, peer)
 	if err != nil {
 		return nil, err
 	}
@@ -142,30 +141,6 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
 	return c, nil
 }
 
-// DialAddr is for connecting to a peer when you know their addr but not their ID.
-// Should only be used when sure that not connected to peer in question
-// TODO(jbenet) merge with Dial? need way to patch back.
-func (s *Swarm) DialAddr(addr ma.Multiaddr) (*conn.Conn, error) {
-	if addr == nil {
-		return nil, errors.New("addr must be a non-nil Multiaddr")
-	}
-
-	npeer := new(peer.Peer)
-	npeer.AddAddress(addr)
-
-	c, err := conn.Dial("tcp", npeer)
-	if err != nil {
-		return nil, err
-	}
-
-	if err := s.connSetup(c); err != nil {
-		c.Close()
-		return nil, err
-	}
-
-	return c, err
-}
-
 // GetConnection returns the connection in the swarm to given peer.ID
 func (s *Swarm) GetConnection(pid peer.ID) *conn.Conn {
 	s.connsLock.RLock()
@@ -201,11 +176,12 @@ func (s *Swarm) GetErrChan() chan error {
 	return s.errChan
 }
 
+// GetPeerList returns a copy of the set of peers swarm is connected to.
 func (s *Swarm) GetPeerList() []*peer.Peer {
 	var out []*peer.Peer
 	s.connsLock.RLock()
 	for _, p := range s.conns {
-		out = append(out, p.Peer)
+		out = append(out, p.Remote)
 	}
 	s.connsLock.RUnlock()
 	return out
diff --git a/routing/dht/Message.go b/routing/dht/Message.go
index e4607f1de3e..526724287b9 100644
--- a/routing/dht/Message.go
+++ b/routing/dht/Message.go
@@ -1,7 +1,10 @@
 package dht
 
 import (
+	"errors"
+
 	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 	peer "github.com/jbenet/go-ipfs/peer"
 )
 
@@ -35,6 +38,14 @@ func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
 	return pbpeers
 }
 
+// Address returns a multiaddr associated with the Message_Peer entry
+func (m *Message_Peer) Address() (ma.Multiaddr, error) {
+	if m == nil {
+		return nil, errors.New("MessagePeer is nil")
+	}
+	return ma.NewMultiaddr(*m.Addr)
+}
+
 // GetClusterLevel gets and adjusts the cluster level on the message.
 // a +/- 1 adjustment is needed to distinguish a valid first level (1) and
 // default "no value" protobuf behavior (0)
diff --git a/routing/dht/dht.go b/routing/dht/dht.go
index c95e0751136..e00b82bf2c5 100644
--- a/routing/dht/dht.go
+++ b/routing/dht/dht.go
@@ -76,7 +76,7 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende
 
 // Connect to a new peer at the given address, ping and add to the routing table
 func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) {
-	log.Debug("Connect to new peer: %s\n", npeer)
+	log.Debug("Connect to new peer: %s", npeer)
 
 	// TODO(jbenet,whyrusleeping)
 	//
@@ -109,13 +109,13 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 
 	mData := mes.Data()
 	if mData == nil {
-		// TODO handle/log err
+		log.Error("Message contained nil data.")
 		return nil
 	}
 
 	mPeer := mes.Peer()
 	if mPeer == nil {
-		// TODO handle/log err
+		log.Error("Message contained nil peer.")
 		return nil
 	}
 
@@ -123,7 +123,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 	pmes := new(Message)
 	err := proto.Unmarshal(mData, pmes)
 	if err != nil {
-		// TODO handle/log err
+		log.Error("Error unmarshaling data")
 		return nil
 	}
 
@@ -138,25 +138,27 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 	handler := dht.handlerForMsgType(pmes.GetType())
 	if handler == nil {
 		// TODO handle/log err
+		log.Error("got back nil handler from handlerForMsgType")
 		return nil
 	}
 
 	// dispatch handler.
 	rpmes, err := handler(mPeer, pmes)
 	if err != nil {
-		// TODO handle/log err
+		log.Error("handle message error: %s", err)
 		return nil
 	}
 
 	// if nil response, return it before serializing
 	if rpmes == nil {
+		log.Warning("Got back nil response from request.")
 		return nil
 	}
 
 	// serialize response msg
 	rmes, err := msg.FromObject(mPeer, rpmes)
 	if err != nil {
-		// TODO handle/log err
+		log.Error("serialize response error: %s", err)
 		return nil
 	}
 
@@ -197,6 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
 	return rpmes, nil
 }
 
+// putValueToNetwork stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
 	key string, value []byte) error {
 
@@ -216,13 +219,17 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
 
 func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
 	pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
+
+	// add self as the provider
+	pmes.ProviderPeers = peersToPBPeers([]*peer.Peer{dht.self})
+
 	rpmes, err := dht.sendRequest(ctx, p, pmes)
 	if err != nil {
 		return err
 	}
 
 	log.Debug("%s putProvider: %s for %s", dht.self, p, key)
-	if *rpmes.Key != *pmes.Key {
+	if rpmes.GetKey() != pmes.GetKey() {
 		return errors.New("provider not added correctly")
 	}
 
@@ -257,23 +264,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
 	// Perhaps we were given closer peers
 	var peers []*peer.Peer
 	for _, pb := range pmes.GetCloserPeers() {
-		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
-			continue
-		}
-
-		addr, err := ma.NewMultiaddr(pb.GetAddr())
+		pr, err := dht.addPeer(pb)
 		if err != nil {
-			log.Error("%v", err.Error())
+			log.Error("%s", err)
 			continue
 		}
-
-		// check if we already have this peer.
-		pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
-		if pr == nil {
-			pr = &peer.Peer{ID: peer.ID(pb.GetId())}
-			dht.peerstore.Put(pr)
-		}
-		pr.AddAddress(addr) // idempotent
 		peers = append(peers, pr)
 	}
 
@@ -286,6 +281,27 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
 	return nil, nil, u.ErrNotFound
 }
 
+func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
+	if peer.ID(pb.GetId()).Equal(dht.self.ID) {
+		return nil, errors.New("cannot add self as peer")
+	}
+
+	addr, err := ma.NewMultiaddr(pb.GetAddr())
+	if err != nil {
+		return nil, err
+	}
+
+	// check if we already have this peer.
+	pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
+	if pr == nil {
+		pr = &peer.Peer{ID: peer.ID(pb.GetId())}
+		dht.peerstore.Put(pr)
+	}
+	pr.AddAddress(addr) // idempotent
+
+	return pr, nil
+}
+
 // getValueSingle simply performs the get value RPC with the given parameters
 func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
 	key u.Key, level int) (*Message, error) {
@@ -323,6 +339,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
 	return nil, u.ErrNotFound
 }
 
+// getLocal attempts to retrieve the value from the datastore
 func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
 	dht.dslock.Lock()
 	defer dht.dslock.Unlock()
@@ -338,6 +355,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
 	return byt, nil
 }
 
+// putLocal stores the key value pair in the datastore
 func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
 	return dht.datastore.Put(key.DsKey(), value)
 }
@@ -364,8 +382,8 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
 	// after some deadline of inactivity.
 }
 
-// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
-func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
+// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
+func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
 	for _, table := range dht.routingTables {
 		p := table.Find(id)
 		if p != nil {
@@ -415,39 +433,44 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
 	return provArr
 }
 
-// nearestPeerToQuery returns the routing tables closest peers.
-func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
+// nearestPeersToQuery returns the routing tables closest peers.
+func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer {
 	level := pmes.GetClusterLevel()
 	cluster := dht.routingTables[level]
 
 	key := u.Key(pmes.GetKey())
-	closer := cluster.NearestPeer(kb.ConvertKey(key))
+	closer := cluster.NearestPeers(kb.ConvertKey(key), count)
 	return closer
 }
 
-// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
-func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
-	closer := dht.nearestPeerToQuery(pmes)
+// betterPeersToQuery returns nearestPeersToQuery, but iff closer than self.
+func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
+	closer := dht.nearestPeersToQuery(pmes, count)
 
 	// no node? nil
 	if closer == nil {
 		return nil
 	}
 
-	// == to self? nil
-	if closer.ID.Equal(dht.self.ID) {
-		log.Error("Attempted to return self! this shouldnt happen...")
-		return nil
+	// == to self? that's bad
+	for _, p := range closer {
+		if p.ID.Equal(dht.self.ID) {
+			log.Error("Attempted to return self! this shouldn't happen...")
+			return nil
+		}
 	}
 
-	// self is closer? nil
-	key := u.Key(pmes.GetKey())
-	if kb.Closer(dht.self.ID, closer.ID, key) {
-		return nil
+	var filtered []*peer.Peer
+	for _, p := range closer {
+		// must all be closer than self
+		key := u.Key(pmes.GetKey())
+		if !kb.Closer(dht.self.ID, p.ID, key) {
+			filtered = append(filtered, p)
+		}
 	}
 
-	// ok seems like a closer node.
-	return closer
+	// ok seems like closer nodes
+	return filtered
 }
 
 func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
@@ -461,14 +484,14 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
 
 	p, _ := dht.peerstore.Get(id)
 	if p == nil {
-		p, _ = dht.Find(id)
+		p, _ = dht.FindLocal(id)
 		if p != nil {
 			panic("somehow peer not getting into peerstore")
 		}
 	}
 
 	if p == nil {
-		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
+		maddr, err := pbp.Address()
 		if err != nil {
 			return nil, err
 		}
@@ -477,6 +500,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
 		p = &peer.Peer{ID: id}
 		p.AddAddress(maddr)
 		dht.peerstore.Put(p)
+		log.Info("dht found new peer: %s %s", p, maddr)
 	}
 	return p, nil
 }
diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go
index 1d7413fd53b..bedda1afcd4 100644
--- a/routing/dht/dht_test.go
+++ b/routing/dht/dht_test.go
@@ -227,13 +227,16 @@ func TestProvides(t *testing.T) {
 	time.Sleep(time.Millisecond * 60)
 
 	ctxT, _ := context.WithTimeout(context.Background(), time.Second)
-	provs, err := dhts[0].FindProviders(ctxT, u.Key("hello"))
-	if err != nil {
-		t.Fatal(err)
-	}
+	provchan := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 1)
 
-	if len(provs) != 1 {
-		t.Fatal("Didnt get back providers")
+	after := time.After(time.Second)
+	select {
+	case prov := <-provchan:
+		if prov == nil {
+			t.Fatal("Got back nil provider")
+		}
+	case <-after:
+		t.Fatal("Did not get a provider back.")
 	}
 }
diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go
index 417dd0918f1..4a9de160eaf 100644
--- a/routing/dht/handlers.go
+++ b/routing/dht/handlers.go
@@ -13,6 +13,8 @@ import (
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
 )
 
+var CloserPeerCount = 4
+
 // dhthandler specifies the signature of functions that handle DHT messages.
 type dhtHandler func(*peer.Peer, *Message) (*Message, error)
 
@@ -83,10 +85,12 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 	}
 
 	// Find closest peer on given cluster to desired key and reply with that info
-	closer := dht.betterPeerToQuery(pmes)
+	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
 	if closer != nil {
-		log.Debug("handleGetValue returning a closer peer: '%s'\n", closer)
-		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
+		for _, p := range closer {
+			log.Debug("handleGetValue returning closer peer: '%s'", p)
+		}
+		resp.CloserPeers = peersToPBPeers(closer)
 	}
 
 	return resp, nil
@@ -109,27 +113,31 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
 
 func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
 	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
-	var closest *peer.Peer
+	var closest []*peer.Peer
 
 	// if looking for self... special case where we send it on CloserPeers.
 	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
-		closest = dht.self
+		closest = []*peer.Peer{dht.self}
 	} else {
-		closest = dht.betterPeerToQuery(pmes)
+		closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
 	}
 
 	if closest == nil {
-		log.Error("handleFindPeer: could not find anything.\n")
+		log.Error("handleFindPeer: could not find anything.")
 		return resp, nil
 	}
 
-	if len(closest.Addresses) == 0 {
-		log.Error("handleFindPeer: no addresses for connected peer...\n")
-		return resp, nil
+	var withAddresses []*peer.Peer
+	for _, p := range closest {
+		if len(p.Addresses) > 0 {
+			withAddresses = append(withAddresses, p)
+		}
 	}
 
-	log.Debug("handleFindPeer: sending back '%s'\n", closest)
-	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
+	for _, p := range withAddresses {
+		log.Debug("handleFindPeer: sending back '%s'", p)
+	}
+	resp.CloserPeers = peersToPBPeers(withAddresses)
 	return resp, nil
 }
 
@@ -157,9 +165,9 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, e
 	}
 
 	// Also send closer peers.
-	closer := dht.betterPeerToQuery(pmes)
+	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
 	if closer != nil {
-		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
+		resp.CloserPeers = peersToPBPeers(closer)
 	}
 
 	return resp, nil
@@ -175,7 +183,26 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er
 
 	log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
 
-	dht.providers.AddProvider(key, p)
+	// add provider should use the address given in the message
+	for _, pb := range pmes.GetProviderPeers() {
+		pid := peer.ID(pb.GetId())
+		if pid.Equal(p.ID) {
+
+			addr, err := pb.Address()
+			if err != nil {
+				log.Error("provider %s error with address %s", p, *pb.Addr)
+				continue
+			}
+
+			log.Info("received provider %s %s for %s", p, addr, key)
+			p.AddAddress(addr)
+			dht.providers.AddProvider(key, p)
+
+		} else {
+			log.Error("handleAddProvider received provider %s from %s", pid, p)
+		}
+	}
+
 	return pmes, nil // send back same msg as confirmation.
 }
diff --git a/routing/dht/routing.go b/routing/dht/routing.go
index c14031ce2b5..03d94d118f7 100644
--- a/routing/dht/routing.go
+++ b/routing/dht/routing.go
@@ -3,6 +3,7 @@ package dht
 import (
 	"bytes"
 	"encoding/json"
+	"sync"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 
@@ -117,26 +118,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
 	return nil
 }
 
-// NB: not actually async. Used to keep the interface consistent while the
-// actual async method, FindProvidersAsync2 is under construction
 func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
-	ch := make(chan *peer.Peer)
-	providers, err := dht.FindProviders(ctx, key)
-	if err != nil {
-		close(ch)
-		return ch
-	}
-	go func() {
-		defer close(ch)
-		for _, p := range providers {
-			ch <- p
-		}
-	}()
-	return ch
-}
-
-// FIXME: there's a bug here!
-func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
 	peerOut := make(chan *peer.Peer, count)
 	go func() {
 		ps := newPeerSet()
@@ -151,9 +133,12 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in
 			}
 		}
 
+		wg := new(sync.WaitGroup)
 		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
 		for _, pp := range peers {
+			wg.Add(1)
 			go func(p *peer.Peer) {
+				defer wg.Done()
 				pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
 				if err != nil {
 					log.Error("%s", err)
@@ -162,7 +147,8 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in
 				dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut)
 			}(pp)
 		}
-
+		wg.Wait()
+		close(peerOut)
 	}()
 	return peerOut
 }
@@ -186,61 +172,16 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet
 	}
 }
 
-// FindProviders searches for peers who can provide the value for given key.
-func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) {
-	// get closest peer
-	log.Debug("Find providers for: '%s'", key)
-	p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
-	if p == nil {
-		log.Warning("Got no nearest peer for find providers: '%s'", key)
-		return nil, nil
-	}
-
-	for level := 0; level < len(dht.routingTables); {
-
-		// attempt retrieving providers
-		pmes, err := dht.findProvidersSingle(ctx, p, key, level)
-		if err != nil {
-			return nil, err
-		}
-
-		// handle providers
-		provs := pmes.GetProviderPeers()
-		if provs != nil {
-			log.Debug("Got providers back from findProviders call!")
-			return dht.addProviders(key, provs), nil
-		}
-
-		log.Debug("Didnt get providers, just closer peers.")
-		closer := pmes.GetCloserPeers()
-		if len(closer) == 0 {
-			level++
-			continue
-		}
-
-		np, err := dht.peerFromInfo(closer[0])
-		if err != nil {
-			log.Debug("no peerFromInfo")
-			level++
-			continue
-		}
-		p = np
-	}
-	return nil, u.ErrNotFound
-}
-
 // Find specific Peer
-
 // FindPeer searches for a peer with given ID.
 func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) {
 
 	// Check if were already connected to them
-	p, _ := dht.Find(id)
+	p, _ := dht.FindLocal(id)
 	if p != nil {
 		return p, nil
 	}
 
-	// @whyrusleeping why is this here? doesn't the dht.Find above cover it?
 	routeLevel := 0
 	p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
 	if p == nil {
@@ -277,7 +218,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
 func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) {
 
 	// Check if were already connected to them
-	p, _ := dht.Find(id)
+	p, _ := dht.FindLocal(id)
 	if p != nil {
 		return p, nil
 	}
diff --git a/routing/routing.go b/routing/routing.go
index 4669fb48c7b..f3dd0c9d86a 100644
--- a/routing/routing.go
+++ b/routing/routing.go
@@ -26,11 +26,7 @@ type IpfsRouting interface {
 	// Announce that this node can provide value for given key
 	Provide(context.Context, u.Key) error
 
-	// FindProviders searches for peers who can provide the value for given key.
-	FindProviders(context.Context, u.Key) ([]*peer.Peer, error)
-
 	// Find specific Peer
-
 	// FindPeer searches for a peer with given ID.
 	FindPeer(context.Context, peer.ID) (*peer.Peer, error)
 }
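
Note: bitswap's Block path now dials explicitly (Adapter.DialPeer) before issuing SendRequest, and the testnet adapter treats "dialing" as a membership check. A minimal standalone sketch of that call order, with the interface and names trimmed down to toy string-based stand-ins (not the real bsnet.Adapter signatures):

    package main

    import (
    	"errors"
    	"fmt"
    )

    // adapter captures the slice of the bitswap Adapter that Block now relies
    // on: ensure a connection exists, then issue the request.
    type adapter interface {
    	DialPeer(peer string) error
    	SendRequest(peer, msg string) (string, error)
    }

    // testnet mimics the patch's test network: "dialing" only checks membership.
    type testnet struct{ peers map[string]bool }

    func (t *testnet) DialPeer(p string) error {
    	if !t.peers[p] {
    		return fmt.Errorf("Peer not in network: %s", p)
    	}
    	return nil // nothing to actually dial in a test net
    }

    func (t *testnet) SendRequest(p, msg string) (string, error) {
    	if !t.peers[p] {
    		return "", errors.New("not connected")
    	}
    	return "ack:" + msg, nil
    }

    // want follows bitswap.Block's new order of operations: dial, then request,
    // bailing out before SendRequest if the dial fails.
    func want(a adapter, p, k string) (string, error) {
    	if err := a.DialPeer(p); err != nil {
    		return "", err
    	}
    	return a.SendRequest(p, "want "+k)
    }

    func main() {
    	net := &testnet{peers: map[string]bool{"QmPeerA": true}}
    	resp, err := want(net, "QmPeerA", "block1")
    	fmt.Println(resp, err) // ack:want block1 <nil>

    	_, err = want(net, "QmPeerB", "block1")
    	fmt.Println(err) // Peer not in network: QmPeerB
    }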
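Note: the bandwidth accounting added to net/mux/mux.go keeps two independently locked counters so the inbound and outbound paths never contend with each other, and reads take each lock only briefly. A standalone sketch of the same pattern (type and method names here are illustrative, not part of the patch):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // meter mirrors the Muxer's layout: separate mutexes for the inbound and
    // outbound totals so the two message paths never block one another.
    type meter struct {
    	inLock sync.Mutex
    	in     uint64

    	outLock sync.Mutex
    	out     uint64
    }

    func (m *meter) addIn(n int) {
    	m.inLock.Lock()
    	m.in += uint64(n) // as in the patch, wire overhead is not compensated for
    	m.inLock.Unlock()
    }

    func (m *meter) addOut(n int) {
    	m.outLock.Lock()
    	m.out += uint64(n)
    	m.outLock.Unlock()
    }

    func (m *meter) totals() (in, out uint64) {
    	m.inLock.Lock()
    	in = m.in
    	m.inLock.Unlock()

    	m.outLock.Lock()
    	out = m.out
    	m.outLock.Unlock()
    	return
    }

    func main() {
    	var m meter
    	var wg sync.WaitGroup
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func() { // concurrent writers, like the muxer's message handlers
    			defer wg.Done()
    			m.addIn(1024)
    			m.addOut(512)
    		}()
    	}
    	wg.Wait()
    	in, out := m.totals()
    	fmt.Printf("in=%d out=%d\n", in, out) // in=10240 out=5120
    }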
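Note: the reworked FindProvidersAsync closes its output channel only after a WaitGroup confirms every query goroutine has finished, which is what lets the updated TestProvides select on the channel against a timeout. A self-contained sketch of that producer/consumer shape, assuming a toy query function in place of findProvidersSingle:

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    // findProvidersAsync fans out one goroutine per peer and closes the output
    // channel once all of them are done -- the WaitGroup-then-close shape the
    // patch gives dht.FindProvidersAsync. query stands in for the DHT RPC.
    func findProvidersAsync(peers []string, count int, query func(string) []string) <-chan string {
    	out := make(chan string, count)
    	go func() {
    		wg := new(sync.WaitGroup)
    		for _, p := range peers {
    			wg.Add(1)
    			go func(p string) {
    				defer wg.Done()
    				for _, prov := range query(p) {
    					out <- prov
    				}
    			}(p)
    		}
    		wg.Wait()
    		close(out) // safe: no sender can still be running
    	}()
    	return out
    }

    func main() {
    	query := func(p string) []string { return []string{"provider-via-" + p} }
    	provs := findProvidersAsync([]string{"a", "b"}, 2, query)

    	// Consume the way the updated TestProvides does: first value or timeout.
    	select {
    	case prov, ok := <-provs:
    		if !ok {
    			fmt.Println("no providers")
    			return
    		}
    		fmt.Println("got", prov)
    	case <-time.After(time.Second):
    		fmt.Println("timed out")
    	}
    }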