diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a7bfa47dd..a3eb447c25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,36 @@ +## Release v1.1.0 + +**Highlights**: + +- `weave launch` now launches all weave components, simplifying + startup. +- `weave status` has been completely revamped, with a much improved + presentation of the information, and the option to select and output + data in JSON. +- weaveDNS has been rewritten and embedded in the router. The new + implementation simplifies configuration, improves performance, and + provides fault resilience for services. +- the weave Docker API proxy now provides an even more seamless user + experience, and enables easier integration of weave with other + systems such as kubernetes. +- many usability improvements +- a few minor bug fixes, including a couple of security + vulnerabilities + +More details in the +[change log](https://github.com/weaveworks/weave/issues?q=milestone%3A1.1.0). + +## Release 1.0.3 + +This release contains a weaveDNS feature enhancement as well as minor fixes for +improved stability and robustness. + +More details in the +[change log](https://github.com/weaveworks/weave/issues?q=milestone%3A1.0.3). + +The release is fully compatible with other 1.0.x versions, so existing +clusters can be upgraded incrementally. + ## Release 1.0.2 This release fixes a number of bugs, including some security diff --git a/README.md b/README.md index 916fae1b2f..5aa9805567 100644 --- a/README.md +++ b/README.md @@ -67,8 +67,8 @@ two containers, one on each host. On $HOST1 we run: - host1$ weave launch && weave launch-dns && weave launch-proxy - host1$ eval $(weave proxy-env) + host1$ weave launch + host1$ eval $(weave env) host1$ docker run --name a1 -ti ubuntu > NB: If the first command results in an error like @@ -81,21 +81,18 @@ On $HOST1 we run: > `sudo`, since some commands modify environment entries and hence > they all need to be executed from the same shell. 
-The first line runs the weave router, DNS and Docker API proxy, each -in their own container. The second line sets the `DOCKER_HOST` -environment variable to point to the proxy, so that containers -launched via the docker command line are automatically attached to the -weave network. Finally, we run our application container; this happens -via the proxy so it is automatically allocated an IP address and -registered in DNS. +The first line runs weave. The second line configures our environment +so that containers launched via the docker command line are +automatically attached to the weave network. Finally, we run our +application container. That's it! If our application consists of more than one container on this host we simply launch them with `docker run` as appropriate. Next we repeat similar steps on `$HOST2`... - host2$ weave launch $HOST1 && weave launch-dns && weave launch-proxy - host2$ eval $(weave proxy-env) + host2$ weave launch $HOST1 + host2$ eval $(weave env) host2$ docker run --name a2 -ti ubuntu The only difference, apart from the name of the application container, @@ -113,11 +110,10 @@ available. Also, we can tell weave to connect to multiple peers by supplying multiple addresses, separated by spaces. And we can [add peers dynamically](http://docs.weave.works/weave/latest_release/features.html#dynamic-topologies). -The router, DNS and Docker API proxy need to be started once per -host. The relevant container images are pulled down on demand, but if -you wish you can preload them by running `weave setup` - this is -particularly useful for automated deployments, and ensures that there -are no delays during later operations. +Weave must be started once per host. The relevant container images are +pulled down on demand, but if you wish you can preload them by running +`weave setup` - this is particularly useful for automated deployments, +and ensures that there are no delays during later operations. 
Now that we've got everything set up, let's see whether our containers can talk to each other... diff --git a/bin/release b/bin/release index 26120e99b7..24e8f265b7 100755 --- a/bin/release +++ b/bin/release @@ -242,7 +242,7 @@ usage() { echo "Usage:" echo -e "\t./bin/release build" echo "-- Build artefacts for the latest version tag" - echo -e "\t./bin/release draft + echo -e "\t./bin/release draft" echo "-- Create draft release with artefacts in GitHub" echo -e "\t./bin/release publish" echo "-- Publish the GitHub release and update DockerHub" diff --git a/docs/architecture.txt b/docs/architecture.txt index 4b0de429fb..9e2babecc8 100644 --- a/docs/architecture.txt +++ b/docs/architecture.txt @@ -66,23 +66,23 @@ Router: simply responds to traffic received from the remote peer via TCP 5. We register this connection with the local peer 6a. If we initiated this connection then we now start sending fast - heartbeats to the remote peer so that the remote peer can determine what - address/port it should use to send UDP back to us. To do this, we - spawn off two further threads which are the forwarder - loops. These receive frames which are to be sent to the remote - peer. One of them sends frames without the DF flag set. To do this - it just sends the packets out of the UDP Listener socket. The - other needs to send frames with the DF flag set and needs its own - socket so that it can do PMTU discovery easily. To do this, it - uses a Raw IP socket (IP has no ports, so there's no collision - issue with the UDP Listener socket) and so it must add UDP headers - itself. - 6b. If we did not initiate this connection then the UDP Listener + heartbeats to the remote peer so that the remote peer can + determine what address/port it should use to send UDP back to + us. To do this, we spawn off a "forwarder" thread to send + heartbeats, monitor incoming heartbeats, and some other auxiliary + duties. 
It also consumes frames to be encapsulated and sent via + UDP from two channels, for DF and non-DF cases. In the non-DF + case, it can just send the packets out of the UDP Listener + socket. In the DF case, it needs its own socket so that it can do + PMTU discovery easily. To do this, it uses a Raw IP socket (IP has + no ports, so there's no collision issue with the UDP Listener + socket) and so it must add UDP headers itself. + 6b. If we did not initiate this connection then the UDP Listener should start receiving fast heartbeats from the remote peer. From those it should be able to identify the local connection via the local peer. It will tell the local connection (communicating to the actor thread) about the UDP address of the remote peer. The - local connection will then start its forwarder threads as + local connection will then start its forwarder thread as described in 6a, and start sending fast heartbeats. We send to the remote peer via TCP a ConnectionEstablished message. The remote peer receives this (on the TCP receiver process), tells the diff --git a/prog/weaveexec/Dockerfile b/prog/weaveexec/Dockerfile index 8db372b145..1d35b4195f 100644 --- a/prog/weaveexec/Dockerfile +++ b/prog/weaveexec/Dockerfile @@ -1,4 +1,4 @@ -FROM gliderlabs/alpine +FROM alpine MAINTAINER Weaveworks Inc LABEL works.weave.role=system diff --git a/prog/weaver/main.go b/prog/weaver/main.go index 1ac9c315ac..b8bd33263c 100644 --- a/prog/weaver/main.go +++ b/prog/weaver/main.go @@ -1,7 +1,6 @@ package main import ( - "crypto/sha256" "fmt" "net" "net/http" @@ -36,31 +35,31 @@ func main() { runtime.GOMAXPROCS(procs) var ( - config weave.Config - justVersion bool - protocolMinVersion int - ifaceName string - routerName string - nickName string - password string - pktdebug bool - logLevel string - prof string - bufSzMB int - noDiscovery bool - httpAddr string - iprangeCIDR string - ipsubnetCIDR string - peerCount int - apiPath string - peers []string - + config weave.Config + 
justVersion bool + protocolMinVersion int + ifaceName string + routerName string + nickName string + password string + pktdebug bool + logLevel string + prof string + bufSzMB int + noDiscovery bool + httpAddr string + iprangeCIDR string + ipsubnetCIDR string + peerCount int + apiPath string + peers []string noDNS bool dnsDomain string dnsListenAddress string dnsTTL int dnsClientTimeout time.Duration dnsEffectiveListenAddress string + iface *net.Interface ) mflag.BoolVar(&justVersion, []string{"#version", "-version"}, false, "print version and exit") @@ -116,18 +115,25 @@ func main() { var err error if ifaceName != "" { - config.Iface, err = weavenet.EnsureInterface(ifaceName) + iface, err = weavenet.EnsureInterface(ifaceName) + if err != nil { + Log.Fatal(err) + } + + // bufsz flag is in MB + config.Bridge, err = weave.NewPcap(iface, bufSzMB*1024*1024) if err != nil { Log.Fatal(err) } } if routerName == "" { - if config.Iface == nil { + if iface == nil { Log.Fatal("Either an interface must be specified with --iface or a name with -name") } - routerName = config.Iface.HardwareAddr.String() + routerName = iface.HardwareAddr.String() } + name, err := weave.PeerNameFromUserInput(routerName) if err != nil { Log.Fatal(err) @@ -157,9 +163,14 @@ func main() { defer profile.Start(&p).Stop() } - config.BufSz = bufSzMB * 1024 * 1024 - config.LogFrame = logFrameFunc(pktdebug) config.PeerDiscovery = !noDiscovery + config.Overlay = weave.NewSleeveOverlay(config.Port) + + if pktdebug { + config.PacketLogging = packetLogging{} + } else { + config.PacketLogging = nopPacketLogging{} + } router := weave.NewRouter(config, name, nickName) Log.Println("Our name is", router.Ourself) @@ -248,24 +259,22 @@ func canonicalName(f *mflag.Flag) string { return "" } -func logFrameFunc(debug bool) weave.LogFrameFunc { - if !debug { - return func(prefix string, frame []byte, dec *weave.EthernetDecoder) {} - } - return func(prefix string, frame []byte, dec *weave.EthernetDecoder) { - h := 
fmt.Sprintf("%x", sha256.Sum256(frame)) - parts := []interface{}{prefix, len(frame), "bytes (", h, ")"} +type packetLogging struct{} - if dec != nil { - parts = append(parts, dec.Eth.SrcMAC, "->", dec.Eth.DstMAC) +func (packetLogging) LogPacket(msg string, key weave.PacketKey) { + Log.Println(msg, key.SrcMAC, "->", key.DstMAC) +} - if dec.DF() { - parts = append(parts, "(DF)") - } - } +func (packetLogging) LogForwardPacket(msg string, key weave.ForwardPacketKey) { + Log.Println(msg, key.SrcPeer, key.SrcMAC, "->", key.DstPeer, key.DstMAC) +} - Log.Println(parts...) - } +type nopPacketLogging struct{} + +func (nopPacketLogging) LogPacket(string, weave.PacketKey) { +} + +func (nopPacketLogging) LogForwardPacket(string, weave.ForwardPacketKey) { } func parseAndCheckCIDR(cidrStr string) address.CIDR { diff --git a/router/bridge.go b/router/bridge.go new file mode 100644 index 0000000000..00481a0a35 --- /dev/null +++ b/router/bridge.go @@ -0,0 +1,35 @@ +package router + +// Interface to packet handling on the local virtual bridge +type Bridge interface { + // Inject a packet to be delivered locally + InjectPacket(PacketKey) FlowOp + + // Start consuming packets from the bridge. Injected packets + // should not be included. + StartConsumingPackets(BridgeConsumer) error + + String() string + Stats() map[string]int +} + +// A function that determines how to handle locally captured packets. 
+type BridgeConsumer func(PacketKey) FlowOp + +type NullBridge struct{} + +func (NullBridge) InjectPacket(PacketKey) FlowOp { + return nil +} + +func (NullBridge) StartConsumingPackets(BridgeConsumer) error { + return nil +} + +func (NullBridge) String() string { + return "" +} + +func (NullBridge) Stats() map[string]int { + return nil +} diff --git a/router/connection.go b/router/connection.go index 3026989f74..e3967ec69b 100644 --- a/router/connection.go +++ b/router/connection.go @@ -1,8 +1,6 @@ package router import ( - "encoding/binary" - "errors" "fmt" "net" "strconv" @@ -29,7 +27,7 @@ const ( TieBreakTied ) -var ErrConnectToSelf = errors.New("Cannot connect to ourself") +var ErrConnectToSelf = fmt.Errorf("Cannot connect to ourself") type RemoteConnection struct { local *Peer @@ -42,26 +40,17 @@ type RemoteConnection struct { type LocalConnection struct { sync.RWMutex RemoteConnection - TCPConn *net.TCPConn - version byte - tcpSender TCPSender - remoteUDPAddr *net.UDPAddr - receivedHeartbeat bool - stackFrag bool - effectivePMTU int - SessionKey *[32]byte - heartbeatTCP *time.Ticker - heartbeatTimeout *time.Timer - heartbeatFrame []byte - heartbeat *time.Ticker - fragTest *time.Ticker - forwarder *Forwarder - forwarderDF *ForwarderDF - Decryptor Decryptor - Router *Router - uid uint64 - actionChan chan<- ConnectionAction - finished <-chan struct{} // closed to signal that actorLoop has finished + TCPConn *net.TCPConn + version byte + tcpSender TCPSender + remoteUDPAddr *net.UDPAddr + SessionKey *[32]byte + heartbeatTCP *time.Ticker + Router *Router + uid uint64 + actionChan chan<- ConnectionAction + finished <-chan struct{} // closed to signal that actorLoop has finished + forwarder OverlayForwarder } type ConnectionAction func() error @@ -106,7 +95,6 @@ func StartLocalConnection(connRemote *RemoteConnection, tcpConn *net.TCPConn, ud Router: router, TCPConn: tcpConn, remoteUDPAddr: udpAddr, - effectivePMTU: DefaultPMTU, uid: randUint64(), actionChan: 
actionChan, finished: finished} @@ -126,48 +114,12 @@ func (conn *LocalConnection) BreakTie(dupConn Connection) ConnectionTieBreak { } } -// Read by the forwarder processes when in the UDP senders -func (conn *LocalConnection) RemoteUDPAddr() *net.UDPAddr { - conn.RLock() - defer conn.RUnlock() - return conn.remoteUDPAddr -} - func (conn *LocalConnection) Established() bool { conn.RLock() defer conn.RUnlock() return conn.established } -// Called by forwarder processes, read in Forward (by sniffer and udp -// listener process in router). -func (conn *LocalConnection) setEffectivePMTU(pmtu int) { - conn.Lock() - defer conn.Unlock() - if conn.effectivePMTU != pmtu { - conn.effectivePMTU = pmtu - conn.Log("Effective PMTU set to", pmtu) - } -} - -// Called by the connection's actor process, and by the connection's -// TCP receiver process. StackFrag is read in conn.forward -func (conn *LocalConnection) setStackFrag(frag bool) { - conn.Lock() - defer conn.Unlock() - conn.stackFrag = frag -} - -// Called by the connection's TCP receiver process. -func (conn *LocalConnection) pmtuVerified(pmtu int) { - conn.RLock() - fwd := conn.forwarderDF - conn.RUnlock() - if fwd != nil { - fwd.PMTUVerified(pmtu) - } -} - // Send directly, not via the Actor. If it goes via the Actor we can // get a deadlock where LocalConnection is blocked talking to // LocalPeer and LocalPeer is blocked trying send a ProtocolMsg via @@ -197,64 +149,6 @@ func (conn *LocalConnection) Shutdown(err error) { go func() { conn.sendAction(func() error { return err }) }() } -// Async -// -// Heartbeating serves two purposes: a) keeping NAT paths alive, and -// b) updating a remote peer's knowledge of our address, in the event -// it changes (e.g. because NAT paths expired). 
-func (conn *LocalConnection) ReceivedHeartbeat(remoteUDPAddr *net.UDPAddr, connUID uint64) { - if remoteUDPAddr == nil || connUID != conn.uid { - return - } - conn.sendAction(func() error { - oldRemoteUDPAddr := conn.remoteUDPAddr - old := conn.receivedHeartbeat - conn.Lock() - conn.remoteUDPAddr = remoteUDPAddr - conn.receivedHeartbeat = true - conn.Unlock() - conn.heartbeatTimeout.Reset(HeartbeatTimeout) - if !old { - if err := conn.sendSimpleProtocolMsg(ProtocolConnectionEstablished); err != nil { - return err - } - } - if oldRemoteUDPAddr == nil { - return conn.sendFastHeartbeats() - } else if oldRemoteUDPAddr.String() != remoteUDPAddr.String() { - log.Println("Peer", conn.remote, "moved from", oldRemoteUDPAddr, "to", remoteUDPAddr) - } - return nil - }) -} - -// Async -func (conn *LocalConnection) SetEstablished() { - conn.sendAction(func() error { - stopTicker(conn.heartbeat) - old := conn.established - conn.Lock() - conn.established = true - conn.Unlock() - if old { - return nil - } - conn.Router.Ourself.ConnectionEstablished(conn) - if err := conn.ensureForwarders(); err != nil { - return err - } - // Send a large frame down the DF channel in order to prompt - // PMTU discovery to start. 
- conn.Send(true, PMTUDiscovery) - conn.heartbeat = time.NewTicker(SlowHeartbeat) - conn.fragTest = time.NewTicker(FragTestInterval) - // avoid initial waits for timers to fire - conn.Send(true, conn.heartbeatFrame) - conn.performFragTest() - return nil - }) -} - // Send an actor request to the actorLoop, but don't block if // actorLoop has exited - see http://blog.golang.org/pipelines for // pattern @@ -298,77 +192,63 @@ func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished ch return } - if conn.SessionKey == nil { - conn.Decryptor = NewNonDecryptor() - } else { - conn.Decryptor = NewNaClDecryptor(conn.SessionKey, conn.outbound) - } - conn.Log("connection ready; using protocol version", conn.version) - // The ordering of the following is very important. [1] - - if conn.remoteUDPAddr != nil { - if err = conn.ensureForwarders(); err != nil { - return - } + params := ForwarderParams{ + RemotePeer: conn.remote, + LocalIP: conn.TCPConn.LocalAddr().(*net.TCPAddr).IP, + RemoteAddr: conn.remoteUDPAddr, + ConnUID: conn.uid, + Crypto: conn.forwarderCrypto(), + SendControlMessage: conn.sendOverlayControlMessage, } - if err = conn.Router.Ourself.AddConnection(conn); err != nil { + if conn.forwarder, err = conn.Router.Overlay.MakeForwarder(params); err != nil { return } - if err = conn.initHeartbeats(); err != nil { + + // As soon as we do AddConnection, the new connection becomes + // visible to the packet routing logic. So AddConnection must + // come after MakeForwarder + if err = conn.Router.Ourself.AddConnection(conn); err != nil { return } + // SetListener has the side-effect of telling the forwarder + // that the connection is confirmed. This comes after + // AddConnection, because only after that completes do we know + // the connection is valid: in particular that it is not a + // duplicate connection to the same peer. 
Sending heartbeats + // on a duplicate connection can trip up crypto at the other + // end, since the associated UDP packets may get decoded by + // the other connection. It is also generally wasteful to + // engage in any interaction with the remote on a connection + // that turns out to be invalid. + conn.forwarder.SetListener(ConnectionAsForwarderListener{conn}) + + // receiveTCP must also follow AddConnection. In the absence + // of any indirect connectivity to the remote peer, the first + // we hear about it (and any peers reachable from it) is + // through topology gossip it sends us on the connection. We + // must ensure that the connection has been added to Ourself + // prior to processing any such gossip, otherwise we risk + // immediately gc'ing part of that newly received portion of + // the topology (though not the remote peer itself, since that + // will have a positive ref count), leaving behind dangling + // references to peers. Hence we must invoke AddConnection, + // which is *synchronous*, first. + conn.heartbeatTCP = time.NewTicker(TCPHeartbeat) go conn.receiveTCP(intro.Receiver) + + // AddConnection must precede actorLoop. More precisely, it + // must precede shutdown, since that invokes DeleteConnection + // and is invoked on termination of this entire + // function. Essentially this boils down to a prohibition on + // running AddConnection in a separate goroutine, at least not + // without some synchronisation. Which in turn requires the + // launching of the receiveTCP goroutine to precede actorLoop. err = conn.actorLoop(actionChan) } -// [1] Ordering constraints: -// -// (a) AddConnections must precede initHeartbeats. It is only after -// the former completes that we know the connection is valid, in -// particular is not a duplicate connection to the same peer. Sending -// heartbeats on a duplicate connection can trip up crypto at the -// other end, since the associated UDP packets may get decoded by the -// other connection. 
It is also generally wasteful to engage in any -// interaction with the remote on a connection that turns out to be -// invald. -// -// (b) AddConnection must precede receiveTCP. In the absence of any -// indirect connectivity to the remote peer, the first we hear about -// it (and any peers reachable from it) is through topology gossip it -// sends us on the connection. We must ensure that the connection has -// been added to Ourself prior to processing any such gossip, -// otherwise we risk immediately gc'ing part of that newly received -// portion of the topology (though not the remote peer itself, since -// that will have a positive ref count), leaving behind dangling -// references to peers. Hence we must invoke AddConnection, which is -// *synchronous*, first. -// -// (c) AddConnection must precede actorLoop. More precisely, it must -// precede shutdown, since that invokes DeleteConnection and is -// invoked on termination of this entire function. Essentially this -// boils down to a prohibition on running AddConnection in a separate -// goroutine, at least not without some synchronisation. Which in turn -// requires us the launching of the receiveTCP goroutine to precede -// actorLoop. -// -// (d) AddConnection should precede receiveTCP. There is no point -// starting the latter if the former fails. -// -// (e) initHeartbeats should precede actorLoop. The former is setting -// LocalConnection fields accessed by the latter. Since the latter -// runs in a separate goroutine, we'd have to add some synchronisation -// if initHeartbeats isn't run first. -// -// (f) ensureForwarders should precede AddConnection. As soon as a -// connection has been added to LocalPeer by the latter, it becomes -// visible to the packet routing logic, which will end up dropping -// packets if the forwarders haven't been created yet. We cannot -// prevent that completely, since, for example, forwarder can only be -// created when we know the remote UDP address, but it helps to try. 
- func (conn *LocalConnection) makeFeatures() map[string]string { return map[string]string{ "PeerNameFlavour": PeerNameFlavour, @@ -446,17 +326,6 @@ func (conn *LocalConnection) registerRemote(remote *Peer, acceptNewPeer bool) er return nil } -func (conn *LocalConnection) initHeartbeats() error { - conn.heartbeatTCP = time.NewTicker(TCPHeartbeat) - conn.heartbeatTimeout = time.NewTimer(HeartbeatTimeout) - conn.heartbeatFrame = make([]byte, EthernetOverhead+8) - binary.BigEndian.PutUint64(conn.heartbeatFrame[EthernetOverhead:], conn.uid) - if conn.remoteUDPAddr == nil { - return nil - } - return conn.sendFastHeartbeats() -} - func (conn *LocalConnection) actorLoop(actionChan <-chan ConnectionAction) (err error) { for err == nil { select { @@ -464,12 +333,6 @@ func (conn *LocalConnection) actorLoop(actionChan <-chan ConnectionAction) (err err = action() case <-conn.heartbeatTCP.C: err = conn.sendSimpleProtocolMsg(ProtocolHeartbeat) - case <-conn.heartbeatTimeout.C: - err = fmt.Errorf("timed out waiting for UDP heartbeat") - case <-tickerChan(conn.heartbeat): - conn.Send(true, conn.heartbeatFrame) - case <-tickerChan(conn.fragTest): - conn.performFragTest() } } return @@ -491,21 +354,51 @@ func (conn *LocalConnection) shutdown(err error) { conn.Router.Ourself.DeleteConnection(conn) } - if conn.heartbeatTimeout != nil { - conn.heartbeatTimeout.Stop() - } - stopTicker(conn.heartbeatTCP) - stopTicker(conn.heartbeat) - stopTicker(conn.fragTest) - // blank out the forwardChan so that the router processes don't - // try to send any more - conn.stopForwarders() + if conn.forwarder != nil { + conn.forwarder.Stop() + } conn.Router.ConnectionMaker.ConnectionTerminated(conn.remoteTCPAddr, err) } +func (conn *LocalConnection) forwarderCrypto() *OverlayCrypto { + if !conn.Router.UsingPassword() { + return nil + } + + name := conn.local.NameByte + return &OverlayCrypto{ + Dec: NewNaClDecryptor(conn.SessionKey, conn.outbound), + Enc: NewNaClEncryptor(name, conn.SessionKey, 
conn.outbound, false), + EncDF: NewNaClEncryptor(name, conn.SessionKey, conn.outbound, true), + } +} + +func (conn *LocalConnection) sendOverlayControlMessage(msg []byte) error { + return conn.sendProtocolMsg(ProtocolMsg{ProtocolOverlayControlMsg, msg}) +} + +type ConnectionAsForwarderListener struct{ conn *LocalConnection } + +func (l ConnectionAsForwarderListener) Established() { + l.conn.sendAction(func() error { + old := l.conn.established + l.conn.Lock() + l.conn.established = true + l.conn.Unlock() + if !old { + l.conn.Router.Ourself.ConnectionEstablished(l.conn) + } + return nil + }) +} + +func (l ConnectionAsForwarderListener) Error(err error) { + l.conn.sendAction(func() error { return err }) +} + // Helpers func (conn *LocalConnection) sendSimpleProtocolMsg(tag ProtocolTag) error { @@ -539,16 +432,8 @@ func (conn *LocalConnection) receiveTCP(receiver TCPReceiver) { func (conn *LocalConnection) handleProtocolMsg(tag ProtocolTag, payload []byte) error { switch tag { case ProtocolHeartbeat: - case ProtocolConnectionEstablished: - // We sent fast heartbeats to the remote peer, which has now - // received at least one of them and told us via this message. - // We can now consider the connection as established from our - // end. 
- conn.SetEstablished() - case ProtocolFragmentationReceived: - conn.setStackFrag(true) - case ProtocolPMTUVerified: - conn.pmtuVerified(int(binary.BigEndian.Uint16(payload))) + case ProtocolOverlayControlMsg: + conn.forwarder.ControlMessage(payload) case ProtocolGossipUnicast, ProtocolGossipBroadcast, ProtocolGossip: return conn.Router.handleGossip(tag, payload) default: @@ -561,18 +446,8 @@ func (conn *LocalConnection) extendReadDeadline() { conn.TCPConn.SetReadDeadline(time.Now().Add(TCPHeartbeat * 2)) } -func (conn *LocalConnection) sendFastHeartbeats() error { - err := conn.ensureForwarders() - if err == nil { - conn.heartbeat = time.NewTicker(FastHeartbeat) - conn.Send(true, conn.heartbeatFrame) // avoid initial wait - } - return err -} - -func (conn *LocalConnection) performFragTest() { - conn.setStackFrag(false) - conn.Send(false, FragTest) +func (conn *LocalConnection) Forward(key ForwardPacketKey) FlowOp { + return conn.forwarder.Forward(key) } func tickerChan(ticker *time.Ticker) <-chan time.Time { diff --git a/router/consts.go b/router/consts.go index 6a132d79c3..f8b718c2b7 100644 --- a/router/consts.go +++ b/router/consts.go @@ -6,29 +6,12 @@ import ( ) const ( - EthernetOverhead = 14 - UDPOverhead = 28 // 20 bytes for IPv4, 8 bytes for UDP - Port = 6783 - HTTPPort = Port + 1 - DefaultPMTU = 65535 - MaxUDPPacketSize = 65535 - ChannelSize = 16 - FragTestSize = 60001 - PMTUDiscoverySize = 60000 - TCPHeartbeat = 30 * time.Second - FastHeartbeat = 500 * time.Millisecond - SlowHeartbeat = 10 * time.Second - FragTestInterval = 5 * time.Minute - GossipInterval = 30 * time.Second - PMTUVerifyAttempts = 8 - PMTUVerifyTimeout = 10 * time.Millisecond // gets doubled with every attempt - MaxDuration = time.Duration(math.MaxInt64) - MaxMissedHeartbeats = 6 - HeartbeatTimeout = MaxMissedHeartbeats * SlowHeartbeat - MaxTCPMsgSize = 10 * 1024 * 1024 -) - -var ( - FragTest = make([]byte, FragTestSize) - PMTUDiscovery = make([]byte, PMTUDiscoverySize) + Port = 6783 + 
HTTPPort = Port + 1 + MaxUDPPacketSize = 65535 + ChannelSize = 16 + TCPHeartbeat = 30 * time.Second + GossipInterval = 30 * time.Second + MaxDuration = time.Duration(math.MaxInt64) + MaxTCPMsgSize = 10 * 1024 * 1024 ) diff --git a/router/ethernet_decoder.go b/router/ethernet_decoder.go index 0593873c64..2eb1a42985 100644 --- a/router/ethernet_decoder.go +++ b/router/ethernet_decoder.go @@ -29,7 +29,13 @@ func (dec *EthernetDecoder) DecodeLayers(data []byte) { dec.parser.DecodeLayers(data, &dec.decoded) } -func (dec *EthernetDecoder) sendICMPFragNeeded(mtu int, sendFrame func([]byte) error) error { +func (dec *EthernetDecoder) PacketKey() (key PacketKey) { + copy(key.SrcMAC[:], dec.Eth.SrcMAC) + copy(key.DstMAC[:], dec.Eth.DstMAC) + return +} + +func (dec *EthernetDecoder) makeICMPFragNeeded(mtu int) ([]byte, error) { buf := gopacket.NewSerializeBuffer() opts := gopacket.SerializeOptions{ FixLengths: true, @@ -57,23 +63,17 @@ func (dec *EthernetDecoder) sendICMPFragNeeded(mtu int, sendFrame func([]byte) e Seq: uint16(mtu)}, &payload) if err != nil { - return err + return nil, err } log.Printf("Sending ICMP 3,4 (%v -> %v): PMTU= %v", dec.IP.DstIP, dec.IP.SrcIP, mtu) - return sendFrame(buf.Bytes()) + return buf.Bytes(), nil } var ( - // see http://en.wikipedia.org/wiki/Multicast_address#Ethernet - stpMACPrefix = []byte{0x01, 0x80, 0xC2, 0x00, 0x00} - zeroMAC, _ = net.ParseMAC("00:00:00:00:00:00") + zeroMAC, _ = net.ParseMAC("00:00:00:00:00:00") ) -func (dec *EthernetDecoder) DropFrame() bool { - return bytes.Equal(stpMACPrefix, dec.Eth.DstMAC[:len(stpMACPrefix)]) -} - func (dec *EthernetDecoder) IsSpecial() bool { return dec.Eth.Length == 0 && dec.Eth.EthernetType == layers.EthernetTypeLLC && bytes.Equal(zeroMAC, dec.Eth.SrcMAC) && bytes.Equal(zeroMAC, dec.Eth.DstMAC) diff --git a/router/flow.go b/router/flow.go new file mode 100644 index 0000000000..2a7ac03953 --- /dev/null +++ b/router/flow.go @@ -0,0 +1,50 @@ +package router + +import "net" + +// Just enough flow 
machinery for the weave router + +type MAC [6]byte + +func (mac MAC) String() string { + return net.HardwareAddr(mac[:]).String() +} + +type PacketKey struct { + SrcMAC MAC + DstMAC MAC +} + +type ForwardPacketKey struct { + SrcPeer *Peer + DstPeer *Peer + PacketKey +} + +type FlowOp interface { + // The caller must supply an EthernetDecoder specific to this + // thread, which has already been used to decode the frame. + // The broadcast parameter is a hint whether the packet is + // being broadcast. + Process(frame []byte, dec *EthernetDecoder, broadcast bool) +} + +type MultiFlowOp struct { + broadcast bool + ops []FlowOp +} + +func NewMultiFlowOp(broadcast bool) *MultiFlowOp { + return &MultiFlowOp{broadcast: broadcast} +} + +func (mfop *MultiFlowOp) Add(ops ...FlowOp) { + mfop.ops = append(mfop.ops, ops...) +} + +func (mfop *MultiFlowOp) Process(frame []byte, dec *EthernetDecoder, + broadcast bool) { + for _, op := range mfop.ops { + op.Process(frame, dec, mfop.broadcast) + } +} diff --git a/router/forwarder.go b/router/forwarder.go deleted file mode 100644 index a61eb782c2..0000000000 --- a/router/forwarder.go +++ /dev/null @@ -1,419 +0,0 @@ -package router - -import ( - "syscall" - "time" - - "github.com/google/gopacket" - "github.com/google/gopacket/layers" -) - -type ForwardedFrame struct { - srcPeer *Peer - dstPeer *Peer - frame []byte -} - -type FrameTooBigError struct { - EPMTU int // effective pmtu, i.e. 
what we tell packet senders -} - -func (conn *LocalConnection) ensureForwarders() error { - if conn.forwarder != nil || conn.forwarderDF != nil { - return nil - } - udpSender := NewSimpleUDPSender(conn) - udpSenderDF, err := NewRawUDPSender(conn) // only thing that can error, so do it early - if err != nil { - return err - } - - usingPassword := conn.SessionKey != nil - var encryptor, encryptorDF Encryptor - if usingPassword { - encryptor = NewNaClEncryptor(conn.local.NameByte, conn.SessionKey, conn.outbound, false) - encryptorDF = NewNaClEncryptor(conn.local.NameByte, conn.SessionKey, conn.outbound, true) - } else { - encryptor = NewNonEncryptor(conn.local.NameByte) - encryptorDF = NewNonEncryptor(conn.local.NameByte) - } - - forwarder := NewForwarder(conn, encryptor, udpSender, DefaultPMTU) - forwarderDF := NewForwarderDF(conn, encryptorDF, udpSenderDF, DefaultPMTU) - - // Various fields in the conn struct are read by other processes, - // so we have to use locks. - conn.Lock() - conn.forwarder = forwarder - conn.forwarderDF = forwarderDF - conn.Unlock() - - return nil -} - -func (conn *LocalConnection) stopForwarders() { - if conn.forwarder == nil || conn.forwarderDF == nil { - return - } - conn.forwarder.Shutdown() - conn.forwarderDF.Shutdown() -} - -// Called from connection's actor process, and from the connection's -// TCP receiver process. -func (conn *LocalConnection) Send(df bool, frameBytes []byte) error { - frame := &ForwardedFrame{ - srcPeer: conn.local, - dstPeer: conn.remote, - frame: frameBytes} - return conn.forward(frame, nil, df) -} - -// Called from LocalPeer.Relay[Broadcast] which is itself invoked from -// router (both UDP listener process and sniffer process). 
-func (conn *LocalConnection) Forward(frame *ForwardedFrame, dec *EthernetDecoder) error { - return conn.forward(frame, dec, dec != nil && dec.DF()) -} - -func (conn *LocalConnection) forward(frame *ForwardedFrame, dec *EthernetDecoder, df bool) error { - conn.RLock() - var ( - forwarder = conn.forwarder - forwarderDF = conn.forwarderDF - effectivePMTU = conn.effectivePMTU - stackFrag = conn.stackFrag - ) - conn.RUnlock() - - if forwarder == nil || forwarderDF == nil { - conn.Log("Cannot forward frame yet - awaiting contact") - return nil - } - - // We could use non-blocking channel sends here, i.e. drop frames - // on the floor when the forwarder is busy. This would allow our - // caller - the capturing loop in the router - to read frames more - // quickly when under load, i.e. we'd drop fewer frames on the - // floor during capture. And we could maximise CPU utilisation - // since we aren't stalling a thread. However, a lot of work has - // already been done by the time we get here. Since any packet we - // drop will likely get re-transmitted we end up paying that cost - // multiple times. So it's better to drop things at the beginning - // of our pipeline. - if df { - if !frameTooBig(frame, effectivePMTU) { - forwarderDF.Forward(frame) - return nil - } - return FrameTooBigError{EPMTU: effectivePMTU} - } - - if stackFrag || dec == nil || len(dec.decoded) < 2 { - forwarder.Forward(frame) - return nil - } - // Don't have trustworthy stack, so we're going to have to - // send it DF in any case. - if !frameTooBig(frame, effectivePMTU) { - forwarderDF.Forward(frame) - return nil - } - // We can't trust the stack to fragment, we have IP, and we - // have a frame that's too big for the MTU, so we have to - // fragment it ourself. 
- return fragment(dec.Eth, dec.IP, effectivePMTU, frame, func(segFrame *ForwardedFrame) { - forwarderDF.Forward(segFrame) - }) -} - -func frameTooBig(frame *ForwardedFrame, effectivePMTU int) bool { - // We capture/forward complete ethernet frames. Therefore the - // frame length includes the ethernet header. However, MTUs - // operate at the IP layer and thus do not include the ethernet - // header. To put it another way, when a sender that was told an - // MTU of M sends an IP packet of exactly that length, we will - // capture/forward M + EthernetOverhead bytes of data. - return len(frame.frame) > effectivePMTU+EthernetOverhead -} - -func fragment(eth layers.Ethernet, ip layers.IPv4, pmtu int, frame *ForwardedFrame, forward func(*ForwardedFrame)) error { - // We are not doing any sort of NAT, so we don't need to worry - // about checksums of IP payload (eg UDP checksum). - headerSize := int(ip.IHL) * 4 - // &^ is bit clear (AND NOT). So here we're clearing the lowest 3 - // bits. - maxSegmentSize := (pmtu - headerSize) &^ 7 - opts := gopacket.SerializeOptions{ - FixLengths: false, - ComputeChecksums: true} - payloadSize := int(ip.Length) - headerSize - payload := ip.BaseLayer.Payload[:payloadSize] - offsetBase := int(ip.FragOffset) << 3 - origFlags := ip.Flags - ip.Flags = ip.Flags | layers.IPv4MoreFragments - ip.Length = uint16(headerSize + maxSegmentSize) - if eth.EthernetType == layers.EthernetTypeLLC { - // using LLC, so must set eth length correctly. 
eth length - // is just the length of the payload - eth.Length = ip.Length - } else { - eth.Length = 0 - } - for offset := 0; offset < payloadSize; offset += maxSegmentSize { - var segmentPayload []byte - if len(payload) <= maxSegmentSize { - // last one - segmentPayload = payload - ip.Length = uint16(len(payload) + headerSize) - ip.Flags = origFlags - if eth.EthernetType == layers.EthernetTypeLLC { - eth.Length = ip.Length - } else { - eth.Length = 0 - } - } else { - segmentPayload = payload[:maxSegmentSize] - payload = payload[maxSegmentSize:] - } - ip.FragOffset = uint16((offset + offsetBase) >> 3) - buf := gopacket.NewSerializeBuffer() - segPayload := gopacket.Payload(segmentPayload) - err := gopacket.SerializeLayers(buf, opts, &eth, &ip, &segPayload) - if err != nil { - return err - } - // make copies of the frame we received - segFrame := *frame - segFrame.frame = buf.Bytes() - forward(&segFrame) - } - return nil -} - -// Forwarder - -type Forwarder struct { - conn *LocalConnection - ch chan<- *ForwardedFrame - finished <-chan struct{} - enc Encryptor - udpSender UDPSender - maxPayload int - processSendError func(error) error -} - -func NewForwarder(conn *LocalConnection, enc Encryptor, udpSender UDPSender, pmtu int) *Forwarder { - ch := make(chan *ForwardedFrame, ChannelSize) - finished := make(chan struct{}) - fwd := &Forwarder{ - conn: conn, - ch: ch, - finished: finished, - enc: enc, - udpSender: udpSender, - maxPayload: pmtu - UDPOverhead, - processSendError: func(err error) error { return err }} - go fwd.run(ch, finished) - return fwd -} - -func (fwd *Forwarder) Shutdown() { - fwd.ch <- nil -} - -func (fwd *Forwarder) Forward(frame *ForwardedFrame) { - select { - case fwd.ch <- frame: - case <-fwd.finished: - } -} - -func (fwd *Forwarder) run(ch <-chan *ForwardedFrame, finished chan<- struct{}) { - defer fwd.udpSender.Shutdown() - for { - if !fwd.accumulateAndSendFrames(ch, <-ch) { - close(finished) - return - } - } -} - -func (fwd *Forwarder) 
effectiveOverhead() int { - return UDPOverhead + fwd.enc.PacketOverhead() + fwd.enc.FrameOverhead() + EthernetOverhead -} - -// Drain the inbound channel of frames, aggregating them into larger -// packets for efficient transmission. -// -// FIXME Depending on the golang scheduler, and the rate at which -// franes get sent to the forwarder, we can be going around this loop -// forever. That is bad since there may be other stuff for us to do, -// i.e. the other branches in the run loop. -func (fwd *Forwarder) accumulateAndSendFrames(ch <-chan *ForwardedFrame, frame *ForwardedFrame) bool { - if frame == nil { - return false - } - if !fwd.appendFrame(frame) { - fwd.logDrop(frame) - // [1] The buffer is empty at this point and therefore we must - // not flush it. The easiest way to accomplish that is simply - // by returning to the surrounding run loop. - return true - } - for { - select { - case frame = <-ch: - if frame == nil { - return false - } - if !fwd.appendFrame(frame) { - fwd.flush() - if !fwd.appendFrame(frame) { - fwd.logDrop(frame) - return true // see [1] - } - } - default: - fwd.flush() - return true - } - } -} - -func (fwd *Forwarder) logDrop(frame *ForwardedFrame) { - fwd.conn.ErrorLog("Dropping too big frame during forwarding: frame len:", len(frame.frame), "; effective PMTU:", fwd.maxPayload+UDPOverhead-fwd.effectiveOverhead()) -} - -func (fwd *Forwarder) appendFrame(frame *ForwardedFrame) bool { - frameLen := len(frame.frame) - if fwd.enc.TotalLen()+fwd.enc.FrameOverhead()+frameLen > fwd.maxPayload { - return false - } - fwd.enc.AppendFrame(frame.srcPeer.NameByte, frame.dstPeer.NameByte, frame.frame) - return true -} - -func (fwd *Forwarder) flush() { - msg, err := fwd.enc.Bytes() - if err != nil { - fwd.conn.Shutdown(err) - } - err = fwd.processSendError(fwd.udpSender.Send(msg)) - if err != nil && PosixError(err) != syscall.ENOBUFS { - fwd.conn.Shutdown(err) - } -} - -type ForwarderDF struct { - Forwarder - verifyPMTUTick <-chan time.Time - 
verifyPMTU chan<- int - pmtuVerifyCount uint - pmtuVerified bool - highestGoodPMTU int - unverifiedPMTU int - lowestBadPMTU int -} - -func NewForwarderDF(conn *LocalConnection, enc Encryptor, udpSender UDPSender, pmtu int) *ForwarderDF { - ch := make(chan *ForwardedFrame, ChannelSize) - finished := make(chan struct{}) - verifyPMTU := make(chan int, ChannelSize) - fwd := &ForwarderDF{ - Forwarder: Forwarder{ - conn: conn, - ch: ch, - finished: finished, - enc: enc, - udpSender: udpSender, - maxPayload: pmtu - UDPOverhead}, - verifyPMTU: verifyPMTU} - fwd.Forwarder.processSendError = fwd.processSendError - fwd.unverifiedPMTU = pmtu - fwd.effectiveOverhead() - conn.setEffectivePMTU(fwd.unverifiedPMTU) - go fwd.run(ch, finished, verifyPMTU) - return fwd -} - -func (fwd *ForwarderDF) PMTUVerified(pmtu int) { - select { - case fwd.verifyPMTU <- pmtu: - case <-fwd.finished: - } -} - -func (fwd *ForwarderDF) run(ch <-chan *ForwardedFrame, finished chan<- struct{}, verifyPMTU <-chan int) { - defer fwd.udpSender.Shutdown() - for { - select { - case <-fwd.verifyPMTUTick: - // We only do this case here when we know the - // buffers are all empty so that we don't risk - // appending verify-frames to other data. 
- fwd.verifyPMTUTick = nil - if fwd.pmtuVerified { - continue - } - if fwd.pmtuVerifyCount > 0 { - fwd.pmtuVerifyCount-- - fwd.attemptVerifyEffectivePMTU() - } else { - // we've exceeded the verification - // attempts of the unverifiedPMTU - fwd.lowestBadPMTU = fwd.unverifiedPMTU - fwd.verifyEffectivePMTU((fwd.highestGoodPMTU + fwd.lowestBadPMTU) / 2) - } - case epmtu := <-verifyPMTU: - if fwd.pmtuVerified || epmtu != fwd.unverifiedPMTU { - continue - } - if epmtu+1 < fwd.lowestBadPMTU { - fwd.highestGoodPMTU = fwd.unverifiedPMTU // = epmtu - fwd.verifyEffectivePMTU((fwd.highestGoodPMTU + fwd.lowestBadPMTU) / 2) - } else { - fwd.pmtuVerified = true - fwd.maxPayload = epmtu + fwd.effectiveOverhead() - UDPOverhead - fwd.conn.setEffectivePMTU(epmtu) - fwd.conn.Log("Effective PMTU verified at", epmtu) - } - case frame := <-ch: - if !fwd.accumulateAndSendFrames(ch, frame) { - close(finished) - return - } - } - } -} - -func (fwd *ForwarderDF) verifyEffectivePMTU(newUnverifiedPMTU int) { - fwd.unverifiedPMTU = newUnverifiedPMTU - fwd.pmtuVerifyCount = PMTUVerifyAttempts - fwd.attemptVerifyEffectivePMTU() -} - -func (fwd *ForwarderDF) attemptVerifyEffectivePMTU() { - fwd.enc.AppendFrame(fwd.conn.local.NameByte, fwd.conn.remote.NameByte, - make([]byte, fwd.unverifiedPMTU+EthernetOverhead)) - fwd.flush() - if fwd.verifyPMTUTick == nil { - fwd.verifyPMTUTick = time.After(PMTUVerifyTimeout << (PMTUVerifyAttempts - fwd.pmtuVerifyCount)) - } -} - -func (fwd *ForwarderDF) processSendError(err error) error { - if mtbe, ok := err.(MsgTooBigError); ok { - newUnverifiedPMTU := mtbe.PMTU - fwd.effectiveOverhead() - if newUnverifiedPMTU < fwd.unverifiedPMTU { - fwd.pmtuVerified = false - fwd.maxPayload = mtbe.PMTU - UDPOverhead - fwd.highestGoodPMTU = 8 - fwd.lowestBadPMTU = newUnverifiedPMTU + 1 - fwd.conn.setEffectivePMTU(newUnverifiedPMTU) - fwd.verifyEffectivePMTU(newUnverifiedPMTU) - } - - return nil - } - - return err -} diff --git a/router/gossip_test.go b/router/gossip_test.go 
index fce0391a57..b618561ec2 100644 --- a/router/gossip_test.go +++ b/router/gossip_test.go @@ -42,11 +42,8 @@ func sendPendingGossip(routers ...*Router) { } func (router *Router) AddTestChannelConnection(r *Router) { - fromName := router.Ourself.Peer.Name - toName := r.Ourself.Peer.Name - - fromPeer := NewPeer(fromName, "", router.Ourself.Peer.UID, 0) - toPeer := NewPeer(toName, "", r.Ourself.Peer.UID, 0) + fromPeer := NewPeerFrom(router.Ourself.Peer) + toPeer := NewPeerFrom(r.Ourself.Peer) r.Peers.FetchWithDefault(fromPeer) // Has side-effect of incrementing refcount router.Peers.FetchWithDefault(toPeer) // @@ -80,13 +77,13 @@ func TestGossipTopology(t *testing.T) { // the routers supplied as arguments, carrying across all UID and // version information. func (router *Router) tp(routers ...*Router) *Peer { - peer := NewPeer(router.Ourself.Peer.Name, "", router.Ourself.Peer.UID, 0) + peer := NewPeerFrom(router.Ourself.Peer) connections := make(map[PeerName]Connection) for _, r := range routers { - p := NewPeer(r.Ourself.Peer.Name, "", r.Ourself.Peer.UID, r.Ourself.Peer.version) + p := NewPeerFrom(r.Ourself.Peer) connections[r.Ourself.Peer.Name] = newMockConnection(peer, p) } - peer.version = router.Ourself.Peer.version + peer.Version = router.Ourself.Peer.Version peer.connections = connections return peer } diff --git a/router/local_peer.go b/router/local_peer.go index 4819133d3b..6c4de7a1cf 100644 --- a/router/local_peer.go +++ b/router/local_peer.go @@ -19,7 +19,7 @@ type LocalPeerAction func() func NewLocalPeer(name PeerName, nickName string, router *Router) *LocalPeer { actionChan := make(chan LocalPeerAction, ChannelSize) peer := &LocalPeer{ - Peer: NewPeer(name, nickName, 0, 0), + Peer: NewPeer(name, nickName, randomPeerUID(), 0), router: router, actionChan: actionChan, } @@ -27,54 +27,54 @@ func NewLocalPeer(name PeerName, nickName string, router *Router) *LocalPeer { return peer } -func (peer *LocalPeer) Forward(dstPeer *Peer, frame []byte, dec 
*EthernetDecoder) error { - return peer.Relay(peer.Peer, dstPeer, frame, dec) +func (peer *LocalPeer) Forward(dstPeer *Peer, key PacketKey) FlowOp { + return peer.Relay(ForwardPacketKey{ + PacketKey: key, + SrcPeer: peer.Peer, + DstPeer: dstPeer, + }) } -func (peer *LocalPeer) Broadcast(frame []byte, dec *EthernetDecoder) { - peer.RelayBroadcast(peer.Peer, frame, dec) +func (peer *LocalPeer) Broadcast(key PacketKey) FlowOp { + return peer.RelayBroadcast(peer.Peer, key) } -func (peer *LocalPeer) Relay(srcPeer, dstPeer *Peer, frame []byte, dec *EthernetDecoder) error { - relayPeerName, found := peer.router.Routes.Unicast(dstPeer.Name) +func (peer *LocalPeer) Relay(key ForwardPacketKey) FlowOp { + relayPeerName, found := peer.router.Routes.Unicast(key.DstPeer.Name) if !found { // Not necessarily an error as there could be a race with the // dst disappearing whilst the frame is in flight - log.Println("Received packet for unknown destination:", dstPeer) + log.Println("Received packet for unknown destination:", key.DstPeer) return nil } + conn, found := peer.ConnectionTo(relayPeerName) if !found { // Again, could just be a race, not necessarily an error log.Println("Unable to find connection to relay peer", relayPeerName) return nil } - return conn.(*LocalConnection).Forward(&ForwardedFrame{ - srcPeer: srcPeer, - dstPeer: dstPeer, - frame: frame}, - dec) + + return conn.(*LocalConnection).Forward(key) } -func (peer *LocalPeer) RelayBroadcast(srcPeer *Peer, frame []byte, dec *EthernetDecoder) { +func (peer *LocalPeer) RelayBroadcast(srcPeer *Peer, key PacketKey) FlowOp { nextHops := peer.router.Routes.Broadcast(srcPeer.Name) if len(nextHops) == 0 { - return + return nil } + + op := NewMultiFlowOp(true) + for _, conn := range peer.ConnectionsTo(nextHops) { - err := conn.(*LocalConnection).Forward(&ForwardedFrame{ - srcPeer: srcPeer, - dstPeer: conn.Remote(), - frame: frame}, - dec) - if err != nil { - if ftbe, ok := err.(FrameTooBigError); ok { - log.Warningf("dropping 
too big DF broadcast frame (%v -> %v): PMTU= %v", dec.IP.DstIP, dec.IP.SrcIP, ftbe.EPMTU) - } else { - log.Errorln(err) - } - } + op.Add(conn.(*LocalConnection).Forward(ForwardPacketKey{ + PacketKey: key, + SrcPeer: srcPeer, + DstPeer: conn.Remote(), + })) } + + return op } func (peer *LocalPeer) Connections() ConnectionSet { @@ -265,20 +265,20 @@ func (peer *LocalPeer) addConnection(conn Connection) { peer.Lock() defer peer.Unlock() peer.connections[conn.Remote().Name] = conn - peer.version++ + peer.Version++ } func (peer *LocalPeer) deleteConnection(conn Connection) { peer.Lock() defer peer.Unlock() delete(peer.connections, conn.Remote().Name) - peer.version++ + peer.Version++ } func (peer *LocalPeer) connectionEstablished(conn Connection) { peer.Lock() defer peer.Unlock() - peer.version++ + peer.Version++ } func (peer *LocalPeer) connectionCount() int { diff --git a/router/mac_cache.go b/router/mac_cache.go index 021030c9f0..d0eb76844f 100644 --- a/router/mac_cache.go +++ b/router/mac_cache.go @@ -28,43 +28,60 @@ func NewMacCache(maxAge time.Duration, onExpiry func(net.HardwareAddr, *Peer)) * return cache } -func (cache *MacCache) Enter(mac net.HardwareAddr, peer *Peer) bool { +func (cache *MacCache) add(mac net.HardwareAddr, peer *Peer, force bool) (bool, *Peer) { key := macint(mac) now := time.Now() + cache.RLock() entry, found := cache.table[key] if found && entry.peer == peer && now.Before(entry.lastSeen.Add(cache.maxAge/10)) { cache.RUnlock() - return false + return false, nil } cache.RUnlock() + cache.Lock() defer cache.Unlock() + entry, found = cache.table[key] if !found { cache.table[key] = &MacCacheEntry{lastSeen: now, peer: peer} - return true + return true, nil } + if entry.peer != peer { - entry.lastSeen = now + if !force { + return false, entry.peer + } + entry.peer = peer - return true } + if now.After(entry.lastSeen.Add(cache.maxAge / 10)) { entry.lastSeen = now } - return false + + return false, nil +} + +func (cache *MacCache) Add(mac 
net.HardwareAddr, peer *Peer) (bool, *Peer) { + return cache.add(mac, peer, false) +} + +func (cache *MacCache) AddForced(mac net.HardwareAddr, peer *Peer) bool { + newMac, _ := cache.add(mac, peer, true) + return newMac } -func (cache *MacCache) Lookup(mac net.HardwareAddr) (*Peer, bool) { +func (cache *MacCache) Lookup(mac net.HardwareAddr) *Peer { key := macint(mac) cache.RLock() defer cache.RUnlock() entry, found := cache.table[key] if !found { - return nil, false + return nil } - return entry.peer, true + return entry.peer } func (cache *MacCache) Delete(peer *Peer) bool { diff --git a/router/mocks_test.go b/router/mocks_test.go index 183f642bd6..dcc5abebcf 100644 --- a/router/mocks_test.go +++ b/router/mocks_test.go @@ -14,7 +14,9 @@ import ( // Add to peers a connection from peers.ourself to p func (peers *Peers) AddTestConnection(p *Peer) { - toPeer := NewPeer(p.Name, "", p.UID, 0) + summary := p.PeerSummary + summary.Version = 0 + toPeer := NewPeerFromSummary(summary) peers.FetchWithDefault(toPeer) // Has side-effect of incrementing refcount conn := newMockConnection(peers.ourself.Peer, toPeer) peers.ourself.addConnection(conn) @@ -23,11 +25,9 @@ func (peers *Peers) AddTestConnection(p *Peer) { // Add to peers a connection from p1 to p2 func (peers *Peers) AddTestRemoteConnection(p1, p2 *Peer) { - fromName := p1.Name - fromPeer := NewPeer(fromName, "", p1.UID, 0) + fromPeer := NewPeerFrom(p1) fromPeer = peers.FetchWithDefault(fromPeer) - toName := p2.Name - toPeer := NewPeer(toName, "", p2.UID, 0) + toPeer := NewPeerFrom(p2) toPeer = peers.FetchWithDefault(toPeer) peers.ourself.addConnection(&RemoteConnection{fromPeer, toPeer, "", false, false}) } diff --git a/router/overlay.go b/router/overlay.go new file mode 100644 index 0000000000..0be39ae98a --- /dev/null +++ b/router/overlay.go @@ -0,0 +1,102 @@ +package router + +import ( + "net" +) + +// Interface to overlay network packet handling +type Overlay interface { + // Start consuming forwarded packets. 
+ StartConsumingPackets(*Peer, *Peers, OverlayConsumer) error + + // Form a packet-forwarding connection. + MakeForwarder(ForwarderParams) (OverlayForwarder, error) + + // The routes have changed, so any cached information should + // be discarded. + InvalidateRoutes() +} + +type ForwarderParams struct { + RemotePeer *Peer + + // The local IP address to use for sending. Derived from the + // local address of the corresponding TCP socket, so may + // differ for different forwarders. + LocalIP net.IP + + // The remote address to send to. nil if unknown, i.e. an + // incoming connection, in which case the Overlay needs to + // discover it (e.g. from incoming datagrams). + RemoteAddr *net.UDPAddr + + // Unique identifier for this connection + ConnUID uint64 + + // Crypto bits. Nil if not encrypting + Crypto *OverlayCrypto + + // Function to send a control message to the counterpart + // forwarder. + SendControlMessage func([]byte) error +} + +// When a consumer is called, the decoder will already have been used +// to decode the frame. +type OverlayConsumer func(ForwardPacketKey) FlowOp + +// Crypto settings for a forwarder. +type OverlayCrypto struct { + Dec Decryptor + Enc Encryptor + EncDF Encryptor +} + +// All of the machinery to forward packets to a particular peer +type OverlayForwarder interface { + // Register a callback for forwarder state changes. + // side-effect, calling this confirms that the connection is + // really wanted, and so the provider should activate it. + // However, Forward might be called before this is called + // (e.g. on another thread). + SetListener(OverlayForwarderListener) + + // Forward a packet across the connection. 
+ Forward(ForwardPacketKey) FlowOp + + Stop() + + // Handle a message from the peer + ControlMessage([]byte) +} + +type OverlayForwarderListener interface { + Established() + Error(error) +} + +type NullOverlay struct{} + +func (NullOverlay) StartConsumingPackets(*Peer, *Peers, OverlayConsumer) error { + return nil +} + +func (NullOverlay) MakeForwarder(ForwarderParams) (OverlayForwarder, error) { + return NullOverlay{}, nil +} + +func (NullOverlay) InvalidateRoutes() { +} + +func (NullOverlay) SetListener(OverlayForwarderListener) { +} + +func (NullOverlay) Forward(ForwardPacketKey) FlowOp { + return nil +} + +func (NullOverlay) Stop() { +} + +func (NullOverlay) ControlMessage([]byte) { +} diff --git a/router/pcap.go b/router/pcap.go index 698c5d0e25..cddc366eb4 100644 --- a/router/pcap.go +++ b/router/pcap.go @@ -2,18 +2,42 @@ package router import ( "fmt" + "net" + "sync" "github.com/google/gopacket/pcap" ) -type PcapIO struct { - handle *pcap.Handle +type Pcap struct { + iface *net.Interface + bufSz int + + // The libpcap handle for writing packets. It's possible that a + // single handle could be used for reading and writing, but + // we'd have to examine the performance implications. + writeHandle *pcap.Handle + + // pcap handles are single-threaded, so we need to lock around + // uses of writeHandle. 
+ mutex sync.Mutex + + // The libpcap handle for reading packets + readHandle *pcap.Handle } -func NewPcapIO(ifName string, bufSz int) (PacketSourceSink, error) { - pio, err := newPcapIO(ifName, true, 65535, bufSz) +func NewPcap(iface *net.Interface, bufSz int) (Bridge, error) { + wh, err := newPcapHandle(iface.Name, false, 0, 0) if err != nil { - return pio, err + return nil, err + } + + return &Pcap{iface: iface, bufSz: bufSz, writeHandle: wh}, nil +} + +func (p *Pcap) StartConsumingPackets(consumer BridgeConsumer) error { + rh, err := newPcapHandle(p.iface.Name, true, 65535, p.bufSz) + if err != nil { + return err } // Under Linux, libpcap implements the SetDirection filtering @@ -21,16 +45,25 @@ func NewPcapIO(ifName string, bufSz int) (PacketSourceSink, error) { // packets inside the kernel. We do this here rather than in // newPcapIO because libpcap doesn't like this in combination // with a 0 snaplen. - err = pio.handle.SetBPFFilter("inbound") - return pio, err -} + err = rh.SetBPFFilter("inbound") + if err != nil { + rh.Close() + return err + } -func NewPcapO(ifName string) (po PacketSink, err error) { - po, err = newPcapIO(ifName, false, 0, 0) - return + // readHandle is just for the benefit of Stats. 
+ p.mutex.Lock() + defer p.mutex.Unlock() + if p.readHandle != nil { + panic("already consuming") + } + + p.readHandle = rh + go p.sniff(rh, consumer) + return nil } -func newPcapIO(ifName string, promisc bool, snaplen int, bufSz int) (handle *PcapIO, err error) { +func newPcapHandle(ifName string, promisc bool, snaplen int, bufSz int) (handle *pcap.Handle, err error) { inactive, err := pcap.NewInactiveHandle(ifName) if err != nil { return @@ -61,38 +94,72 @@ func newPcapIO(ifName string, promisc bool, snaplen int, bufSz int) (handle *Pca if err = inactive.SetBufferSize(bufSz); err != nil { return } - active, err := inactive.Activate() + handle, err = inactive.Activate() if err != nil { return } - if err = active.SetDirection(pcap.DirectionIn); err != nil { - return - } - return &PcapIO{handle: active}, nil + err = handle.SetDirection(pcap.DirectionIn) + return } -func (pio *PcapIO) ReadPacket() (data []byte, err error) { +func (p *Pcap) String() string { + return fmt.Sprint(p.iface.Name, " (via pcap)") +} + +func (p *Pcap) InjectPacket(PacketKey) FlowOp { + return p +} + +func (p *Pcap) Process(frame []byte, dec *EthernetDecoder, broadcast bool) { + p.mutex.Lock() + defer p.mutex.Unlock() + checkWarn(p.writeHandle.WritePacketData(frame)) +} + +func (p *Pcap) sniff(readHandle *pcap.Handle, consumer BridgeConsumer) { + dec := NewEthernetDecoder() + for { - data, _, err = pio.handle.ZeroCopyReadPacketData() - if err == nil || err != pcap.NextErrorTimeoutExpired { - break + pkt, _, err := readHandle.ZeroCopyReadPacketData() + if err == pcap.NextErrorTimeoutExpired { + continue + } + + checkFatal(err) + dec.DecodeLayers(pkt) + if len(dec.decoded) == 0 { + continue + } + + if fop := consumer(dec.PacketKey()); fop != nil { + // We are handing over the frame to + // forwarders, so we need to make a copy of it + // in order to prevent the next capture from + // overwriting the data + pktLen := len(pkt) + pktCopy := make([]byte, pktLen, pktLen) + copy(pktCopy, pkt) + + 
fop.Process(pktCopy, dec, false) } } - return } -func (pio *PcapIO) WritePacket(data []byte) error { - return pio.handle.WritePacketData(data) -} +func (p *Pcap) Stats() map[string]int { + p.mutex.Lock() + rh := p.readHandle + p.mutex.Unlock() -func (pio *PcapIO) Stats() map[string]int { - stats, err := pio.handle.Stats() - if err != nil { - return nil + if rh != nil { + stats, err := rh.Stats() + if err == nil { + return map[string]int{ + "PacketsReceived": stats.PacketsReceived, + "PacketsDropped": stats.PacketsDropped, + "PacketsIfDropped": stats.PacketsIfDropped, + } + } } - res := make(map[string]int) - res["PacketsReceived"] = stats.PacketsReceived - res["PacketsDropped"] = stats.PacketsDropped - res["PacketsIfDropped"] = stats.PacketsIfDropped - return res + + return nil } diff --git a/router/peer.go b/router/peer.go index f01eece77d..7dfd08578d 100644 --- a/router/peer.go +++ b/router/peer.go @@ -8,34 +8,50 @@ import ( type PeerUID uint64 +func randomPeerUID() PeerUID { + return PeerUID(randUint64()) +} + func ParsePeerUID(s string) (PeerUID, error) { uid, err := strconv.ParseUint(s, 10, 64) return PeerUID(uid), err } +type PeerSummary struct { + NameByte []byte + NickName string + UID PeerUID + Version uint64 +} + type Peer struct { - Name PeerName - NameByte []byte - NickName string - UID PeerUID - version uint64 + Name PeerName + PeerSummary localRefCount uint64 // maintained by Peers connections map[PeerName]Connection } type ConnectionSet map[Connection]struct{} -func NewPeer(name PeerName, nickName string, uid PeerUID, version uint64) *Peer { - if uid == 0 { - uid = PeerUID(randUint64()) - } +func NewPeerFromSummary(summary PeerSummary) *Peer { return &Peer{ - Name: name, - NameByte: name.Bin(), - NickName: nickName, - UID: uid, - version: version, - connections: make(map[PeerName]Connection)} + Name: PeerNameFromBin(summary.NameByte), + PeerSummary: summary, + connections: make(map[PeerName]Connection), + } +} + +func NewPeer(name PeerName, nickName 
string, uid PeerUID, version uint64) *Peer { + return NewPeerFromSummary(PeerSummary{ + NameByte: name.Bin(), + NickName: nickName, + UID: uid, + Version: version, + }) +} + +func NewPeerFrom(peer *Peer) *Peer { + return NewPeerFromSummary(peer.PeerSummary) } func (peer *Peer) String() string { @@ -66,7 +82,7 @@ func (peer *Peer) String() string { // NB: This function should generally be invoked while holding a read // lock on Peers and LocalPeer. func (peer *Peer) Routes(stopAt *Peer, establishedAndSymmetric bool) (bool, map[PeerName]PeerName) { - routes := make(map[PeerName]PeerName) + routes := make(unicastRoutes) routes[peer.Name] = UnknownPeerName nextWorklist := []*Peer{peer} for len(nextWorklist) > 0 { diff --git a/router/peers.go b/router/peers.go index 0622eef577..144bc1a48c 100644 --- a/router/peers.go +++ b/router/peers.go @@ -24,13 +24,6 @@ type NameCollisionError struct { type PeerNameSet map[PeerName]struct{} -type PeerSummary struct { - NameByte []byte - NickName string - UID PeerUID - Version uint64 -} - type ConnectionSummary struct { NameByte []byte RemoteTCPAddr string @@ -206,13 +199,12 @@ func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, de err = decErr return } - name := PeerNameFromBin(peerSummary.NameByte) - newPeer := NewPeer(name, peerSummary.NickName, peerSummary.UID, peerSummary.Version) + newPeer := NewPeerFromSummary(peerSummary) decodedUpdate = append(decodedUpdate, newPeer) decodedConns = append(decodedConns, connSummaries) - existingPeer, found := peers.table[name] + existingPeer, found := peers.table[newPeer.Name] if !found { - newPeers[name] = newPeer + newPeers[newPeer.Name] = newPeer } else if existingPeer.UID != newPeer.UID { err = NameCollisionError{Name: newPeer.Name} return @@ -245,7 +237,7 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti // guaranteed to find peer in the peers.table peer := peers.table[name] if peer != newPeer && - (peer == peers.ourself.Peer || 
peer.version >= newPeer.version) { + (peer == peers.ourself.Peer || peer.Version >= newPeer.Version) { // Nobody but us updates us. And if we know more about a // peer than what's in the the update, we ignore the // latter. @@ -261,7 +253,7 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti // for an update would be someone else calling // router.Peers.ApplyUpdate. But ApplyUpdate takes the Lock on // the router.Peers, so there can be no race here. - peer.version = newPeer.version + peer.Version = newPeer.Version peer.connections = makeConnsMap(peer, connSummaries, peers.table) newUpdate[name] = peer } @@ -269,11 +261,7 @@ func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]Connecti } func (peer *Peer) Encode(enc *gob.Encoder) { - checkFatal(enc.Encode(PeerSummary{ - peer.NameByte, - peer.NickName, - peer.UID, - peer.version})) + checkFatal(enc.Encode(peer.PeerSummary)) connSummaries := []ConnectionSummary{} for _, conn := range peer.connections { diff --git a/router/protocol.go b/router/protocol.go index 8b5252d197..cc7a6e8193 100644 --- a/router/protocol.go +++ b/router/protocol.go @@ -329,9 +329,7 @@ type ProtocolTag byte const ( ProtocolHeartbeat ProtocolTag = iota - ProtocolConnectionEstablished - ProtocolFragmentationReceived - ProtocolPMTUVerified + ProtocolOverlayControlMsg ProtocolGossip ProtocolGossipUnicast ProtocolGossipBroadcast diff --git a/router/router.go b/router/router.go index 9268f280e8..7c8cc62d0a 100644 --- a/router/router.go +++ b/router/router.go @@ -1,13 +1,9 @@ package router import ( - "bytes" - "encoding/binary" "fmt" - "io" "net" "sync" - "syscall" "time" ) @@ -24,17 +20,21 @@ const ( // [3] control rate at which new tokens are added to the bucket -type LogFrameFunc func(string, []byte, *EthernetDecoder) - type Config struct { Port int ProtocolMinVersion byte - Iface *net.Interface Password []byte ConnLimit int PeerDiscovery bool BufSz int - LogFrame LogFrameFunc + PacketLogging 
PacketLogging + Bridge Bridge + Overlay Overlay +} + +type PacketLogging interface { + LogPacket(string, PacketKey) + LogForwardPacket(string, ForwardPacketKey) } type Router struct { @@ -47,27 +47,21 @@ type Router struct { gossipLock sync.RWMutex gossipChannels GossipChannels TopologyGossip Gossip - PacketSource PacketSource UDPListener *net.UDPConn acceptLimiter *TokenBucket } -type PacketSource interface { - ReadPacket() ([]byte, error) - Stats() map[string]int -} +func NewRouter(config Config, name PeerName, nickName string) *Router { + router := &Router{Config: config, gossipChannels: make(GossipChannels)} -type PacketSink interface { - WritePacket([]byte) error -} + if router.Bridge == nil { + router.Bridge = NullBridge{} + } -type PacketSourceSink interface { - PacketSource - PacketSink -} + if router.Overlay == nil { + router.Overlay = NullOverlay{} + } -func NewRouter(config Config, name PeerName, nickName string) *Router { - router := &Router{Config: config, gossipChannels: make(GossipChannels)} onMacExpiry := func(mac net.HardwareAddr, peer *Peer) { log.Println("Expired MAC", mac, "at", peer) } @@ -80,7 +74,7 @@ func NewRouter(config Config, name PeerName, nickName string) *Router { router.Peers = NewPeers(router.Ourself) router.Peers.OnGC(onPeerGC) router.Peers.FetchWithDefault(router.Ourself.Peer) - router.Routes = NewRoutes(router.Ourself, router.Peers) + router.Routes = NewRoutes(router.Ourself, router.Peers, router.Overlay.InvalidateRoutes) router.ConnectionMaker = NewConnectionMaker(router.Ourself, router.Peers, router.Port, router.PeerDiscovery) router.TopologyGossip = router.NewGossip("topology", router) router.acceptLimiter = NewTokenBucket(acceptMaxTokens, acceptTokenDelay) @@ -88,25 +82,13 @@ func NewRouter(config Config, name PeerName, nickName string) *Router { } // Start listening for TCP connections, locally captured packets, and -// packets forwarded over UDP. This is separate from NewRouter so +// forwarded packets. 
This is separate from NewRouter so // that gossipers can register before we start forming connections. func (router *Router) Start() { - // we need two pcap handles since they aren't thread-safe - var pio PacketSourceSink - var po PacketSink - var err error - if router.Iface != nil { - pio, err = NewPcapIO(router.Iface.Name, router.BufSz) - checkFatal(err) - po, err = NewPcapO(router.Iface.Name) - checkFatal(err) - } - router.UDPListener = router.listenUDP(router.Port, po) + log.Println("Sniffing traffic on", router.Bridge) + checkFatal(router.Bridge.StartConsumingPackets(router.handleCapturedPacket)) + checkFatal(router.Overlay.StartConsumingPackets(router.Ourself.Peer, router.Peers, router.handleForwardedPacket)) router.listenTCP(router.Port) - if pio != nil { - router.PacketSource = pio - router.sniff(pio) - } } func (router *Router) Stop() error { @@ -118,70 +100,46 @@ func (router *Router) UsingPassword() bool { return router.Password != nil } -func (router *Router) sniff(pio PacketSourceSink) { - log.Println("Sniffing traffic on", router.Iface) +func (router *Router) handleCapturedPacket(key PacketKey) FlowOp { + router.PacketLogging.LogPacket("Captured", key) + srcMac := net.HardwareAddr(key.SrcMAC[:]) - dec := NewEthernetDecoder() - mac := router.Iface.HardwareAddr - if router.Macs.Enter(mac, router.Ourself.Peer) { - log.Println("Discovered our MAC", mac) - } - go func() { - for { - pkt, err := pio.ReadPacket() - checkFatal(err) - router.LogFrame("Sniffed", pkt, nil) - router.handleCapturedPacket(pkt, dec, pio) - } - }() -} - -func (router *Router) handleCapturedPacket(frameData []byte, dec *EthernetDecoder, po PacketSink) { - dec.DecodeLayers(frameData) - decodedLen := len(dec.decoded) - if decodedLen == 0 { - return - } - srcMac := dec.Eth.SrcMAC - srcPeer, found := router.Macs.Lookup(srcMac) - // We need to filter out frames we injected ourselves. For such - // frames, the srcMAC will have been recorded as associated with a - // different peer. 
- if found && srcPeer != router.Ourself.Peer { - return - } - if router.Macs.Enter(srcMac, router.Ourself.Peer) { + switch newSrcMac, conflictPeer := router.Macs.Add(srcMac, router.Ourself.Peer); { + case newSrcMac: log.Println("Discovered local MAC", srcMac) + + case conflictPeer != nil: + // The MAC cache has an entry for the source MAC + // associated with another peer. This probably means + // we are seeing a frame we injected ourself. That + // shouldn't happen, but discard it just in case. + log.Errorln("Captured frame from MAC (", srcMac, ") associated with another peer", conflictPeer) + return nil } - if dec.DropFrame() { - return - } - dstMac := dec.Eth.DstMAC - dstPeer, found := router.Macs.Lookup(dstMac) - if found && dstPeer == router.Ourself.Peer { - return - } - router.LogFrame("Forwarding", frameData, dec) - - // at this point we are handing over the frame to forwarders, so - // we need to make a copy of it in order to prevent the next - // capture from overwriting the data - frameLen := len(frameData) - frameCopy := make([]byte, frameLen, frameLen) - copy(frameCopy, frameData) - - // If we don't know which peer corresponds to the dest MAC, - // broadcast it. - if !found { - router.Ourself.Broadcast(frameCopy, dec) - return + + // Discard STP broadcasts + if key.DstMAC == [...]byte{0x01, 0x80, 0xC2, 0x00, 0x00, 0x00} { + return nil } - err := router.Ourself.Forward(dstPeer, frameCopy, dec) - if ftbe, ok := err.(FrameTooBigError); ok { - err = dec.sendICMPFragNeeded(ftbe.EPMTU, po.WritePacket) + dstMac := net.HardwareAddr(key.DstMAC[:]) + switch dstPeer := router.Macs.Lookup(dstMac); dstPeer { + case router.Ourself.Peer: + // The packet is destined for a local MAC. The bridge + // won't normally send us such packets, and if it does + // it's likely to be broadcasting the packet to all + // ports. So if it happens, just drop the packet to + // avoid warnings if we try to forward it. 
+ return nil + case nil: + // If we don't know which peer corresponds to the dest + // MAC, broadcast it. + router.PacketLogging.LogPacket("Broadcasting", key) + return router.Ourself.Broadcast(key) + default: + router.PacketLogging.LogPacket("Forwarding", key) + return router.Ourself.Forward(dstPeer, key) } - checkWarn(err) } func (router *Router) listenTCP(localPort int) { @@ -213,138 +171,44 @@ func (router *Router) acceptTCP(tcpConn *net.TCPConn) { StartLocalConnection(connRemote, tcpConn, nil, router, true) } -func (router *Router) listenUDP(localPort int, po PacketSink) *net.UDPConn { - localAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprint(":", localPort)) - checkFatal(err) - conn, err := net.ListenUDP("udp4", localAddr) - checkFatal(err) - f, err := conn.File() - defer f.Close() - checkFatal(err) - fd := int(f.Fd()) - // This one makes sure all packets we send out do not have DF set on them. - err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_MTU_DISCOVER, syscall.IP_PMTUDISC_DONT) - checkFatal(err) - go router.udpReader(conn, po) - return conn -} - -func (router *Router) udpReader(conn *net.UDPConn, po PacketSink) { - defer conn.Close() - dec := NewEthernetDecoder() - buf := make([]byte, MaxUDPPacketSize) - for { - n, sender, err := conn.ReadFromUDP(buf) - if err == io.EOF { - return - } else if err != nil { - log.Warnln("ignoring UDP read error", err) - continue - } else if n < NameSize { - log.Warnln("ignoring too short UDP packet from", sender) - continue - } - name := PeerNameFromBin(buf[:NameSize]) - packet := make([]byte, n-NameSize) - copy(packet, buf[NameSize:n]) - peerConn, found := router.Ourself.ConnectionTo(name) - if !found { - continue - } - relayConn, ok := peerConn.(*LocalConnection) - if !ok { - continue - } - if err := relayConn.Decryptor.IterateFrames(packet, router.handleUDPPacketFunc(relayConn, dec, sender, po)); err != nil { - // Errors during UDP packet decoding / processing are - // non-fatal. 
One common cause is that we receive and - // attempt to decrypt a "stray" packet. This can actually - // happen quite easily if there is some connection churn - // between two peers. After all, UDP isn't a - // connection-oriented protocol, yet we pretend it is. - // - // If anything really is seriously, unrecoverably amiss - // with a connection, that will typically result in missed - // heartbeats and the connection getting shut down because - // of that. - relayConn.Log(err) - } +func (router *Router) handleForwardedPacket(key ForwardPacketKey) FlowOp { + if key.DstPeer != router.Ourself.Peer { + // it's not for us, we're just relaying it + router.PacketLogging.LogForwardPacket("Relaying", key) + return router.Ourself.Relay(key) } -} - -func (router *Router) handleUDPPacketFunc(relayConn *LocalConnection, dec *EthernetDecoder, sender *net.UDPAddr, po PacketSink) FrameConsumer { - return func(srcNameByte, dstNameByte []byte, frame []byte) { - srcPeer := router.Peers.Fetch(PeerNameFromBin(srcNameByte)) - dstPeer := router.Peers.Fetch(PeerNameFromBin(dstNameByte)) - if srcPeer == nil || dstPeer == nil { - return - } - - dec.DecodeLayers(frame) - decodedLen := len(dec.decoded) - if decodedLen == 0 { - return - } - // Handle special frames produced internally (rather than - // captured/forwarded) by the remote router. - // - // We really shouldn't be decoding these above, since they are - // not genuine Ethernet frames. However, it is actually more - // efficient to do so, as we want to optimise for the common - // (i.e. non-special) frames. These always need decoding, and - // detecting special frames is cheaper post decoding than pre. 
- if decodedLen == 1 && dec.IsSpecial() { - if srcPeer == relayConn.Remote() && dstPeer == router.Ourself.Peer { - handleSpecialFrame(relayConn, sender, frame) - } - return - } - - if dstPeer != router.Ourself.Peer { - // it's not for us, we're just relaying it - router.LogFrame("Relaying", frame, dec) - err := router.Ourself.Relay(srcPeer, dstPeer, frame, dec) - if ftbe, ok := err.(FrameTooBigError); ok { - err = dec.sendICMPFragNeeded(ftbe.EPMTU, func(icmpFrame []byte) error { - return router.Ourself.Forward(srcPeer, icmpFrame, nil) - }) - } - - checkWarn(err) - return - } - srcMac := dec.Eth.SrcMAC - dstMac := dec.Eth.DstMAC + // At this point, it's either unicast to us, or a broadcast + // (because the DstPeer on a forwarded broadcast packet is + // always set to the peer being forwarded to) - if router.Macs.Enter(srcMac, srcPeer) { - log.Println("Discovered remote MAC", srcMac, "at", srcPeer) - } - if po != nil { - router.LogFrame("Injecting", frame, dec) - checkWarn(po.WritePacket(frame)) - } + srcMac := net.HardwareAddr(key.SrcMAC[:]) + dstMac := net.HardwareAddr(key.DstMAC[:]) + if router.Macs.AddForced(srcMac, key.SrcPeer) { + log.Println("Discovered remote MAC", srcMac, "at", key.SrcPeer) + } - dstPeer, found := router.Macs.Lookup(dstMac) - if !found || dstPeer != router.Ourself.Peer { - router.LogFrame("Relaying broadcast", frame, dec) - router.Ourself.RelayBroadcast(srcPeer, frame, dec) - } + router.PacketLogging.LogForwardPacket("Injecting", key) + injectFop := router.Bridge.InjectPacket(key.PacketKey) + dstPeer := router.Macs.Lookup(dstMac) + if dstPeer == router.Ourself.Peer { + return injectFop } -} -func handleSpecialFrame(relayConn *LocalConnection, sender *net.UDPAddr, frame []byte) { - frameLen := len(frame) + router.PacketLogging.LogForwardPacket("Relaying broadcast", key) + relayFop := router.Ourself.RelayBroadcast(key.SrcPeer, key.PacketKey) switch { - case frameLen == EthernetOverhead+8: - relayConn.ReceivedHeartbeat(sender, 
binary.BigEndian.Uint64(frame[EthernetOverhead:])) - case frameLen == FragTestSize && bytes.Equal(frame, FragTest): - relayConn.SendProtocolMsg(ProtocolMsg{ProtocolFragmentationReceived, nil}) - case frameLen == PMTUDiscoverySize && bytes.Equal(frame, PMTUDiscovery): + case injectFop == nil: + return relayFop + + case relayFop == nil: + return injectFop + default: - frameLenBytes := []byte{0, 0} - binary.BigEndian.PutUint16(frameLenBytes, uint16(frameLen-EthernetOverhead)) - relayConn.SendProtocolMsg(ProtocolMsg{ProtocolPMTUVerified, frameLenBytes}) + mfop := NewMultiFlowOp(false) + mfop.Add(injectFop) + mfop.Add(relayFop) + return mfop } } diff --git a/router/routes.go b/router/routes.go index bd8c3531e6..b463b658e6 100644 --- a/router/routes.go +++ b/router/routes.go @@ -5,30 +5,35 @@ import ( "sync" ) +type unicastRoutes map[PeerName]PeerName +type broadcastRoutes map[PeerName][]PeerName + type Routes struct { sync.RWMutex ourself *LocalPeer peers *Peers - unicast map[PeerName]PeerName - unicastAll map[PeerName]PeerName // [1] - broadcast map[PeerName][]PeerName - broadcastAll map[PeerName][]PeerName // [1] + onChange func() + unicast unicastRoutes + unicastAll unicastRoutes // [1] + broadcast broadcastRoutes + broadcastAll broadcastRoutes // [1] recalculate chan<- *struct{} wait chan<- chan struct{} // [1] based on *all* connections, not just established & // symmetric ones } -func NewRoutes(ourself *LocalPeer, peers *Peers) *Routes { +func NewRoutes(ourself *LocalPeer, peers *Peers, onChange func()) *Routes { recalculate := make(chan *struct{}, 1) wait := make(chan chan struct{}) routes := &Routes{ ourself: ourself, peers: peers, - unicast: make(map[PeerName]PeerName), - unicastAll: make(map[PeerName]PeerName), - broadcast: make(map[PeerName][]PeerName), - broadcastAll: make(map[PeerName][]PeerName), + onChange: onChange, + unicast: make(unicastRoutes), + unicastAll: make(unicastRoutes), + broadcast: make(broadcastRoutes), + broadcastAll: 
make(broadcastRoutes), recalculate: recalculate, wait: wait} routes.unicast[ourself.Name] = UnknownPeerName @@ -147,6 +152,8 @@ func (routes *Routes) calculate() { routes.peers.RLock() routes.ourself.RLock() var ( + oldUnicast = routes.unicast + oldBroadcast = routes.broadcast unicast = routes.calculateUnicast(true) unicastAll = routes.calculateUnicast(false) broadcast = routes.calculateBroadcast(true) @@ -161,6 +168,10 @@ func (routes *Routes) calculate() { routes.broadcast = broadcast routes.broadcastAll = broadcastAll routes.Unlock() + + if !unicast.equals(oldUnicast) || !broadcast.equals(oldBroadcast) { + routes.onChange() + } } // Calculate all the routes for the question: if *we* want to send a @@ -172,7 +183,7 @@ func (routes *Routes) calculate() { // any knowledge of the MAC address at all. Thus there's no need // to exchange knowledge of MAC addresses, nor any constraints on // the routes that we construct. -func (routes *Routes) calculateUnicast(establishedAndSymmetric bool) map[PeerName]PeerName { +func (routes *Routes) calculateUnicast(establishedAndSymmetric bool) unicastRoutes { _, unicast := routes.ourself.Routes(nil, establishedAndSymmetric) return unicast } @@ -195,8 +206,8 @@ func (routes *Routes) calculateUnicast(establishedAndSymmetric bool) map[PeerNam // Y =/= Z /\ X.Routes(Y) <= X.Routes(Z) => // X.Routes(Y) u [P | Y.HasSymmetricConnectionTo(P)] <= X.Routes(Z) // where <= is the subset relationship on keys of the returned map. 
-func (routes *Routes) calculateBroadcast(establishedAndSymmetric bool) map[PeerName][]PeerName { - broadcast := make(map[PeerName][]PeerName) +func (routes *Routes) calculateBroadcast(establishedAndSymmetric bool) broadcastRoutes { + broadcast := make(broadcastRoutes) for _, peer := range routes.peers.table { hops := []PeerName{} if found, reached := peer.Routes(routes.ourself.Peer, establishedAndSymmetric); found { @@ -207,3 +218,55 @@ func (routes *Routes) calculateBroadcast(establishedAndSymmetric bool) map[PeerN } return broadcast } + +func (a unicastRoutes) equals(b unicastRoutes) bool { + for key, aval := range a { + if bval, ok := b[key]; !ok || bval != aval { + return false + } + } + + for key := range b { + if _, ok := a[key]; !ok { + return false + } + } + + return true +} + +func (a broadcastRoutes) equals(b broadcastRoutes) bool { + set := make(map[PeerName]struct{}) + + for key, aval := range a { + bval, ok := b[key] + if !ok { + return false + } + + for _, peer := range aval { + set[peer] = struct{}{} + } + + for _, peer := range bval { + if _, ok := set[peer]; !ok { + return false + } + + delete(set, peer) + } + + if len(set) != 0 { + return false + } + + } + + for key := range b { + if _, ok := a[key]; !ok { + return false + } + } + + return true +} diff --git a/router/sleeve.go b/router/sleeve.go new file mode 100644 index 0000000000..8ab2b119ca --- /dev/null +++ b/router/sleeve.go @@ -0,0 +1,1144 @@ +// This contains the Overlay implementation for weave's own UDP +// encapsulation protocol ("sleeve" because a sleeve encapsulates +// something, it's often woven, it rhymes with "weave", make up your +// own cheesy reason). 
+ +package router + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "net" + "sync" + "syscall" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +// This diagram explains the various arithmetic and variables related +// to packet offsets and lengths below: +// +// +----+-----+--------+--------+----------+--------------------------+ +// | IP | UDP | Sleeve | Sleeve | Overlay | Overlay Layer 3 Payload | +// | | | Packet | Frame | Ethernet | | +// | | | Header | Header | | | +// +----+-----+--------+--------+----------+--------------------------+ +// +// <------------------------------------ msgTooBigError.underlayPMTU -> +// +// <-------------------------- sleeveForwarder.maxPayload -> +// +// <----------> UDPOverhead +// +// <--------> Encryptor.PacketOverhead +// +// <--------> Encryptor.FrameOverhead +// +// <----------> EthernetOverhead +// +// <---------------------------------------> sleeveForwarder.overheadDF +// +// sleeveForwarder.mtu <--------------------------> + +const ( + EthernetOverhead = 14 + UDPOverhead = 28 // 20 bytes for IPv4, 8 bytes for UDP + DefaultMTU = 65535 + FragTestSize = 60001 + PMTUDiscoverySize = 60000 + FastHeartbeat = 500 * time.Millisecond + SlowHeartbeat = 10 * time.Second + FragTestInterval = 5 * time.Minute + MTUVerifyAttempts = 8 + MTUVerifyTimeout = 10 * time.Millisecond // doubled with each attempt + MaxMissedHeartbeats = 6 + HeartbeatTimeout = MaxMissedHeartbeats * SlowHeartbeat +) + +type SleeveOverlay struct { + localPort int + + // These fields are set in StartConsumingPackets, and not + // subsequently modified + localPeer *Peer + localPeerBin []byte + consumer OverlayConsumer + peers *Peers + conn *net.UDPConn + + lock sync.Mutex + forwarders map[PeerName]*sleeveForwarder +} + +func NewSleeveOverlay(localPort int) Overlay { + return &SleeveOverlay{localPort: localPort} +} + +func (sleeve *SleeveOverlay) StartConsumingPackets(localPeer *Peer, peers *Peers, + consumer 
OverlayConsumer) error { + localAddr, err := net.ResolveUDPAddr("udp4", + fmt.Sprint(":", sleeve.localPort)) + if err != nil { + return err + } + + conn, err := net.ListenUDP("udp4", localAddr) + if err != nil { + return err + } + + f, err := conn.File() + if err != nil { + return err + } + + defer f.Close() + fd := int(f.Fd()) + + // This makes sure all packets we send out do not have DF set + // on them. + err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, + syscall.IP_MTU_DISCOVER, syscall.IP_PMTUDISC_DONT) + if err != nil { + return err + } + + sleeve.lock.Lock() + defer sleeve.lock.Unlock() + + if sleeve.localPeer != nil { + conn.Close() + return fmt.Errorf("StartConsumingPackets already called") + } + + sleeve.localPeer = localPeer + sleeve.localPeerBin = localPeer.NameByte + sleeve.consumer = consumer + sleeve.peers = peers + sleeve.conn = conn + sleeve.forwarders = make(map[PeerName]*sleeveForwarder) + go sleeve.readUDP() + return nil +} + +func (*SleeveOverlay) InvalidateRoutes() { + // no cached information, so nothing to do +} + +func (sleeve *SleeveOverlay) lookupForwarder(peer PeerName) *sleeveForwarder { + sleeve.lock.Lock() + defer sleeve.lock.Unlock() + return sleeve.forwarders[peer] +} + +func (sleeve *SleeveOverlay) addForwarder(peer PeerName, fwd *sleeveForwarder) { + sleeve.lock.Lock() + defer sleeve.lock.Unlock() + sleeve.forwarders[peer] = fwd +} + +func (sleeve *SleeveOverlay) removeForwarder(peer PeerName, + fwd *sleeveForwarder) { + sleeve.lock.Lock() + defer sleeve.lock.Unlock() + if sleeve.forwarders[peer] == fwd { + delete(sleeve.forwarders, peer) + } +} + +func (sleeve *SleeveOverlay) readUDP() { + defer sleeve.conn.Close() + dec := NewEthernetDecoder() + buf := make([]byte, MaxUDPPacketSize) + + for { + n, sender, err := sleeve.conn.ReadFromUDP(buf) + if err == io.EOF { + return + } else if err != nil { + log.Print("ignoring UDP read error ", err) + continue + } else if n < NameSize { + log.Print("ignoring too short UDP packet from ", 
sender) + continue + } + + fwdName := PeerNameFromBin(buf[:NameSize]) + fwd := sleeve.lookupForwarder(fwdName) + if fwd == nil { + continue + } + + packet := make([]byte, n-NameSize) + copy(packet, buf[NameSize:n]) + + err = fwd.crypto.Dec.IterateFrames(packet, + func(src []byte, dst []byte, frame []byte) { + sleeve.handleFrame(sender, fwd, + src, dst, frame, dec) + }) + if err != nil { + // Errors during UDP packet decoding / + // processing are non-fatal. One common cause + // is that we receive and attempt to decrypt a + // "stray" packet. This can actually happen + // quite easily if there is some connection + // churn between two peers. After all, UDP + // isn't a connection-oriented protocol, yet + // we pretend it is. + // + // If anything really is seriously, + // unrecoverably amiss with a connection, that + // will typically result in missed heartbeats + // and the connection getting shut down + // because of that. + log.Print(fwd.logPrefixFor(sender), err) + } + } +} + +func (sleeve *SleeveOverlay) handleFrame(sender *net.UDPAddr, + fwd *sleeveForwarder, src []byte, dst []byte, frame []byte, + dec *EthernetDecoder) { + dec.DecodeLayers(frame) + decodedLen := len(dec.decoded) + if decodedLen == 0 { + return + } + + srcPeer := sleeve.peers.Fetch(PeerNameFromBin(src)) + dstPeer := sleeve.peers.Fetch(PeerNameFromBin(dst)) + if srcPeer == nil || dstPeer == nil { + return + } + + // Handle special frames produced internally (rather than + // captured/forwarded) by the remote router. + // + // We really shouldn't be decoding these above, since they are + // not genuine Ethernet frames. However, it is actually more + // efficient to do so, as we want to optimise for the common + // (i.e. non-special) frames. These always need decoding, and + // detecting special frames is cheaper post decoding than pre. 
+ if decodedLen == 1 && dec.IsSpecial() { + if srcPeer == fwd.remotePeer && dstPeer == fwd.sleeve.localPeer { + select { + case fwd.specialChan <- specialFrame{sender, frame}: + case <-fwd.finishedChan: + } + } + + return + } + + sleeve.sendToConsumer(srcPeer, dstPeer, frame, dec) +} + +func (sleeve *SleeveOverlay) sendToConsumer(srcPeer, dstPeer *Peer, + frame []byte, dec *EthernetDecoder) { + if sleeve.consumer == nil { + return + } + + fop := sleeve.consumer(ForwardPacketKey{ + SrcPeer: srcPeer, + DstPeer: dstPeer, + PacketKey: dec.PacketKey(), + }) + if fop != nil { + fop.Process(frame, dec, false) + } +} + +type udpSender interface { + send([]byte, *net.UDPAddr) error +} + +func (sleeve *SleeveOverlay) send(msg []byte, raddr *net.UDPAddr) error { + sleeve.lock.Lock() + conn := sleeve.conn + sleeve.lock.Unlock() + + if conn == nil { + // Consume wasn't called yet + return nil + } + + _, err := conn.WriteToUDP(msg, raddr) + return err +} + +type sleeveForwarder struct { + // Immutable + sleeve *SleeveOverlay + remotePeer *Peer + remotePeerBin []byte + sendControlMsg func([]byte) error + connUID uint64 + + // Channels to communicate with the aggregator goroutine + aggregatorChan chan<- aggregatorFrame + aggregatorDFChan chan<- aggregatorFrame + specialChan chan<- specialFrame + confirmedChan chan<- struct{} + finishedChan <-chan struct{} + + // Explicitly locked state + lock sync.RWMutex + listener OverlayForwarderListener + remoteAddr *net.UDPAddr + + // These fields are accessed and updated independently, so no + // locking needed. 
+ mtu int // the mtu for this link on the overlay network + stackFrag bool + + // State only used within the forwarder goroutine + crypto OverlayCrypto + senderDF *udpSenderDF + maxPayload int + + // How many bytes of overhead it takes to turn an IP packet on + // the overlay network into an encapsulated packet on the underlay + // network + overheadDF int + + heartbeatInterval time.Duration + heartbeatTimer *time.Timer + heartbeatTimeout *time.Timer + fragTestTicker *time.Ticker + ackedHeartbeat bool + + mtuTestTimeout *time.Timer + mtuTestsSent uint + mtuHighestGood int + mtuLowestBad int + mtuCandidate int +} + +type aggregatorFrame struct { + src []byte + dst []byte + frame []byte +} + +// A "special" message over UDP, or a control message. The sender is +// nil for control messages. +type specialFrame struct { + sender *net.UDPAddr + frame []byte +} + +func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwarder, error) { + var crypto OverlayCrypto + if params.Crypto != nil { + crypto = *params.Crypto + } else { + name := sleeve.localPeer.NameByte + crypto = OverlayCrypto{ + Dec: NewNonDecryptor(), + Enc: NewNonEncryptor(name), + EncDF: NewNonEncryptor(name), + } + } + + aggChan := make(chan aggregatorFrame, ChannelSize) + aggDFChan := make(chan aggregatorFrame, ChannelSize) + specialChan := make(chan specialFrame, 1) + confirmedChan := make(chan struct{}) + finishedChan := make(chan struct{}) + + fwd := &sleeveForwarder{ + sleeve: sleeve, + remotePeer: params.RemotePeer, + remotePeerBin: params.RemotePeer.NameByte, + sendControlMsg: params.SendControlMessage, + connUID: params.ConnUID, + aggregatorChan: aggChan, + aggregatorDFChan: aggDFChan, + specialChan: specialChan, + confirmedChan: confirmedChan, + finishedChan: finishedChan, + remoteAddr: params.RemoteAddr, + mtu: DefaultMTU, + crypto: crypto, + maxPayload: DefaultMTU - UDPOverhead, + overheadDF: UDPOverhead + crypto.EncDF.PacketOverhead() + + crypto.EncDF.FrameOverhead() + 
EthernetOverhead, + senderDF: newUDPSenderDF(params.LocalIP, sleeve.localPort), + } + + go fwd.run(aggChan, aggDFChan, specialChan, confirmedChan, + finishedChan) + return fwd, nil +} + +func (fwd *sleeveForwarder) logPrefixFor(sender *net.UDPAddr) string { + return fmt.Sprintf("->[%s|%s]: ", sender, fwd.remotePeer) +} + +func (fwd *sleeveForwarder) logPrefix() string { + fwd.lock.RLock() + remoteAddr := fwd.remoteAddr + fwd.lock.RUnlock() + return fwd.logPrefixFor(remoteAddr) +} + +func (fwd *sleeveForwarder) SetListener(listener OverlayForwarderListener) { + log.Debug(fwd.logPrefix(), "SetListener ", listener) + + fwd.lock.Lock() + fwd.listener = listener + fwd.lock.Unlock() + + // Setting the listener confirms that the forwarder is really + // wanted + if listener != nil { + select { + case fwd.confirmedChan <- struct{}{}: + case <-fwd.finishedChan: + } + } +} + +type curriedForward struct { + fwd *sleeveForwarder + key ForwardPacketKey +} + +func (fwd *sleeveForwarder) Forward(key ForwardPacketKey) FlowOp { + return curriedForward{fwd, key} +} + +func (f curriedForward) Process(frame []byte, dec *EthernetDecoder, + broadcast bool) { + fwd := f.fwd + fwd.lock.RLock() + haveContact := (fwd.remoteAddr != nil) + mtu := fwd.mtu + stackFrag := fwd.stackFrag + fwd.lock.RUnlock() + + if !haveContact { + log.Print(fwd.logPrefix(), + "Cannot forward frame yet - awaiting contact") + return + } + + srcName := f.key.SrcPeer.NameByte + dstName := f.key.DstPeer.NameByte + + // We could use non-blocking channel sends here, i.e. drop frames + // on the floor when the forwarder is busy. This would allow our + // caller - the capturing loop in the router - to read frames more + // quickly when under load, i.e. we'd drop fewer frames on the + // floor during capture. And we could maximise CPU utilisation + // since we aren't stalling a thread. However, a lot of work has + // already been done by the time we get here. 
Since any packet we + // drop will likely get re-transmitted we end up paying that cost + // multiple times. So it's better to drop things at the beginning + // of our pipeline. + if dec.DF() { + if !frameTooBig(frame, mtu) { + fwd.aggregate(fwd.aggregatorDFChan, srcName, dstName, + frame) + return + } + + // Why do we need an explicit broadcast hint here, + // rather than just checking the frame for a broadcast + // destination MAC address? Because even + // non-broadcast frames can be broadcast, if the + // destination MAC was not in our MAC cache. + if broadcast { + log.Print(fwd.logPrefix(), "dropping too big DF broadcast frame (", dec.IP.SrcIP, " -> ", dec.IP.DstIP, "): MTU=", mtu) + return + } + + // Send an ICMP back to where the frame came from + fragNeededPacket, err := dec.makeICMPFragNeeded(mtu) + if err != nil { + log.Print(fwd.logPrefix(), err) + return + } + + dec.DecodeLayers(fragNeededPacket) + + // The frag-needed packet does not have DF set, so the + // potential recursion here is bounded. + fwd.sleeve.sendToConsumer(f.key.DstPeer, f.key.SrcPeer, + fragNeededPacket, dec) + return + } + + if stackFrag || len(dec.decoded) < 2 { + fwd.aggregate(fwd.aggregatorChan, srcName, dstName, frame) + return + } + + // Don't have trustworthy stack, so we're going to have to + // send it DF in any case. + if !frameTooBig(frame, mtu) { + fwd.aggregate(fwd.aggregatorDFChan, srcName, dstName, frame) + return + } + + // We can't trust the stack to fragment, we have IP, and we + // have a frame that's too big for the MTU, so we have to + // fragment it ourself.
+ checkWarn(fragment(dec.Eth, dec.IP, mtu, + func(segFrame []byte) { + fwd.aggregate(fwd.aggregatorDFChan, srcName, dstName, + segFrame) + })) +} + +func (fwd *sleeveForwarder) aggregate(ch chan<- aggregatorFrame, src []byte, + dst []byte, frame []byte) { + select { + case ch <- aggregatorFrame{src, dst, frame}: + case <-fwd.finishedChan: + } +} + +func fragment(eth layers.Ethernet, ip layers.IPv4, mtu int, + forward func([]byte)) error { + // We are not doing any sort of NAT, so we don't need to worry + // about checksums of IP payload (eg UDP checksum). + headerSize := int(ip.IHL) * 4 + // &^ is bit clear (AND NOT). So here we're clearing the lowest 3 + // bits. + maxSegmentSize := (mtu - headerSize) &^ 7 + opts := gopacket.SerializeOptions{ + FixLengths: false, + ComputeChecksums: true} + payloadSize := int(ip.Length) - headerSize + payload := ip.BaseLayer.Payload[:payloadSize] + offsetBase := int(ip.FragOffset) << 3 + origFlags := ip.Flags + ip.Flags = ip.Flags | layers.IPv4MoreFragments + ip.Length = uint16(headerSize + maxSegmentSize) + if eth.EthernetType == layers.EthernetTypeLLC { + // using LLC, so must set eth length correctly. 
eth length + // is just the length of the payload + eth.Length = ip.Length + } else { + eth.Length = 0 + } + for offset := 0; offset < payloadSize; offset += maxSegmentSize { + var segmentPayload []byte + if len(payload) <= maxSegmentSize { + // last one + segmentPayload = payload + ip.Length = uint16(len(payload) + headerSize) + ip.Flags = origFlags + if eth.EthernetType == layers.EthernetTypeLLC { + eth.Length = ip.Length + } else { + eth.Length = 0 + } + } else { + segmentPayload = payload[:maxSegmentSize] + payload = payload[maxSegmentSize:] + } + ip.FragOffset = uint16((offset + offsetBase) >> 3) + buf := gopacket.NewSerializeBuffer() + segPayload := gopacket.Payload(segmentPayload) + err := gopacket.SerializeLayers(buf, opts, &eth, &ip, + &segPayload) + if err != nil { + return err + } + + forward(buf.Bytes()) + } + return nil +} + +func frameTooBig(frame []byte, mtu int) bool { + // We capture/forward complete ethernet frames. Therefore the + // frame length includes the ethernet header. However, MTUs + // operate at the IP layer and thus do not include the ethernet + // header. To put it another way, when a sender that was told an + // MTU of M sends an IP packet of exactly that length, we will + // capture/forward M + EthernetOverhead bytes of data. + return len(frame) > mtu+EthernetOverhead +} + +func (fwd *sleeveForwarder) ControlMessage(msg []byte) { + select { + case fwd.specialChan <- specialFrame{nil, msg}: + case <-fwd.finishedChan: + } +} + +func (fwd *sleeveForwarder) Stop() { + fwd.sleeve.removeForwarder(fwd.remotePeer.Name, fwd) + fwd.SetListener(nil) + + // Tell the forwarder goroutine to finish. We don't need to + // wait for it.
+ close(fwd.confirmedChan) +} + +func (fwd *sleeveForwarder) run(aggChan <-chan aggregatorFrame, + aggDFChan <-chan aggregatorFrame, + specialChan <-chan specialFrame, + confirmedChan <-chan struct{}, + finishedChan chan<- struct{}) { + defer close(finishedChan) + + var err error +loop: + for err == nil { + select { + case frame := <-aggChan: + err = fwd.aggregateAndSend(frame, aggChan, + fwd.crypto.Enc, fwd.sleeve, + MaxUDPPacketSize-UDPOverhead) + + case frame := <-aggDFChan: + err = fwd.aggregateAndSend(frame, aggDFChan, + fwd.crypto.EncDF, fwd.senderDF, fwd.maxPayload) + + case special := <-specialChan: + if special.sender == nil { + // Control messages are sent on specialChan, + // with a nil sender + err = fwd.handleControlMsg(special.frame) + } else { + err = fwd.handleSpecialFrame(special) + } + + case _, ok := <-confirmedChan: + if !ok { + // confirmedChan is closed to indicate + // the forwarder is being closed + break loop + } + + err = fwd.confirmed() + + case <-timerChan(fwd.heartbeatTimer): + err = fwd.sendHeartbeat() + + case <-timerChan(fwd.heartbeatTimeout): + err = fmt.Errorf("timed out waiting for UDP heartbeat") + + case <-tickerChan(fwd.fragTestTicker): + err = fwd.sendFragTest() + + case <-timerChan(fwd.mtuTestTimeout): + err = fwd.handleMTUTestFailure() + } + } + + if fwd.heartbeatTimer != nil { + fwd.heartbeatTimer.Stop() + } + if fwd.heartbeatTimeout != nil { + fwd.heartbeatTimeout.Stop() + } + if fwd.fragTestTicker != nil { + fwd.fragTestTicker.Stop() + } + if fwd.mtuTestTimeout != nil { + fwd.mtuTestTimeout.Stop() + } + + checkWarn(fwd.senderDF.close()) + + fwd.lock.RLock() + defer fwd.lock.RUnlock() + if fwd.listener != nil { + fwd.listener.Error(err) + } +} + +func (fwd *sleeveForwarder) aggregateAndSend(frame aggregatorFrame, + aggChan <-chan aggregatorFrame, enc Encryptor, sender udpSender, + limit int) error { + // Give up after processing N frames, to avoid starving the + // other activities of the forwarder goroutine. 
+ i := 0 + + for { + // Adding the first frame to an empty buffer + if !fits(frame, enc, limit) { + log.Print(fwd.logPrefix(), "Dropping too big frame during forwarding: frame len ", len(frame.frame), ", limit ", limit) + return nil + } + + for { + enc.AppendFrame(frame.src, frame.dst, frame.frame) + i++ + + gotOne := false + if i < 100 { + select { + case frame = <-aggChan: + gotOne = true + default: + } + } + + if !gotOne { + return fwd.flushEncryptor(enc, sender) + } + + // Accumulate frames until doing so would + // exceed the MTU. Even in the non-DF case, + // it doesn't seem worth adding a frame where + // that would lead to fragmentation, + // potentially delaying or risking other + // frames. + if !fits(frame, enc, fwd.maxPayload) { + break + } + } + + if err := fwd.flushEncryptor(enc, sender); err != nil { + return err + } + } +} + +func fits(frame aggregatorFrame, enc Encryptor, limit int) bool { + return enc.TotalLen()+enc.FrameOverhead()+len(frame.frame) <= limit +} + +func (fwd *sleeveForwarder) flushEncryptor(enc Encryptor, + sender udpSender) error { + msg, err := enc.Bytes() + if err != nil { + return err + } + + return fwd.processSendError(sender.send(msg, fwd.remoteAddr)) +} + +func (fwd *sleeveForwarder) sendSpecial(enc Encryptor, sender udpSender, + data []byte) error { + enc.AppendFrame(fwd.sleeve.localPeerBin, fwd.remotePeerBin, data) + return fwd.flushEncryptor(enc, sender) +} + +func (fwd *sleeveForwarder) handleSpecialFrame(special specialFrame) error { + // The special frame types are distinguished by length + switch len(special.frame) { + case EthernetOverhead + 8: + return fwd.handleHeartbeat(special) + + case FragTestSize: + return fwd.handleFragTest(special.frame) + + default: + return fwd.handleMTUTest(special.frame) + } +} + +const ( + HeartbeatAck = iota + FragTestAck + MTUTestAck +) + +func (fwd *sleeveForwarder) handleControlMsg(msg []byte) error { + if len(msg) == 0 { + log.Print(fwd.logPrefix(), + "Received zero-length control 
message") + return nil + } + + switch msg[0] { + case HeartbeatAck: + return fwd.handleHeartbeatAck() + + case FragTestAck: + return fwd.handleFragTestAck() + + case MTUTestAck: + return fwd.handleMTUTestAck(msg) + + default: + log.Print(fwd.logPrefix(), + "Ignoring unknown control message: ", msg[0]) + return nil + } +} + +func (fwd *sleeveForwarder) confirmed() error { + log.Debug(fwd.logPrefix(), "confirmed") + + if fwd.heartbeatInterval != 0 { + // already confirmed + return nil + } + + // when the connection is confirmed, this should be the only + // forwarder to the peer. + fwd.sleeve.addForwarder(fwd.remotePeer.Name, fwd) + + // heartbeatInterval flags that we want to send heartbeats, + // even if we don't do sendHeartbeat() yet due to lacking the + // remote address. + fwd.heartbeatInterval = FastHeartbeat + if fwd.remoteAddr != nil { + if err := fwd.sendHeartbeat(); err != nil { + return err + } + } + + fwd.heartbeatTimeout = time.NewTimer(HeartbeatTimeout) + return nil +} + +func (fwd *sleeveForwarder) sendHeartbeat() error { + log.Debug(fwd.logPrefix(), "sendHeartbeat") + + // Prime the timer for the next heartbeat. We don't use a + // ticker because the interval is not constant. 
+ fwd.heartbeatTimer = setTimer(fwd.heartbeatTimer, fwd.heartbeatInterval) + + buf := make([]byte, EthernetOverhead+8) + binary.BigEndian.PutUint64(buf[EthernetOverhead:], fwd.connUID) + return fwd.sendSpecial(fwd.crypto.EncDF, fwd.senderDF, buf) +} + +func (fwd *sleeveForwarder) handleHeartbeat(special specialFrame) error { + uid := binary.BigEndian.Uint64(special.frame[EthernetOverhead:]) + if uid != fwd.connUID { + return nil + } + + log.Debug(fwd.logPrefix(), "handleHeartbeat") + + if fwd.remoteAddr == nil { + fwd.setRemoteAddr(special.sender) + if fwd.heartbeatInterval != 0 { + if err := fwd.sendHeartbeat(); err != nil { + return err + } + } + } else if !udpAddrsEqual(fwd.remoteAddr, special.sender) { + log.Print(fwd.logPrefix(), + "Peer UDP address changed to ", special.sender) + fwd.setRemoteAddr(special.sender) + } + + if !fwd.ackedHeartbeat { + fwd.ackedHeartbeat = true + if err := fwd.sendControlMsg([]byte{HeartbeatAck}); err != nil { + return err + } + } + + // we can receive a heartbeat before confirmed() has set up + // heartbeatTimeout + if fwd.heartbeatTimeout != nil { + fwd.heartbeatTimeout.Reset(HeartbeatTimeout) + } + + return nil +} + +func (fwd *sleeveForwarder) setRemoteAddr(addr *net.UDPAddr) { + // remoteAddr is only modified here, so we don't need to hold + // the lock when reading it from the forwarder goroutine. But + // other threads may read it while holding the read lock, so + // when we modify it, we need to hold the write lock. 
+ fwd.lock.Lock() + fwd.remoteAddr = addr + fwd.lock.Unlock() +} + +func (fwd *sleeveForwarder) handleHeartbeatAck() error { + // The connection is now regarded as established + fwd.notifyEstablished() + + if fwd.heartbeatInterval != SlowHeartbeat { + fwd.heartbeatInterval = SlowHeartbeat + if fwd.heartbeatTimer != nil { + fwd.heartbeatTimer.Reset(fwd.heartbeatInterval) + } + } + + fwd.fragTestTicker = time.NewTicker(FragTestInterval) + if err := fwd.sendFragTest(); err != nil { + return err + } + + // Send a large frame down the DF channel. An EMSGSIZE will + // result, which is handled in processSendError, prompting + // PMTU discovery to start. + return fwd.sendSpecial(fwd.crypto.EncDF, fwd.senderDF, + make([]byte, PMTUDiscoverySize)) +} + +func (fwd *sleeveForwarder) notifyEstablished() { + fwd.lock.RLock() + defer fwd.lock.RUnlock() + if fwd.listener != nil { + fwd.listener.Established() + } +} + +func (fwd *sleeveForwarder) sendFragTest() error { + log.Debug(fwd.logPrefix(), "sendFragTest") + fwd.stackFrag = false + return fwd.sendSpecial(fwd.crypto.Enc, fwd.sleeve, + make([]byte, FragTestSize)) +} + +func (fwd *sleeveForwarder) handleFragTest(frame []byte) error { + if !allZeros(frame) { + return nil + } + + return fwd.sendControlMsg([]byte{FragTestAck}) +} + +func (fwd *sleeveForwarder) handleFragTestAck() error { + log.Debug(fwd.logPrefix(), "handleFragTestAck") + fwd.stackFrag = true + return nil +} + +func (fwd *sleeveForwarder) processSendError(err error) error { + if mtbe, ok := err.(msgTooBigError); ok { + mtu := mtbe.underlayPMTU - fwd.overheadDF + if fwd.mtuCandidate != 0 && mtu >= fwd.mtuCandidate { + return nil + } + + fwd.mtuHighestGood = 8 + fwd.mtuLowestBad = mtu + 1 + fwd.mtuCandidate = mtu + fwd.mtuTestsSent = 0 + fwd.maxPayload = mtbe.underlayPMTU - UDPOverhead + fwd.mtu = mtu + return fwd.sendMTUTest() + } + + return err +} + +func (fwd *sleeveForwarder) sendMTUTest() error { + log.Debug(fwd.logPrefix(), + "sendMTUTest: mtu candidate ", 
fwd.mtuCandidate) + err := fwd.sendSpecial(fwd.crypto.EncDF, fwd.senderDF, + make([]byte, fwd.mtuCandidate+EthernetOverhead)) + if err != nil { + return err + } + + fwd.mtuTestTimeout = setTimer(fwd.mtuTestTimeout, + MTUVerifyTimeout<= fwd.mtuLowestBad { + mtu := fwd.mtuHighestGood + log.Print(fwd.logPrefix(), "Effective MTU verified at ", mtu) + + if fwd.mtuTestTimeout != nil { + fwd.mtuTestTimeout.Stop() + fwd.mtuTestTimeout = nil + } + + fwd.mtuCandidate = 0 + fwd.maxPayload = mtu + fwd.overheadDF - UDPOverhead + fwd.mtu = mtu + return nil + } + + fwd.mtuCandidate = (fwd.mtuHighestGood + fwd.mtuLowestBad) / 2 + fwd.mtuTestsSent = 0 + return fwd.sendMTUTest() +} + +type udpSenderDF struct { + ipBuf gopacket.SerializeBuffer + opts gopacket.SerializeOptions + udpHeader *layers.UDP + localIP net.IP + remoteIP net.IP + socket *net.IPConn +} + +func newUDPSenderDF(localIP net.IP, localPort int) *udpSenderDF { + return &udpSenderDF{ + ipBuf: gopacket.NewSerializeBuffer(), + opts: gopacket.SerializeOptions{ + FixLengths: true, + // UDP header is calculated with a phantom IP + // header. Yes, it's totally nuts. Thankfully, + // for UDP over IPv4, the checksum is + // optional. It's not optional for IPv6, but + // we'll ignore that for now. TODO + ComputeChecksums: false, + }, + udpHeader: &layers.UDP{SrcPort: layers.UDPPort(localPort)}, + localIP: localIP, + } +} + +func (sender *udpSenderDF) dial() error { + if sender.socket != nil { + if err := sender.socket.Close(); err != nil { + return err + } + + sender.socket = nil + } + + laddr := &net.IPAddr{IP: sender.localIP} + raddr := &net.IPAddr{IP: sender.remoteIP} + s, err := net.DialIP("ip4:UDP", laddr, raddr) + if err != nil { + return err + } + + f, err := s.File() + if err != nil { + return err + } + + defer f.Close() + + // This makes sure all packets we send out have DF set on them. 
+ err = syscall.SetsockoptInt(int(f.Fd()), syscall.IPPROTO_IP, + syscall.IP_MTU_DISCOVER, syscall.IP_PMTUDISC_DO) + if err != nil { + return err + } + + sender.socket = s + return nil +} + +func (sender *udpSenderDF) send(msg []byte, raddr *net.UDPAddr) error { + // Ensure we have a socket sending to the right IP address + if sender.socket == nil || !bytes.Equal(sender.remoteIP, raddr.IP) { + sender.remoteIP = raddr.IP + if err := sender.dial(); err != nil { + return err + } + } + + sender.udpHeader.DstPort = layers.UDPPort(raddr.Port) + payload := gopacket.Payload(msg) + err := gopacket.SerializeLayers(sender.ipBuf, sender.opts, + sender.udpHeader, &payload) + if err != nil { + return err + } + + packet := sender.ipBuf.Bytes() + _, err = sender.socket.Write(packet) + if err == nil || PosixError(err) != syscall.EMSGSIZE { + return err + } + + f, err := sender.socket.File() + if err != nil { + return err + } + defer f.Close() + + log.Print("EMSGSIZE on send, expecting PMTU update (IP packet was ", + len(packet), " bytes, payload was ", len(msg), " bytes)") + pmtu, err := syscall.GetsockoptInt(int(f.Fd()), syscall.IPPROTO_IP, + syscall.IP_MTU) + if err != nil { + return err + } + + return msgTooBigError{underlayPMTU: pmtu} +} + +type msgTooBigError struct { + underlayPMTU int // actual pmtu, i.e. what the kernel told us +} + +func (mtbe msgTooBigError) Error() string { + return fmt.Sprint("Msg too big error. 
PMTU is ", mtbe.underlayPMTU) +} + +func (sender *udpSenderDF) close() error { + if sender.socket == nil { + return nil + } + + return sender.socket.Close() +} + +func udpAddrsEqual(a *net.UDPAddr, b *net.UDPAddr) bool { + return bytes.Equal(a.IP, b.IP) && a.Port == b.Port && a.Zone == b.Zone +} + +func allZeros(s []byte) bool { + for _, b := range s { + if b != byte(0) { + return false + } + } + + return true +} + +func setTimer(timer *time.Timer, d time.Duration) *time.Timer { + if timer == nil { + return time.NewTimer(d) + } + + timer.Reset(d) + return timer + +} + +func timerChan(timer *time.Timer) <-chan time.Time { + if timer != nil { + return timer.C + } + return nil +} diff --git a/router/status.go b/router/status.go index 833c24d196..c1486c77ec 100644 --- a/router/status.go +++ b/router/status.go @@ -63,14 +63,6 @@ type LocalConnectionStatus struct { } func NewStatus(router *Router) *Status { - var ifaceName string - if router.Iface != nil { - ifaceName = router.Iface.Name - } - var captureStats map[string]int - if router.PacketSource != nil { - captureStats = router.PacketSource.Stats() - } return &Status{ Protocol, ProtocolMinVersion, @@ -80,8 +72,8 @@ func NewStatus(router *Router) *Status { router.Ourself.Name.String(), router.Ourself.NickName, router.Port, - ifaceName, - captureStats, + router.Bridge.String(), + router.Bridge.Stats(), NewMACStatusSlice(router.Macs), NewPeerStatusSlice(router.Peers), NewUnicastRouteStatusSlice(router.Routes), @@ -127,7 +119,7 @@ func NewPeerStatusSlice(peers *Peers) []PeerStatus { peer.Name.String(), peer.NickName, peer.UID, - peer.version, + peer.Version, connections}) }) diff --git a/router/udp_sender.go b/router/udp_sender.go deleted file mode 100644 index a80268bf96..0000000000 --- a/router/udp_sender.go +++ /dev/null @@ -1,138 +0,0 @@ -package router - -import ( - "net" - "syscall" - - "github.com/google/gopacket" - "github.com/google/gopacket/layers" -) - -type UDPSender interface { - Send([]byte) error - 
Shutdown() error -} - -type SimpleUDPSender struct { - conn *LocalConnection - udpConn *net.UDPConn -} - -type RawUDPSender struct { - ipBuf gopacket.SerializeBuffer - opts gopacket.SerializeOptions - udpHeader *layers.UDP - socket *net.IPConn - conn *LocalConnection -} - -type MsgTooBigError struct { - PMTU int // actual pmtu, i.e. what the kernel told us -} - -func NewSimpleUDPSender(conn *LocalConnection) *SimpleUDPSender { - return &SimpleUDPSender{udpConn: conn.Router.UDPListener, conn: conn} -} - -func (sender *SimpleUDPSender) Send(msg []byte) error { - _, err := sender.udpConn.WriteToUDP(msg, sender.conn.RemoteUDPAddr()) - return err -} - -func (sender *SimpleUDPSender) Shutdown() error { - return nil -} - -func NewRawUDPSender(conn *LocalConnection) (*RawUDPSender, error) { - ipSocket, err := dialIP(conn) - if err != nil { - return nil, err - } - udpHeader := &layers.UDP{SrcPort: layers.UDPPort(conn.Router.Port)} - ipBuf := gopacket.NewSerializeBuffer() - opts := gopacket.SerializeOptions{ - FixLengths: true, - // UDP header is calculated with a phantom IP - // header. Yes, it's totally nuts. Thankfully, for UDP - // over IPv4, the checksum is optional. It's not - // optional for IPv6, but we'll ignore that for - // now. 
TODO - ComputeChecksums: false} - - return &RawUDPSender{ - ipBuf: ipBuf, - opts: opts, - udpHeader: udpHeader, - socket: ipSocket, - conn: conn}, nil -} - -func (sender *RawUDPSender) Send(msg []byte) error { - payload := gopacket.Payload(msg) - sender.udpHeader.DstPort = layers.UDPPort(sender.conn.RemoteUDPAddr().Port) - - err := gopacket.SerializeLayers(sender.ipBuf, sender.opts, sender.udpHeader, &payload) - if err != nil { - return err - } - packet := sender.ipBuf.Bytes() - _, err = sender.socket.Write(packet) - if err == nil || PosixError(err) != syscall.EMSGSIZE { - return err - } - f, err := sender.socket.File() - if err != nil { - return err - } - defer f.Close() - fd := int(f.Fd()) - log.Println("EMSGSIZE on send, expecting PMTU update (IP packet was", - len(packet), "bytes, payload was", len(msg), "bytes)") - pmtu, err := syscall.GetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_MTU) - if err != nil { - return err - } - return MsgTooBigError{PMTU: pmtu} -} - -func (sender *RawUDPSender) Shutdown() error { - defer func() { sender.socket = nil }() - return sender.socket.Close() -} - -func dialIP(conn *LocalConnection) (*net.IPConn, error) { - ipLocalAddr, err := ipAddr(conn.TCPConn.LocalAddr()) - if err != nil { - return nil, err - } - ipRemoteAddr, err := ipAddr(conn.TCPConn.RemoteAddr()) - if err != nil { - return nil, err - } - ipSocket, err := net.DialIP("ip4:UDP", ipLocalAddr, ipRemoteAddr) - if err != nil { - return nil, err - } - f, err := ipSocket.File() - if err != nil { - return nil, err - } - defer f.Close() - fd := int(f.Fd()) - // This Makes sure all packets we send out have DF set on them. 
- err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_MTU_DISCOVER, syscall.IP_PMTUDISC_DO) - if err != nil { - return nil, err - } - return ipSocket, nil -} - -func ipAddr(addr net.Addr) (*net.IPAddr, error) { - host, _, err := net.SplitHostPort(addr.String()) - if err != nil { - return nil, err - } - return &net.IPAddr{ - IP: net.ParseIP(host), - Zone: ""}, nil -} diff --git a/router/utils.go b/router/utils.go index 26b59f3423..24ea5d760e 100644 --- a/router/utils.go +++ b/router/utils.go @@ -42,14 +42,6 @@ func PosixError(err error) error { return err } -func (mtbe MsgTooBigError) Error() string { - return fmt.Sprint("Msg too big error. PMTU is ", mtbe.PMTU) -} - -func (ftbe FrameTooBigError) Error() string { - return fmt.Sprint("Frame too big error. Effective PMTU is ", ftbe.EPMTU) -} - func (upe UnknownPeerError) Error() string { return fmt.Sprint("Reference to unknown peer ", upe.Name) } diff --git a/test/config.sh b/test/config.sh index 1ab62c3db6..1d9e6b49ab 100644 --- a/test/config.sh +++ b/test/config.sh @@ -34,7 +34,7 @@ HOST3=$(echo $HOSTS | cut -f 3 -d ' ') SSH_DIR=${SSH_DIR:-$DIR} SSH=${SSH:-ssh -l vagrant -i "$SSH_DIR/insecure_private_key" -o "UserKnownHostsFile=$SSH_DIR/.ssh_known_hosts" -o CheckHostIP=no -o StrictHostKeyChecking=no} -SMALL_IMAGE="gliderlabs/alpine" +SMALL_IMAGE="alpine" DNS_IMAGE="aanand/docker-dnsutils" TEST_IMAGES="$SMALL_IMAGE $DNS_IMAGE"