From 68db754e53bc1bac65f783489114f6a3b3c749e3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 14 Jul 2017 20:51:46 +0300 Subject: [PATCH 01/29] pb/relay: CiruitRelay protobuf --- .../internal/circuitv1-deprecated/pb/Makefile | 10 + .../circuitv1-deprecated/pb/relay.pb.go | 199 ++++++++++++++++++ .../circuitv1-deprecated/pb/relay.proto | 41 ++++ 3 files changed, 250 insertions(+) create mode 100644 p2p/protocol/internal/circuitv1-deprecated/pb/Makefile create mode 100644 p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go create mode 100644 p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/Makefile b/p2p/protocol/internal/circuitv1-deprecated/pb/Makefile new file mode 100644 index 0000000000..95e2e1c206 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/Makefile @@ -0,0 +1,10 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --gogo_out=. $< + +clean: + rm *.pb.go diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go new file mode 100644 index 0000000000..fee05f8ee5 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go @@ -0,0 +1,199 @@ +// Code generated by protoc-gen-gogo. +// source: relay.proto +// DO NOT EDIT! + +/* +Package relay_pb is a generated protocol buffer package. + +It is generated from these files: + relay.proto + +It has these top-level messages: + CircuitRelay +*/ +package relay_pb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type CircuitRelay_Status int32 + +const ( + CircuitRelay_SUCCESS CircuitRelay_Status = 100 + CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220 + CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221 + CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250 + CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251 + CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260 + CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261 + CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262 + CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270 + CircuitRelay_HOP_CANT_REALAY_TO_SELF CircuitRelay_Status = 280 + CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320 + CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321 + CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350 + CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351 +) + +var CircuitRelay_Status_name = map[int32]string{ + 100: "SUCCESS", + 220: "HOP_SRC_ADDR_TOO_LONG", + 221: "HOP_DST_ADDR_TOO_LONG", + 250: "HOP_SRC_MULTIADDR_INVALID", + 251: "HOP_DST_MULTIADDR_INVALID", + 260: "HOP_NO_CONN_TO_DST", + 261: "HOP_CANT_DIAL_DST", + 262: "HOP_CANT_OPEN_DST_STREAM", + 270: "HOP_CANT_SPEAK_RELAY", + 280: "HOP_CANT_REALAY_TO_SELF", + 320: "STOP_SRC_ADDR_TOO_LONG", + 321: "STOP_DST_ADDR_TOO_LONG", + 350: "STOP_SRC_MULTIADDR_INVALID", + 351: "STOP_DST_MULTIADDR_INVALID", +} +var CircuitRelay_Status_value = map[string]int32{ + "SUCCESS": 100, + "HOP_SRC_ADDR_TOO_LONG": 220, + "HOP_DST_ADDR_TOO_LONG": 221, + "HOP_SRC_MULTIADDR_INVALID": 250, + "HOP_DST_MULTIADDR_INVALID": 251, + "HOP_NO_CONN_TO_DST": 260, + "HOP_CANT_DIAL_DST": 261, + "HOP_CANT_OPEN_DST_STREAM": 262, + "HOP_CANT_SPEAK_RELAY": 270, + "HOP_CANT_REALAY_TO_SELF": 280, + "STOP_SRC_ADDR_TOO_LONG": 320, + "STOP_DST_ADDR_TOO_LONG": 321, + "STOP_SRC_MULTIADDR_INVALID": 350, + "STOP_DST_MULTIADDR_INVALID": 351, +} + +func (x CircuitRelay_Status) Enum() *CircuitRelay_Status { + p := new(CircuitRelay_Status) + *p = x + return p +} +func (x CircuitRelay_Status) String() string { + return proto.EnumName(CircuitRelay_Status_name, int32(x)) +} +func (x *CircuitRelay_Status) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(CircuitRelay_Status_value, data, "CircuitRelay_Status") + if err != nil { + return err + } + *x = CircuitRelay_Status(value) + return nil +} + +type CircuitRelay_Type int32 + +const ( + CircuitRelay_HOP CircuitRelay_Type = 1 + CircuitRelay_STOP CircuitRelay_Type = 2 + CircuitRelay_STATUS CircuitRelay_Type = 3 +) + +var CircuitRelay_Type_name = map[int32]string{ + 1: "HOP", + 2: "STOP", + 3: "STATUS", +} +var CircuitRelay_Type_value = map[string]int32{ + "HOP": 1, + "STOP": 2, + "STATUS": 3, +} + +func (x CircuitRelay_Type) Enum() *CircuitRelay_Type { + p := new(CircuitRelay_Type) + *p = x + return p +} +func (x CircuitRelay_Type) String() string { + return proto.EnumName(CircuitRelay_Type_name, int32(x)) +} +func (x *CircuitRelay_Type) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(CircuitRelay_Type_value, data, "CircuitRelay_Type") + if err != nil { + return err + } + *x = CircuitRelay_Type(value) + return nil +} + +type CircuitRelay struct { + Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=relay.pb.CircuitRelay_Type" json:"type,omitempty"` + SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"` + DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"` + Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=relay.pb.CircuitRelay_Status" json:"code,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CircuitRelay) Reset() { *m = CircuitRelay{} } +func (m *CircuitRelay) String() string { return proto.CompactTextString(m) } +func (*CircuitRelay) ProtoMessage() {} + +func (m *CircuitRelay) GetType() CircuitRelay_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return CircuitRelay_HOP +} + +func (m *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer { + if m != nil { + return m.SrcPeer + } + return nil +} + +func (m *CircuitRelay) GetDstPeer() *CircuitRelay_Peer { + if m != nil { + return m.DstPeer + } + return nil +} + +func (m *CircuitRelay) GetCode() CircuitRelay_Status { + if m != nil && m.Code != nil { + return *m.Code + } + return CircuitRelay_SUCCESS +} + +type CircuitRelay_Peer struct { + Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` + Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CircuitRelay_Peer) Reset() { *m = CircuitRelay_Peer{} } +func (m *CircuitRelay_Peer) String() string { return proto.CompactTextString(m) } +func (*CircuitRelay_Peer) ProtoMessage() {} + +func (m *CircuitRelay_Peer) GetId() []byte { + if m != nil { + return m.Id + } + return nil +} + +func (m *CircuitRelay_Peer) GetAddrs() [][]byte { + if m != nil { + return m.Addrs + } + return nil +} + +func init() { + proto.RegisterType((*CircuitRelay)(nil), "relay.pb.CircuitRelay") + proto.RegisterType((*CircuitRelay_Peer)(nil), "relay.pb.CircuitRelay.Peer") + proto.RegisterEnum("relay.pb.CircuitRelay_Status", CircuitRelay_Status_name, CircuitRelay_Status_value) + proto.RegisterEnum("relay.pb.CircuitRelay_Type", CircuitRelay_Type_name, CircuitRelay_Type_value) +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto new file mode 100644 index 0000000000..46a7787e39 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto @@ -0,0 +1,41 @@ +syntax = "proto2"; + +package relay.pb; + +message CircuitRelay { + + enum Status { + SUCCESS = 100; + HOP_SRC_ADDR_TOO_LONG = 220; + HOP_DST_ADDR_TOO_LONG = 221; + HOP_SRC_MULTIADDR_INVALID = 250; + HOP_DST_MULTIADDR_INVALID = 251; + HOP_NO_CONN_TO_DST = 260; + HOP_CANT_DIAL_DST = 261; + HOP_CANT_OPEN_DST_STREAM = 262; + HOP_CANT_SPEAK_RELAY = 270; + HOP_CANT_REALAY_TO_SELF = 280; + STOP_SRC_ADDR_TOO_LONG = 320; + STOP_DST_ADDR_TOO_LONG = 321; + STOP_SRC_MULTIADDR_INVALID = 350; + STOP_DST_MULTIADDR_INVALID = 351; + } + + enum Type { // RPC identifier, either HOP, STOP or STATUS + HOP = 1; + STOP = 2; + STATUS = 3; + } + + message Peer { + required bytes id = 1; // peer id + repeated bytes addrs = 2; // peer's known addresses + } + + optional Type type = 1; // Type of the message + + optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STATUS + optional Peer dstPeer = 3; + + optional Status code = 4; // Status code, used when Type is STATUS +} From 59ce49e385b259e78bf3163da31087091c8a70c1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 14 Jul 2017 20:52:02 +0300 Subject: [PATCH 02/29] delete unnecessary status/util --- .../internal/circuitv1-deprecated/status.go | 62 ------------------ .../circuitv1-deprecated/status_test.go | 32 ---------- .../internal/circuitv1-deprecated/util.go | 63 ------------------- .../circuitv1-deprecated/util_test.go | 44 ------------- 4 files changed, 201 deletions(-) delete mode 100644 p2p/protocol/internal/circuitv1-deprecated/status.go delete mode 100644 p2p/protocol/internal/circuitv1-deprecated/status_test.go delete mode 100644 p2p/protocol/internal/circuitv1-deprecated/util.go delete mode 100644 p2p/protocol/internal/circuitv1-deprecated/util_test.go diff --git a/p2p/protocol/internal/circuitv1-deprecated/status.go b/p2p/protocol/internal/circuitv1-deprecated/status.go deleted file mode 100644 index 8d9c2582ed..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/status.go +++ /dev/null @@ -1,62 +0,0 @@ -package relay - -import ( - "encoding/binary" - "fmt" - "io" -) - -const maxStatusMessageLength = 1024 - -const ( - StatusOK = 100 - - StatusRelayAddrErr = 250 - StatusRelayNotConnected = 260 - StatusRelayDialFailed = 261 - StatusRelayStreamFailed = 262 - StatusRelayHopToSelf = 270 - - StatusDstAddrErr = 350 - StatusDstRelayRefused = 380 - StatusDstWrongDst = 381 -) - -type RelayStatus struct { - Code uint64 - Message string -} - -func (s *RelayStatus) WriteTo(w io.Writer) error { - outbuf := make([]byte, 2*binary.MaxVarintLen64+len(s.Message)) - n := binary.PutUvarint(outbuf, s.Code) - n += binary.PutUvarint(outbuf[n:], uint64(len(s.Message))) - n += copy(outbuf[n:], s.Message) - _, err := w.Write(outbuf[:n]) - return err -} - -func (s *RelayStatus) ReadFrom(r io.Reader) error { - br := &singleByteReader{r} - code, err := binary.ReadUvarint(br) - if err != nil { - return err - } - l, err := binary.ReadUvarint(br) - if err != nil { - return err - } - buf := make([]byte, l) - _, err = io.ReadFull(r, buf) - if err != nil { - return err - } - - s.Code = code - s.Message = string(buf) - return nil -} - -func (s *RelayStatus) Error() string { - return fmt.Sprintf("%d: %s", s.Code, s.Message) -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/status_test.go b/p2p/protocol/internal/circuitv1-deprecated/status_test.go deleted file mode 100644 index 25fe4e0f32..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/status_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package relay - -import ( - "bytes" - "testing" -) - -func TestStatusParsing(t *testing.T) { - s := &RelayStatus{ - Code: StatusDstAddrErr, - Message: "foo bar", - } - - buf := new(bytes.Buffer) - - if err := s.WriteTo(buf); err != nil { - t.Fatal(err) - } - - var ns RelayStatus - if err := ns.ReadFrom(buf); err != nil { - t.Fatal(err) - } - - if ns.Code != s.Code { - t.Fatal("codes didnt match") - } - - if ns.Message != s.Message { - t.Fatal("messages didnt match") - } -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/util.go b/p2p/protocol/internal/circuitv1-deprecated/util.go deleted file mode 100644 index 15726d3d3d..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/util.go +++ /dev/null @@ -1,63 +0,0 @@ -package relay - -import ( - "encoding/binary" - "fmt" - "io" - - ma "github.com/multiformats/go-multiaddr" -) - -type singleByteReader struct { - r io.Reader -} - -func (s *singleByteReader) ReadByte() (byte, error) { - var b [1]byte - n, err := s.r.Read(b[:]) - if err != nil { - return 0, err - } - if n == 0 { - return 0, io.ErrNoProgress - } - - return b[0], nil -} - -func writeLpMultiaddr(w io.Writer, a ma.Multiaddr) error { - buf := make([]byte, binary.MaxVarintLen32+len(a.Bytes())) - n := binary.PutUvarint(buf, uint64(len(a.Bytes()))) - n += copy(buf[n:], a.Bytes()) - nw, err := w.Write(buf[:n]) - if err != nil { - return err - } - if n != nw { - return fmt.Errorf("failed to write all bytes to writer") - } - return nil -} - -func readLpMultiaddr(r io.Reader) (ma.Multiaddr, error) { - l, err := binary.ReadUvarint(&singleByteReader{r}) - if err != nil { - return nil, err - } - - if l > maxAddrLen { - return nil, fmt.Errorf("address length was too long: %d > %d", l, maxAddrLen) - } - - if l == 0 { - return nil, fmt.Errorf("zero length multiaddr is invalid") - } - - buf := make([]byte, l) - _, err = io.ReadFull(r, buf) - if err != nil { - return nil, err - } - - return ma.NewMultiaddrBytes(buf) -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/util_test.go b/p2p/protocol/internal/circuitv1-deprecated/util_test.go deleted file mode 100644 index b0f3f68cd6..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/util_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package relay - -import ( - "bytes" - "testing" - - ma "github.com/multiformats/go-multiaddr" -) - -func TestMultiaddrSerialization(t *testing.T) { - buf := new(bytes.Buffer) - - addr, err := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234") - if err != nil { - t.Fatal(err) - } - - if err := writeLpMultiaddr(buf, addr); err != nil { - t.Fatal(err) - } - - out, err := readLpMultiaddr(buf) - if err != nil { - t.Fatal(err) - } - - if !addr.Equal(out) { - t.Fatal("addresses didnt match") - } -} - -func TestDecodeInvalid(t *testing.T) { - buf := bytes.NewBuffer([]byte{72, 0, 0}) - _, err := readLpMultiaddr(buf) - if err == nil { - t.Fatal("shouldnt have parsed correctly") - } - - buf = bytes.NewBuffer([]byte{0}) - _, err = readLpMultiaddr(buf) - if err == nil { - t.Fatal("shouldnt have parsed correctly") - } -} From 55c4d357b769649ffba4f8ef96fc9b220d70c70e Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 14 Jul 2017 21:21:19 +0300 Subject: [PATCH 03/29] pb/relay: fix typos 1. it's not REALAY 2. srcaddr/dstaddr not meaningful for STATUS --- p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go | 6 +++--- p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go index fee05f8ee5..a7f7d9d8b9 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go @@ -34,7 +34,7 @@ const ( CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261 CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262 CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270 - CircuitRelay_HOP_CANT_REALAY_TO_SELF CircuitRelay_Status = 280 + CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280 CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320 CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321 CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350 @@ -51,7 +51,7 @@ var CircuitRelay_Status_name = map[int32]string{ 261: "HOP_CANT_DIAL_DST", 262: "HOP_CANT_OPEN_DST_STREAM", 270: "HOP_CANT_SPEAK_RELAY", - 280: "HOP_CANT_REALAY_TO_SELF", + 280: "HOP_CANT_RELAY_TO_SELF", 320: "STOP_SRC_ADDR_TOO_LONG", 321: "STOP_DST_ADDR_TOO_LONG", 350: "STOP_SRC_MULTIADDR_INVALID", @@ -67,7 +67,7 @@ var CircuitRelay_Status_value = map[string]int32{ "HOP_CANT_DIAL_DST": 261, "HOP_CANT_OPEN_DST_STREAM": 262, "HOP_CANT_SPEAK_RELAY": 270, - "HOP_CANT_REALAY_TO_SELF": 280, + "HOP_CANT_RELAY_TO_SELF": 280, "STOP_SRC_ADDR_TOO_LONG": 320, "STOP_DST_ADDR_TOO_LONG": 321, "STOP_SRC_MULTIADDR_INVALID": 350, diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto index 46a7787e39..0b2290048a 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto @@ -14,7 +14,7 @@ message CircuitRelay { HOP_CANT_DIAL_DST = 261; HOP_CANT_OPEN_DST_STREAM = 262; HOP_CANT_SPEAK_RELAY = 270; - HOP_CANT_REALAY_TO_SELF = 280; + HOP_CANT_RELAY_TO_SELF = 280; STOP_SRC_ADDR_TOO_LONG = 320; STOP_DST_ADDR_TOO_LONG = 321; STOP_SRC_MULTIADDR_INVALID = 350; @@ -34,7 +34,7 @@ message CircuitRelay { optional Type type = 1; // Type of the message - optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STATUS + optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP optional Peer dstPeer = 3; optional Status code = 4; // Status code, used when Type is STATUS From ce5e59807c2cfb07a16afb98e6033c37fa11620d Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 12:48:33 +0300 Subject: [PATCH 04/29] relay.proto: add more error codes --- .../circuitv1-deprecated/pb/relay.pb.go | 21 +++++++++++++------ .../circuitv1-deprecated/pb/relay.proto | 9 +++++--- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go index a7f7d9d8b9..9c3fc2b466 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go @@ -39,6 +39,8 @@ const ( CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321 CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350 CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351 + CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390 + CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400 ) var CircuitRelay_Status_name = map[int32]string{ @@ -56,6 +58,8 @@ var CircuitRelay_Status_name = map[int32]string{ 321: "STOP_DST_ADDR_TOO_LONG", 350: "STOP_SRC_MULTIADDR_INVALID", 351: "STOP_DST_MULTIADDR_INVALID", + 390: "STOP_RELAY_REFUSED", + 400: "MALFORMED_MESSAGE", } var CircuitRelay_Status_value = map[string]int32{ "SUCCESS": 100, @@ -72,6 +76,8 @@ var CircuitRelay_Status_value = map[string]int32{ "STOP_DST_ADDR_TOO_LONG": 321, "STOP_SRC_MULTIADDR_INVALID": 350, "STOP_DST_MULTIADDR_INVALID": 351, + "STOP_RELAY_REFUSED": 390, + "MALFORMED_MESSAGE": 400, } func (x CircuitRelay_Status) Enum() *CircuitRelay_Status { @@ -94,20 +100,23 @@ func (x *CircuitRelay_Status) UnmarshalJSON(data []byte) error { type CircuitRelay_Type int32 const ( - CircuitRelay_HOP CircuitRelay_Type = 1 - CircuitRelay_STOP CircuitRelay_Type = 2 - CircuitRelay_STATUS CircuitRelay_Type = 3 + CircuitRelay_HOP CircuitRelay_Type = 1 + CircuitRelay_STOP CircuitRelay_Type = 2 + CircuitRelay_STATUS CircuitRelay_Type = 3 + CircuitRelay_CAN_HOP CircuitRelay_Type = 4 ) var CircuitRelay_Type_name = map[int32]string{ 1: "HOP", 2: "STOP", 3: "STATUS", + 4: "CAN_HOP", } var CircuitRelay_Type_value = map[string]int32{ - "HOP": 1, - "STOP": 2, - "STATUS": 3, + "HOP": 1, + "STOP": 2, + "STATUS": 3, + "CAN_HOP": 4, } func (x CircuitRelay_Type) Enum() *CircuitRelay_Type { diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto index 0b2290048a..de3e637b12 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto @@ -19,12 +19,15 @@ message CircuitRelay { STOP_DST_ADDR_TOO_LONG = 321; STOP_SRC_MULTIADDR_INVALID = 350; STOP_DST_MULTIADDR_INVALID = 351; + STOP_RELAY_REFUSED = 390; + MALFORMED_MESSAGE = 400; } enum Type { // RPC identifier, either HOP, STOP or STATUS - HOP = 1; - STOP = 2; - STATUS = 3; + HOP = 1; + STOP = 2; + STATUS = 3; + CAN_HOP = 4; } message Peer { From f18fc04bd4a7a69ff9ba219bb904acc0cb1b7909 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 13:16:07 +0300 Subject: [PATCH 05/29] implement ciruit/relay/0.1.0 spec --- .../internal/circuitv1-deprecated/conn.go | 10 +- .../internal/circuitv1-deprecated/dial.go | 11 +- .../internal/circuitv1-deprecated/listen.go | 20 +- .../internal/circuitv1-deprecated/relay.go | 250 +++++++++++------- .../internal/circuitv1-deprecated/util.go | 47 ++++ 5 files changed, 226 insertions(+), 112 deletions(-) create mode 100644 p2p/protocol/internal/circuitv1-deprecated/util.go diff --git a/p2p/protocol/internal/circuitv1-deprecated/conn.go b/p2p/protocol/internal/circuitv1-deprecated/conn.go index 73a66b78d0..f165647028 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/conn.go +++ b/p2p/protocol/internal/circuitv1-deprecated/conn.go @@ -8,6 +8,7 @@ import ( iconn "github.com/libp2p/go-libp2p-interface-conn" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" tpt "github.com/libp2p/go-libp2p-transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" @@ -15,8 +16,7 @@ import ( type Conn struct { inet.Stream - remoteMaddr ma.Multiaddr - remotePeer peer.ID + remote pstore.PeerInfo } var _ iconn.Conn = (*Conn)(nil) @@ -37,12 +37,12 @@ func (n *NetAddr) String() string { func (c *Conn) RemoteAddr() net.Addr { return &NetAddr{ Relay: c.Conn().RemotePeer().Pretty(), - Remote: c.remotePeer.String(), + Remote: c.remote.ID.Pretty(), } } func (c *Conn) RemoteMultiaddr() ma.Multiaddr { - a, err := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s/p2p-circuit/%s", c.remoteMaddr, c.remotePeer.Pretty(), c.Conn().RemotePeer())) + a, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/%s", c.remote.ID.Pretty(), c.Conn().RemotePeer())) if err != nil { panic(err) } @@ -71,7 +71,7 @@ func (c *Conn) LocalPeer() peer.ID { } func (c *Conn) RemotePeer() peer.ID { - return c.remotePeer + return c.remote.ID } func (c *Conn) LocalPrivateKey() ic.PrivKey { diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go index 1d2f103575..936cc21d71 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/dial.go +++ b/p2p/protocol/internal/circuitv1-deprecated/dial.go @@ -42,16 +42,9 @@ func (d *Dialer) DialPeer(ctx context.Context, p peer.ID, a ma.Multiaddr) (tpt.C return nil, err } - destp2p, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", p.Pretty())) - if err != nil { - return nil, err - } - - destp2p = destaddr.Encapsulate(destp2p) - - d.Relay().host.Peerstore().AddAddrs(rinfo.ID, rinfo.Addrs, pstore.TempAddrTTL) + dinfo := pstore.PeerInfo{ID: p, Addrs: []ma.Multiaddr{destaddr}} - return d.Relay().Dial(ctx, rinfo.ID, destp2p) + return d.Relay().Dial(ctx, *rinfo, dinfo) } func (d *Dialer) Matches(a ma.Multiaddr) bool { diff --git a/p2p/protocol/internal/circuitv1-deprecated/listen.go b/p2p/protocol/internal/circuitv1-deprecated/listen.go index 1c73a65417..8cc096eb8c 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/listen.go +++ b/p2p/protocol/internal/circuitv1-deprecated/listen.go @@ -3,6 +3,8 @@ package relay import ( "net" + pb "github.com/libp2p/go-libp2p-circuit/pb" + peer "github.com/libp2p/go-libp2p-peer" tpt "github.com/libp2p/go-libp2p-transport" filter "github.com/libp2p/go-maddr-filter" @@ -26,20 +28,20 @@ func (r *Relay) Matches(a ma.Multiaddr) bool { } func (l *RelayListener) Accept() (tpt.Conn, error) { - ctx := l.Relay().ctx select { case c := <-l.incoming: - log.Infof("accepted relay connection: %s", c.ID()) - s := RelayStatus{ - Code: StatusOK, - Message: "OK", - } - if err := s.WriteTo(c); err != nil { + err := l.Relay().writeResponse(c.Stream, pb.CircuitRelay_SUCCESS) + if err != nil { + log.Debugf("error writing relay response: %s", err.Error()) + c.Stream.Close() return nil, err } + + log.Infof("accepted relay connection: %s", c.ID()) + return c, nil - case <-ctx.Done(): - return nil, ctx.Err() + case <-l.ctx.Done(): + return nil, l.ctx.Err() } } diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index d4229567ae..65a04375a1 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -7,20 +7,21 @@ import ( "sync" "time" + pb "github.com/libp2p/go-libp2p-circuit/pb" + + ggio "github.com/gogo/protobuf/io" logging "github.com/ipfs/go-log" host "github.com/libp2p/go-libp2p-host" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" ) var log = logging.Logger("relay") -const HopID = "/libp2p/relay/circuit/1.0.0/hop" -const StopID = "/libp2p/relay/circuit/1.0.0/stop" +const ProtoID = "/libp2p/circuit/relay/0.1.0" -const maxAddrLen = 1024 +const maxMessageSize = 4096 var RelayAcceptTimeout = time.Minute @@ -30,6 +31,7 @@ type Relay struct { self peer.ID active bool + hop bool incoming chan *Conn @@ -44,6 +46,14 @@ var ( OptHop = RelayOpt(1) ) +type RelayError struct { + Code int32 +} + +func (e RelayError) Error() string { + return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[e.Code], e.Code) +} + func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error) { r := &Relay{ host: h, @@ -52,138 +62,156 @@ func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error incoming: make(chan *Conn), } - h.SetStreamHandler(StopID, r.HandleNewStopStream) - for _, opt := range opts { switch opt { case OptActive: r.active = true case OptHop: - h.SetStreamHandler(HopID, r.HandleNewHopStream) + r.hop = true default: return nil, fmt.Errorf("unrecognized option: %d", opt) } } + h.SetStreamHandler(ProtoID, r.handleNewStream) + return r, nil } -func (r *Relay) Dial(ctx context.Context, relay peer.ID, dest ma.Multiaddr) (*Conn, error) { - s, err := r.host.NewStream(ctx, relay, HopID) +func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) { + err := r.host.Connect(ctx, relay) if err != nil { return nil, err } - if err := writeLpMultiaddr(s, dest); err != nil { + s, err := r.host.NewStream(ctx, relay.ID, ProtoID) + if err != nil { return nil, err } - var stat RelayStatus - if err := stat.ReadFrom(s); err != nil { + rd := ggio.NewDelimitedReader(s, maxMessageSize) + wr := ggio.NewDelimitedWriter(s) + + var msg pb.CircuitRelay + + msg.Type = pb.CircuitRelay_HOP.Enum() + msg.SrcPeer = peerInfoToPeer(pstore.PeerInfo{r.self, r.host.Addrs()}) + msg.DstPeer = peerInfoToPeer(dest) + + err = wr.WriteMsg(&msg) + if err != nil { + s.Close() return nil, err } - if stat.Code != StatusOK { - return nil, &stat + msg.Reset() + + err = rd.ReadMsg(&msg) + if err != nil { + s.Close() + return nil, err } - return &Conn{Stream: s}, nil -} + if msg.GetType() != pb.CircuitRelay_STATUS { + s.Close() + return nil, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType()) + } -func (r *Relay) HandleNewStopStream(s inet.Stream) { - log.Infof("new stop stream from: %s", s.Conn().RemotePeer()) - status := r.handleNewStopStream(s) - if status != nil { - if err := status.WriteTo(s); err != nil { - log.Info("problem writing error status:", err) - } + if msg.GetCode() != pb.CircuitRelay_SUCCESS { s.Close() - return + return nil, RelayError{int32(msg.GetCode())} } + + return &Conn{Stream: s}, nil } -func (r *Relay) handleNewStopStream(s inet.Stream) *RelayStatus { - info, err := r.readInfo(s) +func (r *Relay) handleNewStream(s inet.Stream) { + log.Infof("new relay stream from: %s", s.Conn().RemotePeer()) + + rd := ggio.NewDelimitedReader(s, maxMessageSize) + + var msg pb.CircuitRelay + + err := rd.ReadMsg(&msg) if err != nil { - return &RelayStatus{ - Code: StatusDstAddrErr, - Message: err.Error(), - } + r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) + return } - log.Infof("relay connection from: %s", info.ID) - select { - case r.incoming <- &Conn{Stream: s, remoteMaddr: info.Addrs[0], remotePeer: info.ID}: - return nil - case <-time.After(RelayAcceptTimeout): - return &RelayStatus{ - Code: StatusDstRelayRefused, - Message: "timed out waiting for relay to be accepted", - } + switch msg.GetType() { + case pb.CircuitRelay_HOP: + r.handleHopStream(s, &msg) + case pb.CircuitRelay_STOP: + r.handleStopStream(s, &msg) + case pb.CircuitRelay_CAN_HOP: + r.handleCanHop(s, &msg) + default: + log.Warningf("unexpected relay handshake: %d", msg.GetType()) + r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) } } -func (r *Relay) HandleNewHopStream(s inet.Stream) { - log.Infof("new hop stream from: %s", s.Conn().RemotePeer()) - status := r.handleNewHopStream(s) - if status != nil { - if err := status.WriteTo(s); err != nil { - log.Debugf("problem writing error status back: %s", err) - s.Close() - return - } +func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { + if !r.hop { + r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) + return } -} -func (r *Relay) handleNewHopStream(s inet.Stream) *RelayStatus { - info, err := r.readInfo(s) + src, err := peerToPeerInfo(msg.GetSrcPeer()) if err != nil { - return &RelayStatus{ - Code: StatusRelayAddrErr, - Message: err.Error(), - } + r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) + return } - if info.ID == r.self { - return &RelayStatus{ - Code: StatusRelayHopToSelf, - Message: "relay hop attempted to self", - } + if src.ID != s.Conn().RemotePeer() { + r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) + return } - ctp := r.host.Network().ConnsToPeer(info.ID) + dst, err := peerToPeerInfo(msg.GetDstPeer()) + if err != nil { + r.handleError(s, pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID) + return + } + + if dst.ID == r.self { + r.handleError(s, pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF) + return + } + + ctp := r.host.Network().ConnsToPeer(dst.ID) if len(ctp) == 0 { - return &RelayStatus{ - Code: StatusRelayNotConnected, - Message: "refusing to make new connection for relay", + if !r.active { + r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST) + return } - } - bs, err := r.host.NewStream(r.ctx, info.ID, StopID) - if err != nil { - return &RelayStatus{ - Code: StatusRelayStreamFailed, - Message: err.Error(), + ctx, cancel := context.WithTimeout(r.ctx, time.Second*10) + defer cancel() + err = r.host.Connect(ctx, dst) + if err != nil { + log.Debugf("error opening relay connection to %s: %s", dst.ID.Pretty(), err.Error()) + r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST) + return } } - // TODO: add helper method 'PeerID to multiaddr' - paddr, err := ma.NewMultiaddr("/ipfs/" + s.Conn().RemotePeer().Pretty()) + bs, err := r.host.NewStream(r.ctx, dst.ID, ProtoID) if err != nil { - return &RelayStatus{ - Code: StatusRelayAddrErr, - Message: err.Error(), - } + log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error()) + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) + return } - p2pa := s.Conn().RemoteMultiaddr().Encapsulate(paddr) - if err := writeLpMultiaddr(bs, p2pa); err != nil { - return &RelayStatus{ - Code: StatusRelayStreamFailed, - Message: err.Error(), - } + err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) + if err != nil { + log.Debugf("error writing relay response: %s", err.Error()) + s.Close() + return } + log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty()) + go func() { _, err := io.Copy(s, bs) if err != io.EOF && err != nil { @@ -191,6 +219,7 @@ func (r *Relay) handleNewHopStream(s inet.Stream) *RelayStatus { } s.Close() }() + go func() { _, err := io.Copy(bs, s) if err != io.EOF && err != nil { @@ -198,20 +227,63 @@ func (r *Relay) handleNewHopStream(s inet.Stream) *RelayStatus { } bs.Close() }() +} - return nil +func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) { + src, err := peerToPeerInfo(msg.GetSrcPeer()) + if err != nil || len(src.Addrs) == 0 { + r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID) + return + } + + dst, err := peerToPeerInfo(msg.GetDstPeer()) + if err != nil || dst.ID != r.self { + r.handleError(s, pb.CircuitRelay_STOP_DST_MULTIADDR_INVALID) + return + } + + log.Infof("relay connection from: %s", src.ID) + + r.host.Peerstore().AddAddrs(src.ID, src.Addrs, pstore.TempAddrTTL) + + select { + case r.incoming <- &Conn{Stream: s, remote: src}: + case <-time.After(RelayAcceptTimeout): + r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + } } -func (r *Relay) readInfo(s inet.Stream) (*pstore.PeerInfo, error) { - addr, err := readLpMultiaddr(s) +func (r *Relay) handleCanHop(s inet.Stream, msg *pb.CircuitRelay) { + var err error + + if r.hop { + err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) + } else { + err = r.writeResponse(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) + } + if err != nil { - return nil, err + log.Debugf("error writing relay response: %s", err.Error()) } - info, err := pstore.InfoFromP2pAddr(addr) + s.Close() +} + +func (r *Relay) handleError(s inet.Stream, code pb.CircuitRelay_Status) { + log.Warningf("relay error: %s (%d)", pb.CircuitRelay_Status_name[int32(code)], code) + err := r.writeResponse(s, code) if err != nil { - return nil, err + log.Debugf("error writing relay response: %s", err.Error()) } + s.Close() +} + +func (r *Relay) writeResponse(s inet.Stream, code pb.CircuitRelay_Status) error { + wr := ggio.NewDelimitedWriter(s) + + var msg pb.CircuitRelay + msg.Type = pb.CircuitRelay_STATUS.Enum() + msg.Code = code.Enum() - return info, nil + return wr.WriteMsg(&msg) } diff --git a/p2p/protocol/internal/circuitv1-deprecated/util.go b/p2p/protocol/internal/circuitv1-deprecated/util.go new file mode 100644 index 0000000000..d18970414d --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/util.go @@ -0,0 +1,47 @@ +package relay + +import ( + "errors" + + pb "github.com/libp2p/go-libp2p-circuit/pb" + + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" +) + +func peerToPeerInfo(p *pb.CircuitRelay_Peer) (empty pstore.PeerInfo, err error) { + if p == nil { + return empty, errors.New("nil peer") + } + + h, err := mh.Cast(p.Id) + if err != nil { + return empty, err + } + + addrs := make([]ma.Multiaddr, len(p.Addrs)) + for i := 0; i < len(addrs); i++ { + a, err := ma.NewMultiaddrBytes(p.Addrs[i]) + if err != nil { + return empty, err + } + addrs[i] = a + } + + return pstore.PeerInfo{ID: peer.ID(h), Addrs: addrs}, nil +} + +func peerInfoToPeer(pi pstore.PeerInfo) *pb.CircuitRelay_Peer { + addrs := make([][]byte, len(pi.Addrs)) + for i := 0; i < len(addrs); i++ { + addrs[i] = pi.Addrs[i].Bytes() + } + + p := new(pb.CircuitRelay_Peer) + p.Id = []byte(pi.ID) + p.Addrs = addrs + + return p +} From ebdec667d319e8bd4cddfb2c1db31d8f4bfb0943 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 13:57:31 +0300 Subject: [PATCH 06/29] preserve pb status code type in RelayError --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 65a04375a1..1176841a85 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -47,11 +47,11 @@ var ( ) type RelayError struct { - Code int32 + Code pb.CircuitRelay_Status } func (e RelayError) Error() string { - return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[e.Code], e.Code) + return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[int32(e.Code)], e.Code) } func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error) { @@ -119,7 +119,7 @@ func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.Pee if msg.GetCode() != pb.CircuitRelay_SUCCESS { s.Close() - return nil, RelayError{int32(msg.GetCode())} + return nil, RelayError{msg.GetCode()} } return &Conn{Stream: s}, nil From 9e3ef8701d2b52d61afe71fca33d1602f9e7b206 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 14:18:44 +0300 Subject: [PATCH 07/29] hop relay must perform stop handshake; duh. --- .../internal/circuitv1-deprecated/relay.go | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 1176841a85..27cb51f43c 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -179,6 +179,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { return } + // open stream ctp := r.host.Network().ConnsToPeer(dst.ID) if len(ctp) == 0 { if !r.active { @@ -203,6 +204,44 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { return } + // stop handshake + rd := ggio.NewDelimitedReader(bs, maxMessageSize) + wr := ggio.NewDelimitedWriter(bs) + + msg.Type = pb.CircuitRelay_STOP.Enum() + + err = wr.WriteMsg(msg) + if err != nil { + log.Debugf("error writing stop handshake: %s", err.Error()) + bs.Close() + r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + return + } + + msg.Reset() + + err = rd.ReadMsg(msg) + if err != nil { + log.Debugf("error reading stop response: %s", err.Error()) + bs.Close() + r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + return + } + + if msg.GetType() != pb.CircuitRelay_STATUS { + log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType()) + bs.Close() + r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + return + } + + if msg.GetCode() != pb.CircuitRelay_SUCCESS { + log.Debugf("relay stop failure: %d", msg.GetCode()) + bs.Close() + r.handleError(s, msg.GetCode()) + return + } + err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) if err != nil { log.Debugf("error writing relay response: %s", err.Error()) @@ -210,6 +249,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { return } + // relay connection log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty()) go func() { From 8941943d8dcd005419dadcf6c2d1b1c524da9b31 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 14:25:00 +0300 Subject: [PATCH 08/29] update test --- .../circuitv1-deprecated/relay_test.go | 119 +++++++++++++----- 1 file changed, 89 insertions(+), 30 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go index cb1a855021..c6f4516548 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go @@ -5,11 +5,13 @@ import ( "context" "fmt" "io/ioutil" - "strings" "testing" + "time" - bhost "github.com/libp2p/go-libp2p-blankhost" . "github.com/libp2p/go-libp2p-circuit" + pb "github.com/libp2p/go-libp2p-circuit/pb" + + bhost "github.com/libp2p/go-libp2p-blankhost" host "github.com/libp2p/go-libp2p-host" netutil "github.com/libp2p/go-libp2p-netutil" ma "github.com/multiformats/go-multiaddr" @@ -90,12 +92,13 @@ func TestBasicRelay(t *testing.T) { con.Close() }() - destma, err := ma.NewMultiaddr("/ipfs/" + hosts[2].ID().Pretty()) - if err != nil { - t.Fatal(err) - } + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - con, err := r1.Dial(ctx, hosts[1].ID(), destma) + rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + defer rcancel() + + con, err := r1.Dial(rctx, rinfo, dinfo) if err != nil { t.Fatal(err) } @@ -161,8 +164,11 @@ func TestBasicRelayDial(t *testing.T) { t.Fatal(err) } + rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + defer rcancel() + d := r1.Dialer() - con, err := d.DialPeer(ctx, hosts[2].ID(), relayaddr) + con, err := d.DialPeer(rctx, hosts[2].ID(), relayaddr) if err != nil { t.Fatal(err) } @@ -201,25 +207,34 @@ func TestRelayThroughNonHop(t *testing.T) { t.Fatal(err) } - destma, err := ma.NewMultiaddr("/ipfs/" + hosts[2].ID().Pretty()) - if err != nil { - t.Fatal(err) + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + defer rcancel() + + _, err = r1.Dial(rctx, rinfo, dinfo) + if err == nil { + t.Fatal("expected error") } - _, err = r1.Dial(ctx, hosts[1].ID(), destma) - if err.Error() != "protocol not supported" { - t.Fatal("expected 'protocol not supported' error") + rerr, ok := err.(RelayError) + if !ok { + t.Fatalf("expected RelayError: %#v", err) + } + + if rerr.Code != pb.CircuitRelay_HOP_CANT_SPEAK_RELAY { + t.Fatal("expected 'HOP_CANT_SPEAK_RELAY' error") } } -func TestDestNoRelay(t *testing.T) { +func TestRelayNoDestConnection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 3) connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) r1, err := NewRelay(ctx, hosts[0]) if err != nil { @@ -231,21 +246,28 @@ func TestDestNoRelay(t *testing.T) { t.Fatal(err) } - destma, err := ma.NewMultiaddr("/ipfs/" + hosts[2].ID().Pretty()) - if err != nil { - t.Fatal(err) + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + defer rcancel() + + _, err = r1.Dial(rctx, rinfo, dinfo) + if err == nil { + t.Fatal("expected error") } - for i := 0; i < 10; i++ { - destma = ma.Join(destma, destma) + + rerr, ok := err.(RelayError) + if !ok { + t.Fatalf("expected RelayError: %#v", err) } - _, err = r1.Dial(ctx, hosts[1].ID(), destma) - if !strings.HasPrefix(err.Error(), fmt.Sprintf("%d: address length was too long", StatusRelayAddrErr)) { - t.Fatal(err) + if rerr.Code != pb.CircuitRelay_HOP_NO_CONN_TO_DST { + t.Fatal("expected 'HOP_NO_CONN_TO_DST' error") } } -func TestRelayNoDestConnection(t *testing.T) { +func TestActiveRelay(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -258,18 +280,55 @@ func TestRelayNoDestConnection(t *testing.T) { t.Fatal(err) } - _, err = NewRelay(ctx, hosts[1], OptHop) + _, err = NewRelay(ctx, hosts[1], OptHop, OptActive) + if err != nil { + t.Fatal(err) + } + + r3, err := NewRelay(ctx, hosts[2]) + if err != nil { + t.Fatal(err) + } + + msg := []byte("relay works!") + go func() { + list, err := r3.Listener() + if err != nil { + t.Error(err) + return + } + + con, err := list.Accept() + if err != nil { + t.Error(err) + return + } + + _, err = con.Write(msg) + if err != nil { + t.Error("failed to write", err) + return + } + con.Close() + }() + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + defer rcancel() + + con, err := r1.Dial(rctx, rinfo, dinfo) if err != nil { t.Fatal(err) } - destma, err := ma.NewMultiaddr("/ipfs/" + hosts[2].ID().Pretty()) + data, err := ioutil.ReadAll(con) if err != nil { t.Fatal(err) } - _, err = r1.Dial(ctx, hosts[1].ID(), destma) - if err.Error() != "260: refusing to make new connection for relay" { - t.Fatal("expected this not to work") + if !bytes.Equal(data, msg) { + t.Fatal("message was incorrect:", string(data)) } } From 3943597bcec0f6159e772b94bbc4957bf51ee3c2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 15:26:54 +0300 Subject: [PATCH 09/29] relay: return HOP_CANT_OPEN_DST_STREAM on stop stream errors --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 27cb51f43c..03cb38a6b4 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -214,7 +214,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { if err != nil { log.Debugf("error writing stop handshake: %s", err.Error()) bs.Close() - r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) return } @@ -224,14 +224,14 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { if err != nil { log.Debugf("error reading stop response: %s", err.Error()) bs.Close() - r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) return } if msg.GetType() != pb.CircuitRelay_STATUS { log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType()) bs.Close() - r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) return } From 1cee41522ddca3fc0e392d401e511a69a9804c21 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 15:37:24 +0300 Subject: [PATCH 10/29] hop relay: close bs on status response error --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 03cb38a6b4..d37cab213a 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -245,6 +245,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) if err != nil { log.Debugf("error writing relay response: %s", err.Error()) + bs.Close() s.Close() return } From 1ad69518d15546f34ac4ef9e52bd72d88c4959c1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 16:28:30 +0300 Subject: [PATCH 11/29] relay: use Peerstore to get self peer info, log bytes relayed --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index d37cab213a..7b855edca8 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -95,7 +95,7 @@ func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.Pee var msg pb.CircuitRelay msg.Type = pb.CircuitRelay_HOP.Enum() - msg.SrcPeer = peerInfoToPeer(pstore.PeerInfo{r.self, r.host.Addrs()}) + msg.SrcPeer = peerInfoToPeer(r.host.Peerstore().PeerInfo(r.self)) msg.DstPeer = peerInfoToPeer(dest) err = wr.WriteMsg(&msg) @@ -254,19 +254,21 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty()) go func() { - _, err := io.Copy(s, bs) + count, err := io.Copy(s, bs) if err != io.EOF && err != nil { log.Debugf("relay copy error: %s", err) } s.Close() + log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty()) }() go func() { - _, err := io.Copy(bs, s) + count, err := io.Copy(bs, s) if err != io.EOF && err != nil { log.Debugf("relay copy error: %s", err) } bs.Close() + log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty()) }() } From 9883786568b913954e76c36b0ba1d8bd1171dd0b Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 18:32:07 +0300 Subject: [PATCH 12/29] relay: poluate remote field in dialed Conn --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 7b855edca8..25977e6522 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -122,7 +122,7 @@ func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.Pee return nil, RelayError{msg.GetCode()} } - return &Conn{Stream: s}, nil + return &Conn{Stream: s, remote: dest}, nil } func (r *Relay) handleNewStream(s inet.Stream) { From 37e58194250f31d57158779cf2504c84624e2a97 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 21:39:34 +0300 Subject: [PATCH 13/29] conn: connection ID, fix RemoteMultiaddr panic --- p2p/protocol/internal/circuitv1-deprecated/conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/conn.go b/p2p/protocol/internal/circuitv1-deprecated/conn.go index f165647028..16c08459aa 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/conn.go +++ b/p2p/protocol/internal/circuitv1-deprecated/conn.go @@ -42,7 +42,7 @@ func (c *Conn) RemoteAddr() net.Addr { } func (c *Conn) RemoteMultiaddr() ma.Multiaddr { - a, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/%s", c.remote.ID.Pretty(), c.Conn().RemotePeer())) + a, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", c.remote.ID.Pretty(), c.Conn().RemotePeer().Pretty())) if err != nil { panic(err) } @@ -83,5 +83,5 @@ func (c *Conn) RemotePublicKey() ic.PubKey { } func (c *Conn) ID() string { - return "TODO: relay conn ID" + return iconn.ID(c) } From 30adc968b1777e22d400842911a971d8a1a6e29f Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 17 Jul 2017 22:20:28 +0300 Subject: [PATCH 14/29] fix relay message input: ggio delimited reader buffers so we hand-craft a compatible delimited reader which doesn't. --- .../internal/circuitv1-deprecated/relay.go | 13 +++--- .../internal/circuitv1-deprecated/util.go | 43 +++++++++++++++++++ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 25977e6522..7f926fd1c9 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -9,7 +9,6 @@ import ( pb "github.com/libp2p/go-libp2p-circuit/pb" - ggio "github.com/gogo/protobuf/io" logging "github.com/ipfs/go-log" host "github.com/libp2p/go-libp2p-host" inet "github.com/libp2p/go-libp2p-net" @@ -89,8 +88,8 @@ func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.Pee return nil, err } - rd := ggio.NewDelimitedReader(s, maxMessageSize) - wr := ggio.NewDelimitedWriter(s) + rd := newDelimitedReader(s, maxMessageSize) + wr := newDelimitedWriter(s) var msg pb.CircuitRelay @@ -128,7 +127,7 @@ func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.Pee func (r *Relay) handleNewStream(s inet.Stream) { log.Infof("new relay stream from: %s", s.Conn().RemotePeer()) - rd := ggio.NewDelimitedReader(s, maxMessageSize) + rd := newDelimitedReader(s, maxMessageSize) var msg pb.CircuitRelay @@ -205,8 +204,8 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { } // stop handshake - rd := ggio.NewDelimitedReader(bs, maxMessageSize) - wr := ggio.NewDelimitedWriter(bs) + rd := newDelimitedReader(bs, maxMessageSize) + wr := newDelimitedWriter(bs) msg.Type = pb.CircuitRelay_STOP.Enum() @@ -322,7 +321,7 @@ func (r *Relay) handleError(s inet.Stream, code pb.CircuitRelay_Status) { } func (r *Relay) writeResponse(s inet.Stream, code pb.CircuitRelay_Status) error { - wr := ggio.NewDelimitedWriter(s) + wr := newDelimitedWriter(s) var msg pb.CircuitRelay msg.Type = pb.CircuitRelay_STATUS.Enum() diff --git a/p2p/protocol/internal/circuitv1-deprecated/util.go b/p2p/protocol/internal/circuitv1-deprecated/util.go index d18970414d..58e64657c2 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/util.go +++ b/p2p/protocol/internal/circuitv1-deprecated/util.go @@ -1,10 +1,14 @@ package relay import ( + "encoding/binary" "errors" + "io" pb "github.com/libp2p/go-libp2p-circuit/pb" + ggio "github.com/gogo/protobuf/io" + proto "github.com/gogo/protobuf/proto" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" @@ -45,3 +49,42 @@ func peerInfoToPeer(pi pstore.PeerInfo) *pb.CircuitRelay_Peer { return p } + +type delimitedReader struct { + r io.Reader + buf []byte +} + +// the gogo protobuf NewDelimitedReader is buffered, which may eat up stream data +func newDelimitedReader(r io.Reader, maxSize int) *delimitedReader { + return &delimitedReader{r: r, buf: make([]byte, maxSize)} +} + +func (d *delimitedReader) ReadByte() (byte, error) { + buf := d.buf[:1] + _, err := d.r.Read(buf) + return buf[0], err +} + +func (d *delimitedReader) ReadMsg(msg proto.Message) error { + mlen, err := binary.ReadUvarint(d) + if err != nil { + return err + } + + if uint64(len(d.buf)) < mlen { + return errors.New("Message too large") + } + + buf := d.buf[:mlen] + _, err = io.ReadFull(d.r, buf) + if err != nil { + return err + } + + return proto.Unmarshal(buf, msg) +} + +func newDelimitedWriter(w io.Writer) ggio.WriteCloser { + return ggio.NewDelimitedWriter(w) +} From aadf0c53716bf7c49c79b2af48eddb0581eb4e71 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 10:59:03 +0300 Subject: [PATCH 15/29] conn: fix RemoteMultiaddr consistency so that it works for both active and passive connections. --- p2p/protocol/internal/circuitv1-deprecated/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/conn.go b/p2p/protocol/internal/circuitv1-deprecated/conn.go index 16c08459aa..86cdd14718 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/conn.go +++ b/p2p/protocol/internal/circuitv1-deprecated/conn.go @@ -42,7 +42,7 @@ func (c *Conn) RemoteAddr() net.Addr { } func (c *Conn) RemoteMultiaddr() ma.Multiaddr { - a, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", c.remote.ID.Pretty(), c.Conn().RemotePeer().Pretty())) + a, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", c.Conn().RemotePeer().Pretty(), c.remote.ID.Pretty())) if err != nil { panic(err) } From 93ae30dd90903b1f6e013a100840e568dbf5c93b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 13:41:04 +0300 Subject: [PATCH 16/29] don't connect explicitly (unnecessary), add comments for non obvious things per @Stebalien's review comments. --- .../internal/circuitv1-deprecated/listen.go | 2 ++ .../internal/circuitv1-deprecated/relay.go | 32 ++++++++----------- .../internal/circuitv1-deprecated/util.go | 19 ++++++++--- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/listen.go b/p2p/protocol/internal/circuitv1-deprecated/listen.go index 8cc096eb8c..db6c7e4e2f 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/listen.go +++ b/p2p/protocol/internal/circuitv1-deprecated/listen.go @@ -33,6 +33,8 @@ func (l *RelayListener) Accept() (tpt.Conn, error) { err := l.Relay().writeResponse(c.Stream, pb.CircuitRelay_SUCCESS) if err != nil { log.Debugf("error writing relay response: %s", err.Error()) + // this won't prevent the other side from continuing to write + // TODO fully close the stream when Reset is implemented c.Stream.Close() return nil, err } diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 7f926fd1c9..7c36c7cbd8 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -23,6 +23,7 @@ const ProtoID = "/libp2p/circuit/relay/0.1.0" const maxMessageSize = 4096 var RelayAcceptTimeout = time.Minute +var HopConnectTimeout = 2 * time.Second type Relay struct { host host.Host @@ -78,10 +79,8 @@ func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error } func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) { - err := r.host.Connect(ctx, relay) - if err != nil { - return nil, err - } + + r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, pstore.TempAddrTTL) s, err := r.host.NewStream(ctx, relay.ID, ProtoID) if err != nil { @@ -180,26 +179,21 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { // open stream ctp := r.host.Network().ConnsToPeer(dst.ID) - if len(ctp) == 0 { - if !r.active { - r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST) - return - } - ctx, cancel := context.WithTimeout(r.ctx, time.Second*10) - defer cancel() - err = r.host.Connect(ctx, dst) - if err != nil { - log.Debugf("error opening relay connection to %s: %s", dst.ID.Pretty(), err.Error()) - r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST) - return - } + if len(ctp) == 0 && !r.active { + r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST) + return } - bs, err := r.host.NewStream(r.ctx, dst.ID, ProtoID) + r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, pstore.TempAddrTTL) + + ctx, cancel := context.WithTimeout(r.ctx, HopConnectTimeout) + defer cancel() + + bs, err := r.host.NewStream(ctx, dst.ID, ProtoID) if err != nil { log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error()) - r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) + r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST) return } diff --git a/p2p/protocol/internal/circuitv1-deprecated/util.go b/p2p/protocol/internal/circuitv1-deprecated/util.go index 58e64657c2..b06c1aafe1 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/util.go +++ b/p2p/protocol/internal/circuitv1-deprecated/util.go @@ -15,21 +15,21 @@ import ( mh "github.com/multiformats/go-multihash" ) -func peerToPeerInfo(p *pb.CircuitRelay_Peer) (empty pstore.PeerInfo, err error) { +func peerToPeerInfo(p *pb.CircuitRelay_Peer) (pstore.PeerInfo, error) { if p == nil { - return empty, errors.New("nil peer") + return pstore.PeerInfo{}, errors.New("nil peer") } h, err := mh.Cast(p.Id) if err != nil { - return empty, err + return pstore.PeerInfo{}, err } addrs := make([]ma.Multiaddr, len(p.Addrs)) for i := 0; i < len(addrs); i++ { a, err := ma.NewMultiaddrBytes(p.Addrs[i]) if err != nil { - return empty, err + return pstore.PeerInfo{}, err } addrs[i] = a } @@ -55,7 +55,16 @@ type delimitedReader struct { buf []byte } -// the gogo protobuf NewDelimitedReader is buffered, which may eat up stream data +// The gogo protobuf NewDelimitedReader is buffered, which may eat up stream data. +// So we need to implement a compatible delimited reader that reads unbuffered. +// There is a slowdown from unbuffered reading: when reading the message +// it can take multiple single byte Reads to read the length and another Read +// to read the message payload. +// However, this is not critical performance degradation as +// - the reader is utilized to read one (dialer, stop) or two messages (hop) during +// the handshake, so it's a drop in the water for the connection lifetime. +// - messages are small (max 4k) and the length fits in a couple of bytes, +// so overall we have at most three reads per message. func newDelimitedReader(r io.Reader, maxSize int) *delimitedReader { return &delimitedReader{r: r, buf: make([]byte, maxSize)} } From 2b8bc8164622a776de6a98021d96fd4013be2671 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 13:49:51 +0300 Subject: [PATCH 17/29] set HopConnectTimeout to 10s connection might have to resolve through the DHT, so it can take a while. --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 7c36c7cbd8..3b57452c57 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -23,7 +23,7 @@ const ProtoID = "/libp2p/circuit/relay/0.1.0" const maxMessageSize = 4096 var RelayAcceptTimeout = time.Minute -var HopConnectTimeout = 2 * time.Second +var HopConnectTimeout = 10 * time.Second type Relay struct { host host.Host From a556e2d9ebe35686043e7909d639c089d1e1944d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 14:26:32 +0300 Subject: [PATCH 18/29] relay_test: more robust test --- .../circuitv1-deprecated/relay_test.go | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go index c6f4516548..6fe1255f23 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go @@ -55,6 +55,8 @@ func TestBasicRelay(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) + time.Sleep(10 * time.Millisecond) + r1, err := NewRelay(ctx, hosts[0]) if err != nil { t.Fatal(err) @@ -86,7 +88,7 @@ func TestBasicRelay(t *testing.T) { _, err = con.Write(msg) if err != nil { - t.Error("failed to write", err) + t.Error(err) return } con.Close() @@ -95,7 +97,7 @@ func TestBasicRelay(t *testing.T) { rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() con, err := r1.Dial(rctx, rinfo, dinfo) @@ -122,6 +124,8 @@ func TestBasicRelayDial(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) + time.Sleep(10 * time.Millisecond) + r1, err := NewRelay(ctx, hosts[0]) if err != nil { t.Fatal(err) @@ -153,7 +157,7 @@ func TestBasicRelayDial(t *testing.T) { _, err = con.Write(msg) if err != nil { - t.Error("failed to write", err) + t.Error(err) return } con.Close() @@ -164,7 +168,7 @@ func TestBasicRelayDial(t *testing.T) { t.Fatal(err) } - rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() d := r1.Dialer() @@ -192,6 +196,8 @@ func TestRelayThroughNonHop(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) + time.Sleep(10 * time.Millisecond) + r1, err := NewRelay(ctx, hosts[0]) if err != nil { t.Fatal(err) @@ -210,7 +216,7 @@ func TestRelayThroughNonHop(t *testing.T) { rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() _, err = r1.Dial(rctx, rinfo, dinfo) @@ -236,6 +242,8 @@ func TestRelayNoDestConnection(t *testing.T) { connect(t, hosts[0], hosts[1]) + time.Sleep(10 * time.Millisecond) + r1, err := NewRelay(ctx, hosts[0]) if err != nil { t.Fatal(err) @@ -249,7 +257,7 @@ func TestRelayNoDestConnection(t *testing.T) { rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() _, err = r1.Dial(rctx, rinfo, dinfo) @@ -275,6 +283,8 @@ func TestActiveRelay(t *testing.T) { connect(t, hosts[0], hosts[1]) + time.Sleep(10 * time.Millisecond) + r1, err := NewRelay(ctx, hosts[0]) if err != nil { t.Fatal(err) @@ -306,7 +316,7 @@ func TestActiveRelay(t *testing.T) { _, err = con.Write(msg) if err != nil { - t.Error("failed to write", err) + t.Error(err) return } con.Close() @@ -315,7 +325,7 @@ func TestActiveRelay(t *testing.T) { rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) + rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() con, err := r1.Dial(rctx, rinfo, dinfo) From 80dbe02b30a1c75b62f87094102d4b6e196641e8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 15:15:40 +0300 Subject: [PATCH 19/29] dialer: implement transport.Dialer interface --- .../internal/circuitv1-deprecated/dial.go | 16 ++++++++++++---- .../internal/circuitv1-deprecated/relay.go | 2 +- .../internal/circuitv1-deprecated/relay_test.go | 12 ++++++------ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go index 936cc21d71..f48e4138de 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/dial.go +++ b/p2p/protocol/internal/circuitv1-deprecated/dial.go @@ -4,12 +4,13 @@ import ( "context" "fmt" - peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" tpt "github.com/libp2p/go-libp2p-transport" ma "github.com/multiformats/go-multiaddr" ) +var _ tpt.Dialer = (*Dialer)(nil) + type Dialer Relay func (d *Dialer) Relay() *Relay { @@ -20,7 +21,11 @@ func (r *Relay) Dialer() *Dialer { return (*Dialer)(r) } -func (d *Dialer) DialPeer(ctx context.Context, p peer.ID, a ma.Multiaddr) (tpt.Conn, error) { +func (d *Dialer) Dial(a ma.Multiaddr) (tpt.Conn, error) { + return d.DialContext(d.ctx, a) +} + +func (d *Dialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn, error) { if !d.Matches(a) { return nil, fmt.Errorf("%s is not a relay address", a) } @@ -42,9 +47,12 @@ func (d *Dialer) DialPeer(ctx context.Context, p peer.ID, a ma.Multiaddr) (tpt.C return nil, err } - dinfo := pstore.PeerInfo{ID: p, Addrs: []ma.Multiaddr{destaddr}} + dinfo, err := pstore.InfoFromP2pAddr(destaddr) + if err != nil { + return nil, err + } - return d.Relay().Dial(ctx, *rinfo, dinfo) + return d.Relay().DialPeer(ctx, *rinfo, *dinfo) } func (d *Dialer) Matches(a ma.Multiaddr) bool { diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 3b57452c57..09ec1eb73c 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -78,7 +78,7 @@ func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error return r, nil } -func (r *Relay) Dial(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) { +func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) { r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, pstore.TempAddrTTL) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go index 6fe1255f23..bf5f322d91 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go @@ -100,7 +100,7 @@ func TestBasicRelay(t *testing.T) { rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() - con, err := r1.Dial(rctx, rinfo, dinfo) + con, err := r1.DialPeer(rctx, rinfo, dinfo) if err != nil { t.Fatal(err) } @@ -163,7 +163,7 @@ func TestBasicRelayDial(t *testing.T) { con.Close() }() - relayaddr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit", hosts[1].ID().Pretty())) + addr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].ID().Pretty(), hosts[2].ID().Pretty())) if err != nil { t.Fatal(err) } @@ -172,7 +172,7 @@ func TestBasicRelayDial(t *testing.T) { defer rcancel() d := r1.Dialer() - con, err := d.DialPeer(rctx, hosts[2].ID(), relayaddr) + con, err := d.DialContext(rctx, addr) if err != nil { t.Fatal(err) } @@ -219,7 +219,7 @@ func TestRelayThroughNonHop(t *testing.T) { rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() - _, err = r1.Dial(rctx, rinfo, dinfo) + _, err = r1.DialPeer(rctx, rinfo, dinfo) if err == nil { t.Fatal("expected error") } @@ -260,7 +260,7 @@ func TestRelayNoDestConnection(t *testing.T) { rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() - _, err = r1.Dial(rctx, rinfo, dinfo) + _, err = r1.DialPeer(rctx, rinfo, dinfo) if err == nil { t.Fatal("expected error") } @@ -328,7 +328,7 @@ func TestActiveRelay(t *testing.T) { rctx, rcancel := context.WithTimeout(ctx, time.Second) defer rcancel() - con, err := r1.Dial(rctx, rinfo, dinfo) + con, err := r1.DialPeer(rctx, rinfo, dinfo) if err != nil { t.Fatal(err) } From d442f301dee94a63137b7777faa8d514eb350873 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 15:28:54 +0300 Subject: [PATCH 20/29] listen: implement transport.Listener panic stubs --- .../internal/circuitv1-deprecated/listen.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/listen.go b/p2p/protocol/internal/circuitv1-deprecated/listen.go index db6c7e4e2f..b271c0f465 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/listen.go +++ b/p2p/protocol/internal/circuitv1-deprecated/listen.go @@ -1,6 +1,7 @@ package relay import ( + "fmt" "net" pb "github.com/libp2p/go-libp2p-circuit/pb" @@ -23,10 +24,6 @@ func (r *Relay) Listener() (tpt.Listener, error) { return (*RelayListener)(r), nil } -func (r *Relay) Matches(a ma.Multiaddr) bool { - return false -} - func (l *RelayListener) Accept() (tpt.Conn, error) { select { case c := <-l.incoming: @@ -48,15 +45,22 @@ func (l *RelayListener) Accept() (tpt.Conn, error) { } func (l *RelayListener) Addr() net.Addr { - panic("oh no") + return &NetAddr{ + Relay: "any", + Remote: "any", + } } func (l *RelayListener) Multiaddr() ma.Multiaddr { - panic("oh no") + a, err := ma.NewMultiaddr(fmt.Sprintf("/p2p-circuit/ipfs/%s", l.self.Pretty())) + if err != nil { + panic(err) + } + return a } func (l *RelayListener) LocalPeer() peer.ID { - return l.Relay().self + return l.self } func (l *RelayListener) SetAddrFilters(f *filter.Filters) { From f26747379662f443dc0855be7aa9a96b79e56e90 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 18:23:30 +0300 Subject: [PATCH 21/29] transport: implement transport.Transport interface --- .../internal/circuitv1-deprecated/conn.go | 5 +-- .../internal/circuitv1-deprecated/relay.go | 4 +-- .../circuitv1-deprecated/transport.go | 33 +++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/conn.go b/p2p/protocol/internal/circuitv1-deprecated/conn.go index 86cdd14718..35043e5e39 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/conn.go +++ b/p2p/protocol/internal/circuitv1-deprecated/conn.go @@ -16,7 +16,8 @@ import ( type Conn struct { inet.Stream - remote pstore.PeerInfo + remote pstore.PeerInfo + transport tpt.Transport } var _ iconn.Conn = (*Conn)(nil) @@ -63,7 +64,7 @@ func (c *Conn) LocalAddr() net.Addr { } func (c *Conn) Transport() tpt.Transport { - panic("does anyone really call this?") + return c.transport } func (c *Conn) LocalPeer() peer.ID { diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 09ec1eb73c..ef8bb7e4fc 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -120,7 +120,7 @@ func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore return nil, RelayError{msg.GetCode()} } - return &Conn{Stream: s, remote: dest}, nil + return &Conn{Stream: s, remote: dest, transport: r.Transport()}, nil } func (r *Relay) handleNewStream(s inet.Stream) { @@ -283,7 +283,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) { r.host.Peerstore().AddAddrs(src.ID, src.Addrs, pstore.TempAddrTTL) select { - case r.incoming <- &Conn{Stream: s, remote: src}: + case r.incoming <- &Conn{Stream: s, remote: src, transport: r.Transport()}: case <-time.After(RelayAcceptTimeout): r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) } diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go index 24c09fd874..2a3d71ea49 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/transport.go +++ b/p2p/protocol/internal/circuitv1-deprecated/transport.go @@ -1,6 +1,9 @@ package relay import ( + "fmt" + + tpt "github.com/libp2p/go-libp2p-transport" ma "github.com/multiformats/go-multiaddr" ) @@ -15,3 +18,33 @@ var RelayMaddrProtocol = ma.Protocol{ func init() { ma.AddProtocol(RelayMaddrProtocol) } + +var _ tpt.Transport = (*Transport)(nil) + +type Transport Relay + +func (t *Transport) Relay() *Relay { + return (*Relay)(t) +} + +func (r *Relay) Transport() *Transport { + return (*Transport)(r) +} + +func (t *Transport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) { + if !t.Matches(laddr) { + return nil, fmt.Errorf("%s is not a relay address", laddr) + } + return t.Relay().Dialer(), nil +} + +func (t *Transport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { + if !t.Matches(laddr) { + return nil, fmt.Errorf("%s is not a relay address", laddr) + } + return t.Relay().Listener() +} + +func (t *Transport) Matches(a ma.Multiaddr) bool { + return t.Relay().Dialer().Matches(a) +} From d0ae18b80943eb082daef6ae7593c83378dabedc Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 19:02:13 +0300 Subject: [PATCH 22/29] relay: only AddAddrs if there are some --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index ef8bb7e4fc..b0ba572f99 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -80,7 +80,9 @@ func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) { - r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, pstore.TempAddrTTL) + if len(relay.Addrs) > 0 { + r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, pstore.TempAddrTTL) + } s, err := r.host.NewStream(ctx, relay.ID, ProtoID) if err != nil { @@ -185,7 +187,9 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { return } - r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, pstore.TempAddrTTL) + if len(dst.Addrs) > 0 { + r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, pstore.TempAddrTTL) + } ctx, cancel := context.WithTimeout(r.ctx, HopConnectTimeout) defer cancel() From d6401c780381e4a5e97cb5099243da218ddbc31e Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 20:16:26 +0300 Subject: [PATCH 23/29] relay: remove unused fields from Relay --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index b0ba572f99..7e7a0ad9f1 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "sync" "time" pb "github.com/libp2p/go-libp2p-circuit/pb" @@ -34,9 +33,6 @@ type Relay struct { hop bool incoming chan *Conn - - arLk sync.Mutex - activeRelays []*Conn } type RelayOpt int From 7428b11e553b147816435c9d7835894d9787b621 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 18 Jul 2017 20:28:32 +0300 Subject: [PATCH 24/29] interface - type naming consistency: RelayDialer and RelayTransport --- .../internal/circuitv1-deprecated/dial.go | 16 ++++++++-------- .../internal/circuitv1-deprecated/listen.go | 4 ++-- .../circuitv1-deprecated/relay_test.go | 18 +++--------------- .../internal/circuitv1-deprecated/transport.go | 18 +++++++++--------- 4 files changed, 22 insertions(+), 34 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go index f48e4138de..8e543e7b86 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/dial.go +++ b/p2p/protocol/internal/circuitv1-deprecated/dial.go @@ -9,23 +9,23 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -var _ tpt.Dialer = (*Dialer)(nil) +var _ tpt.Dialer = (*RelayDialer)(nil) -type Dialer Relay +type RelayDialer Relay -func (d *Dialer) Relay() *Relay { +func (d *RelayDialer) Relay() *Relay { return (*Relay)(d) } -func (r *Relay) Dialer() *Dialer { - return (*Dialer)(r) +func (r *Relay) Dialer() *RelayDialer { + return (*RelayDialer)(r) } -func (d *Dialer) Dial(a ma.Multiaddr) (tpt.Conn, error) { +func (d *RelayDialer) Dial(a ma.Multiaddr) (tpt.Conn, error) { return d.DialContext(d.ctx, a) } -func (d *Dialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn, error) { +func (d *RelayDialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn, error) { if !d.Matches(a) { return nil, fmt.Errorf("%s is not a relay address", a) } @@ -55,7 +55,7 @@ func (d *Dialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn, err return d.Relay().DialPeer(ctx, *rinfo, *dinfo) } -func (d *Dialer) Matches(a ma.Multiaddr) bool { +func (d *RelayDialer) Matches(a ma.Multiaddr) bool { _, err := a.ValueForProtocol(P_CIRCUIT) return err == nil } diff --git a/p2p/protocol/internal/circuitv1-deprecated/listen.go b/p2p/protocol/internal/circuitv1-deprecated/listen.go index b271c0f465..dfec479ab3 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/listen.go +++ b/p2p/protocol/internal/circuitv1-deprecated/listen.go @@ -20,8 +20,8 @@ func (l *RelayListener) Relay() *Relay { return (*Relay)(l) } -func (r *Relay) Listener() (tpt.Listener, error) { - return (*RelayListener)(r), nil +func (r *Relay) Listener() *RelayListener { + return (*RelayListener)(r) } func (l *RelayListener) Accept() (tpt.Conn, error) { diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go index bf5f322d91..5ad1c7b2fb 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go @@ -74,11 +74,7 @@ func TestBasicRelay(t *testing.T) { msg := []byte("relay works!") go func() { - list, err := r3.Listener() - if err != nil { - t.Error(err) - return - } + list := r3.Listener() con, err := list.Accept() if err != nil { @@ -143,11 +139,7 @@ func TestBasicRelayDial(t *testing.T) { msg := []byte("relay works!") go func() { - list, err := r3.Listener() - if err != nil { - t.Error(err) - return - } + list := r3.Listener() con, err := list.Accept() if err != nil { @@ -302,11 +294,7 @@ func TestActiveRelay(t *testing.T) { msg := []byte("relay works!") go func() { - list, err := r3.Listener() - if err != nil { - t.Error(err) - return - } + list := r3.Listener() con, err := list.Accept() if err != nil { diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go index 2a3d71ea49..52dc86bd5f 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/transport.go +++ b/p2p/protocol/internal/circuitv1-deprecated/transport.go @@ -19,32 +19,32 @@ func init() { ma.AddProtocol(RelayMaddrProtocol) } -var _ tpt.Transport = (*Transport)(nil) +var _ tpt.Transport = (*RelayTransport)(nil) -type Transport Relay +type RelayTransport Relay -func (t *Transport) Relay() *Relay { +func (t *RelayTransport) Relay() *Relay { return (*Relay)(t) } -func (r *Relay) Transport() *Transport { - return (*Transport)(r) +func (r *Relay) Transport() *RelayTransport { + return (*RelayTransport)(r) } -func (t *Transport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) { +func (t *RelayTransport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) { if !t.Matches(laddr) { return nil, fmt.Errorf("%s is not a relay address", laddr) } return t.Relay().Dialer(), nil } -func (t *Transport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { +func (t *RelayTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { if !t.Matches(laddr) { return nil, fmt.Errorf("%s is not a relay address", laddr) } - return t.Relay().Listener() + return t.Relay().Listener(), nil } -func (t *Transport) Matches(a ma.Multiaddr) bool { +func (t *RelayTransport) Matches(a ma.Multiaddr) bool { return t.Relay().Dialer().Matches(a) } From ac48c753abf4324e45702caa5de6df40bfd8e05e Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 19 Jul 2017 20:49:01 +0300 Subject: [PATCH 25/29] AddRelayTransport: use relays for host transport --- .../internal/circuitv1-deprecated/dial.go | 2 +- .../circuitv1-deprecated/transport.go | 35 ++++++--- .../circuitv1-deprecated/transport_test.go | 77 +++++++++++++++++++ 3 files changed, 101 insertions(+), 13 deletions(-) create mode 100644 p2p/protocol/internal/circuitv1-deprecated/transport_test.go diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go index 8e543e7b86..53252644ba 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/dial.go +++ b/p2p/protocol/internal/circuitv1-deprecated/dial.go @@ -56,6 +56,6 @@ func (d *RelayDialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn } func (d *RelayDialer) Matches(a ma.Multiaddr) bool { - _, err := a.ValueForProtocol(P_CIRCUIT) + _, err := a.ValueForProtocol(ma.P_CIRCUIT) return err == nil } diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go index 52dc86bd5f..6fb79304cf 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/transport.go +++ b/p2p/protocol/internal/circuitv1-deprecated/transport.go @@ -1,24 +1,15 @@ package relay import ( + "context" "fmt" + host "github.com/libp2p/go-libp2p-host" + swarm "github.com/libp2p/go-libp2p-swarm" tpt "github.com/libp2p/go-libp2p-transport" ma "github.com/multiformats/go-multiaddr" ) -const P_CIRCUIT = 290 - -var RelayMaddrProtocol = ma.Protocol{ - Code: P_CIRCUIT, - Name: "p2p-circuit", - Size: 0, -} - -func init() { - ma.AddProtocol(RelayMaddrProtocol) -} - var _ tpt.Transport = (*RelayTransport)(nil) type RelayTransport Relay @@ -48,3 +39,23 @@ func (t *RelayTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { func (t *RelayTransport) Matches(a ma.Multiaddr) bool { return t.Relay().Dialer().Matches(a) } + +// AddRelayTransport constructs a relay and adds it as a transport to the host network. +func AddRelayTransport(ctx context.Context, h host.Host, opts ...RelayOpt) error { + // the necessary methods are not part of the Network interface, only exported by Swarm + // TODO: generalize the network interface for adding tranports + n, ok := h.Network().(*swarm.Network) + if !ok { + return fmt.Errorf("%s is not a swarm network", h.Network()) + } + + s := n.Swarm() + + r, err := NewRelay(ctx, h, opts...) + if err != nil { + return err + } + + s.AddTransport(r.Transport()) + return s.AddListenAddr(r.Listener().Multiaddr()) +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport_test.go b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go new file mode 100644 index 0000000000..6eb12efbe0 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go @@ -0,0 +1,77 @@ +package relay_test + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "testing" + "time" + + . "github.com/libp2p/go-libp2p-circuit" + + inet "github.com/libp2p/go-libp2p-net" + pstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" +) + +func TestRelayTransport(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 3) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(10 * time.Millisecond) + + err := AddRelayTransport(ctx, hosts[0]) + if err != nil { + t.Fatal(err) + } + + err = AddRelayTransport(ctx, hosts[1], OptHop) + if err != nil { + t.Fatal(err) + } + + err = AddRelayTransport(ctx, hosts[2]) + if err != nil { + t.Fatal(err) + } + + const proto = "test/relay-transport" + + msg := []byte("relay works!") + handler := func(s inet.Stream) { + s.Write(msg) + s.Close() + } + + hosts[2].SetStreamHandler(proto, handler) + + addr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].ID().Pretty(), hosts[2].ID().Pretty())) + if err != nil { + t.Fatal(err) + } + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, pstore.TempAddrTTL) + + s, err := hosts[0].NewStream(rctx, hosts[2].ID(), proto) + if err != nil { + t.Fatal(err) + } + + data, err := ioutil.ReadAll(s) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, msg) { + t.Fatal("message was incorrect:", string(data)) + } +} From 264939d2e027dfe8f873f093863b1c0cb97bc6d5 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 19 Jul 2017 21:28:59 +0300 Subject: [PATCH 26/29] transport_test: check error in stream handler --- p2p/protocol/internal/circuitv1-deprecated/transport_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport_test.go b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go index 6eb12efbe0..101622fb5d 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/transport_test.go +++ b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go @@ -45,7 +45,10 @@ func TestRelayTransport(t *testing.T) { msg := []byte("relay works!") handler := func(s inet.Stream) { - s.Write(msg) + _, err := s.Write(msg) + if err != nil { + t.Error(err) + } s.Close() } From 3cb456daa060d7cd2ab955391193712f857c7651 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 19 Jul 2017 22:59:44 +0300 Subject: [PATCH 27/29] dynamic protocol registration --- .../internal/circuitv1-deprecated/dial.go | 2 +- .../circuitv1-deprecated/transport.go | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go index 53252644ba..8e543e7b86 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/dial.go +++ b/p2p/protocol/internal/circuitv1-deprecated/dial.go @@ -56,6 +56,6 @@ func (d *RelayDialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn } func (d *RelayDialer) Matches(a ma.Multiaddr) bool { - _, err := a.ValueForProtocol(ma.P_CIRCUIT) + _, err := a.ValueForProtocol(P_CIRCUIT) return err == nil } diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go index 6fb79304cf..480ebdd3ae 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/transport.go +++ b/p2p/protocol/internal/circuitv1-deprecated/transport.go @@ -4,12 +4,42 @@ import ( "context" "fmt" + addrutil "github.com/libp2p/go-addr-util" host "github.com/libp2p/go-libp2p-host" swarm "github.com/libp2p/go-libp2p-swarm" tpt "github.com/libp2p/go-libp2p-transport" ma "github.com/multiformats/go-multiaddr" ) +const P_CIRCUIT = 290 + +var Protocol = ma.Protocol{ + Code: P_CIRCUIT, + Size: 0, + Name: "p2p-circuit", + VCode: ma.CodeToVarint(P_CIRCUIT), +} + +func init() { + ma.AddProtocol(Protocol) + + // Add dialer transport + const proto = "/ipfs/p2p-circuit/ipfs" + tps := addrutil.SupportedTransportStrings + + err := addrutil.AddTransport(proto) + if err != nil { + panic(err) + } + + for _, tp := range tps { + err = addrutil.AddTransport(tp + proto) + if err != nil { + panic(err) + } + } +} + var _ tpt.Transport = (*RelayTransport)(nil) type RelayTransport Relay From b671627aafc890992946cfbb2878990495619d33 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 20 Jul 2017 10:55:46 +0300 Subject: [PATCH 28/29] transport: use correct format specifier for error --- p2p/protocol/internal/circuitv1-deprecated/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go index 480ebdd3ae..614b252e4c 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/transport.go +++ b/p2p/protocol/internal/circuitv1-deprecated/transport.go @@ -76,7 +76,7 @@ func AddRelayTransport(ctx context.Context, h host.Host, opts ...RelayOpt) error // TODO: generalize the network interface for adding tranports n, ok := h.Network().(*swarm.Network) if !ok { - return fmt.Errorf("%s is not a swarm network", h.Network()) + return fmt.Errorf("%v is not a swarm network", h.Network()) } s := n.Swarm() From 1536c8a0637b63ecec654edc689659d8a173b723 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 20 Jul 2017 19:46:19 +0300 Subject: [PATCH 29/29] relay: no need to error on empty src multiaddrs --- p2p/protocol/internal/circuitv1-deprecated/relay.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 7e7a0ad9f1..5e2aaec60e 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -267,7 +267,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) { src, err := peerToPeerInfo(msg.GetSrcPeer()) - if err != nil || len(src.Addrs) == 0 { + if err != nil { r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID) return } @@ -280,7 +280,9 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) { log.Infof("relay connection from: %s", src.ID) - r.host.Peerstore().AddAddrs(src.ID, src.Addrs, pstore.TempAddrTTL) + if len(src.Addrs) > 0 { + r.host.Peerstore().AddAddrs(src.ID, src.Addrs, pstore.TempAddrTTL) + } select { case r.incoming <- &Conn{Stream: s, remote: src, transport: r.Transport()}: