diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 6087bc97ab92..0ad2b7cf3a45 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -153,11 +153,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf for i := 0; len(onNodes) > 0 && i < nodesDown; i++ { c := rand.Intn(len(onNodes)) sn := onNodes[c] - err := eventer.Post(&NodeEvent{ - Type: "node", - Action: "down", - node: sn, - }) + err := eventer.Post(sn.EmitEvent(ControlEvent)) if err != nil { panic(err.Error()) } @@ -168,11 +164,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf for i := 0; len(offNodes) > 0 && i < nodesUp; i++ { c := rand.Intn(len(offNodes)) sn := &Node{Id: offNodes[c]} - err := eventer.Post(&NodeEvent{ - Type: "node", - Action: "up", - node: sn, - }) + err := eventer.Post(sn.EmitEvent(ControlEvent)) if err != nil { panic(err.Error()) } @@ -231,11 +223,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf lab := ConnLabel(sc.One, sc.Other) onConnsMap[lab] = len(onConns) onConns = append(onConns, sc) - err := eventer.Post(&ConnEvent{ - Type: "conn", - Action: "up", - conn: sc, - }) + err := eventer.Post(sc.EmitEvent(ControlEvent)) if err != nil { panic(err.Error()) } @@ -247,11 +235,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf onConns = append(onConns[0:c], onConns[c+1:]...) lab := ConnLabel(conn.One, conn.Other) delete(onConnsMap, lab) - err := eventer.Post(&ConnEvent{ - Type: "conn", - Action: "down", - conn: conn, - }) + err := eventer.Post(conn.EmitEvent(ControlEvent)) if err != nil { panic(err.Error()) } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index a04eb29dcc5b..5e2bba1079fe 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -58,7 +58,9 @@ type NetworkControl interface { // event types related to connectivity, i.e., nodes coming on dropping off // and connections established and dropped -var ConnectivityEvents = []interface{}{&NodeEvent{}, &ConnEvent{}, &MsgEvent{}} +var ConnectivityControlEvents = []interface{}{&NodeControlEvent{}, &ConnControlEvent{}, &MsgControlEvent{}} +var ConnectivityLiveEvents = []interface{}{&NodeEvent{}, &ConnEvent{}, &MsgEvent{}} +var ConnectivityAllEvents = append(ConnectivityControlEvents, ConnectivityLiveEvents...) type NetworkController struct { *ResourceController @@ -70,7 +72,7 @@ type NetworkController struct { // stream of Server-Sent-Events, with each event being a JSON encoded SimUpdate // object func (n *NetworkController) ServeStream(w http.ResponseWriter, req *http.Request) { - sub := n.events.Subscribe(ConnectivityEvents...) + sub := n.events.Subscribe(ConnectivityAllEvents...) defer sub.Unsubscribe() // stop the stream if the client goes away @@ -126,12 +128,12 @@ func NewNetworkController(net NetworkControl, nodesController *ResourceControlle eventer := &event.TypeMux{} conf := net.Config() if conf.Backend { - journal.Subscribe(net.Events(), ConnectivityEvents...) + journal.Subscribe(net.Events(), ConnectivityAllEvents...) // the network can subscribe to the eventer fed by mockers and players - net.Subscribe(eventer, ConnectivityEvents...) + net.Subscribe(eventer, ConnectivityAllEvents...) } else { // alternatively mocked and replayed events bypass the simulation network backend - journal.Subscribe(eventer, ConnectivityEvents...) + journal.Subscribe(eventer, ConnectivityAllEvents...) } self := NewResourceContoller( &ResourceHandlers{ @@ -285,37 +287,62 @@ func (self *Network) Subscribe(eventer *event.TypeMux, types ...interface{}) { }() } +func (self *Network) executeNodeEvent(ne *NodeControlEvent) { + if ne.Up { + err := self.NewNode(&NodeConfig{Id: ne.Node.Id}) + if err != nil { + log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) + } + err = self.Start(ne.Node.Id) + if err != nil { + log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) + } + } else { + err := self.Stop(ne.Node.Id) + if err != nil { + log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) + } + } + ne.Node.controlFired = ne.Up +} + +func (self *Network) executeConnEvent(ce *ConnControlEvent) { + if ce.Up { + err := self.Connect(ce.Connection.One, ce.Connection.Other) + if err != nil { + log.Trace(fmt.Sprintf("error execute event %v: %v", ce, err)) + } + } else { + err := self.Disconnect(ce.Connection.One, ce.Connection.Other) + if err != nil { + log.Trace(fmt.Sprintf("error execute event %v: %v", ce, err)) + } + } + ce.Connection.controlFired = ce.Up +} + func (self *Network) execute(in *event.TypeMuxEvent) { log.Trace(fmt.Sprintf("execute event %v", in)) ev := in.Data if ne, ok := ev.(*NodeEvent); ok { - if ne.Action == "up" { - err := self.NewNode(&NodeConfig{Id: ne.node.Id}) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) - } - err = self.Start(ne.node.Id) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) - } + if ne.Up && ne.Node.controlFired || (!ne.Up && !ne.Node.controlFired) { + log.Trace(fmt.Sprintf("Got NodeEvent %v, but Control Event has already been applied for : %v", ne, ne.Node)) + //ignore this real event; control event already took care of this } else { - err := self.Stop(ne.node.Id) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) - } + self.executeNodeEvent(ne.ToControlEvent()) } } else if ce, ok := ev.(*ConnEvent); ok { - if ce.Action == "up" { - err := self.Connect(ce.conn.One, ce.conn.Other) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) - } + if ce.Up && ce.Connection.controlFired || (!ce.Up && !ce.Connection.controlFired) { + log.Trace(fmt.Sprintf("Got ConnEvent %v, but Control Event has already been applied for : %v", ce, ce.Connection)) + //ignore this real event; control event already took care of this } else { - err := self.Disconnect(ce.conn.One, ce.conn.Other) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) - } + self.executeConnEvent(ce.ToControlEvent()) } + } + if ne, ok := ev.(*NodeControlEvent); ok { + self.executeNodeEvent(ne) + } else if ce, ok := ev.(*ConnControlEvent); ok { + self.executeConnEvent(ce) } else { log.Trace(fmt.Sprintf("event: %#v", ev)) panic("unhandled event") @@ -327,113 +354,160 @@ func (self *Network) Events() *event.TypeMux { return self.events } -type Node struct { - Id *adapters.NodeId `json:"id"` - Up bool - config *NodeConfig - na adapters.NodeAdapter +type EventType int + +const ( + ControlEvent EventType = iota + LiveEvent +) + +type EventEmitter interface { + EmitEvent() } -func (self *Node) Adapter() adapters.NodeAdapter { - return self.na +type LiveEventer interface { + ToControlEvent() +} + +type Node struct { + Id *adapters.NodeId `json:"id"` + Up bool + config *NodeConfig + na adapters.NodeAdapter + controlFired bool } func (self *Node) String() string { return fmt.Sprintf("Node %v", self.Id.Label()) } +// active connections are represented by the Node entry object so that +// you journal updates could filter if passive knowledge about peers is +// irrelevant +type Conn struct { + One *adapters.NodeId `json:"one"` + Other *adapters.NodeId `json:"other"` + one, other *Node + // connection down by default + Up bool `json:"up"` + // reverse is false by default (One dialled/dropped the Other) + Reverse bool `json:"reverse"` + // Info + // average throughput, recent average throughput etc + controlFired bool +} + +func (self *Conn) String() string { + return fmt.Sprintf("Conn %v->%v", self.One.Label(), self.Other.Label()) +} + +type Msg struct { + One *adapters.NodeId `json:"one"` + Other *adapters.NodeId `json:"other"` + Code uint64 `json:"conn"` + controlFired bool +} + +func (self *Msg) String() string { + return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.Label(), self.Other.Label()) +} + type NodeEvent struct { - Action string - Type string - node *Node + Node *Node + Up bool } type ConnEvent struct { - Action string - Type string - conn *Conn + Connection *Conn + Up bool + Reverse bool } type MsgEvent struct { - Action string - Type string - msg *Msg + Message *Msg } -func (self *ConnEvent) String() string { - return fmt.Sprintf("\n", self.Action, self.Type, self.conn) +type NodeControlEvent struct { + *NodeEvent +} + +type ConnControlEvent struct { + *ConnEvent +} + +type MsgControlEvent struct { + *MsgEvent } func (self *NodeEvent) String() string { - return fmt.Sprintf("\n", self.Action, self.Type, self.node) + return fmt.Sprintf("\n", self.Up, self.Node) +} + +func (self *ConnEvent) String() string { + return fmt.Sprintf("\n", self.Up, self.Reverse, self.Connection) } func (self *MsgEvent) String() string { - return fmt.Sprintf("\n", self.Action, self.Type, self.msg) + return fmt.Sprintf("\n", self.Message) } -func (self *Node) event(up bool) *NodeEvent { - var action string - if up { - action = "up" - } else { - action = "down" +func (self *Node) EmitEvent(eventType EventType) interface{} { + evt := &NodeEvent{ + Node: self, + Up: self.Up, } - return &NodeEvent{ - Action: action, - Type: "node", - node: self, + if eventType == ControlEvent { + return &NodeControlEvent{ + evt, + } + } else { + return evt } } -// active connections are represented by the Node entry object so that -// you journal updates could filter if passive knowledge about peers is -// irrelevant -type Conn struct { - One *adapters.NodeId `json:"one"` - Other *adapters.NodeId `json:"other"` - one, other *Node - // connection down by default - Up bool `json:"up"` - // reverse is false by default (One dialled/dropped the Other) - Reverse bool `json:"reverse"` - // Info - // average throughput, recent average throughput etc -} +func (self *Conn) EmitEvent(eventType EventType) interface{} { + evt := &ConnEvent{ + Connection: self, + Up: self.Up, + Reverse: self.Reverse, + } -func (self *Conn) String() string { - return fmt.Sprintf("Conn %v->%v", self.One.Label(), self.Other.Label()) + if eventType == ControlEvent { + return &ConnControlEvent{ + evt, + } + } else { + return evt + } } -func (self *Conn) event(up, rev bool) *ConnEvent { - var action string - if up { - action = "up" - } else { - action = "down" +func (self *Msg) EmitEvent(eventType EventType) interface{} { + evt := &MsgEvent{ + Message: self, } - return &ConnEvent{ - Action: action, - Type: "conn", - conn: self, + if eventType == ControlEvent { + return &MsgControlEvent{ + evt, + } + } else { + return evt } } -type Msg struct { - One *adapters.NodeId `json:"one"` - Other *adapters.NodeId `json:"other"` - Code uint64 `json:"conn"` +func (self *MsgEvent) ToControlEvent() *MsgControlEvent { + return &MsgControlEvent{self} } -func (self *Msg) String() string { - return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.Label(), self.Other.Label()) +func (self *ConnEvent) ToControlEvent() *ConnControlEvent { + return &ConnControlEvent{self} } -func (self *Msg) event() *MsgEvent { - return &MsgEvent{ - Action: "up", - msg: self, - } +func (self *NodeEvent) ToControlEvent() *NodeControlEvent { + return &NodeControlEvent{self} +} + +func (self *Node) Adapter() adapters.NodeAdapter { + return self.na } type NodeConfig struct { @@ -528,11 +602,7 @@ func (self *Network) Start(id *adapters.NodeId) error { node.Up = true log.Info(fmt.Sprintf("started node %v: %v", id, node.Up)) - self.events.Post(&NodeEvent{ - Action: "up", - Type: "node", - node: node, - }) + self.events.Post(node.EmitEvent(ControlEvent)) return nil } @@ -551,11 +621,7 @@ func (self *Network) Stop(id *adapters.NodeId) error { node.Up = false log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up)) - self.events.Post(&NodeEvent{ - Action: "down", - Type: "node", - node: node, - }) + self.events.Post(node.EmitEvent(ControlEvent)) return nil } @@ -595,6 +661,7 @@ func (self *Network) Connect(oneId, otherId *adapters.NodeId) error { if err != nil { return err } + self.events.Post(conn.EmitEvent(ControlEvent)) return client.Call(nil, "admin_addPeer", string(addr)) } @@ -628,12 +695,13 @@ func (self *Network) Disconnect(oneId, otherId *adapters.NodeId) error { if err != nil { return err } + self.events.Post(conn.EmitEvent(ControlEvent)) return client.Call(nil, "admin_removePeer", string(addr)) } func (self *Network) DidConnect(one, other *adapters.NodeId) error { - conn := self.GetConn(one, other) - if conn == nil { + conn, err := self.GetOrCreateConn(one, other) + if err != nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } if conn.Up { @@ -642,13 +710,13 @@ func (self *Network) DidConnect(one, other *adapters.NodeId) error { conn.Reverse = conn.One.NodeID != one.NodeID conn.Up = true // connection event posted - self.events.Post(conn.event(true, conn.Reverse)) + self.events.Post(conn.EmitEvent(LiveEvent)) return nil } func (self *Network) DidDisconnect(one, other *adapters.NodeId) error { - conn := self.GetConn(one, other) - if conn == nil { + conn, err := self.GetOrCreateConn(one, other) + if err != nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } if !conn.Up { @@ -656,7 +724,7 @@ func (self *Network) DidDisconnect(one, other *adapters.NodeId) error { } conn.Reverse = conn.One.NodeID != one.NodeID conn.Up = false - self.events.Post(conn.event(false, conn.Reverse)) + self.events.Post(conn.EmitEvent(LiveEvent)) return nil } @@ -668,7 +736,7 @@ func (self *Network) Send(senderid, receiverid *adapters.NodeId, msgcode uint64, Code: msgcode, } //self.GetNode(senderid).na.(*adapters.SimNode).GetPeer(receiverid).SendMsg(msgcode, protomsg) // phew! - self.events.Post(msg.event()) // should also include send status maybe + self.events.Post(msg.EmitEvent(ControlEvent)) } // GetNodeAdapter(id) returns the NodeAdapter for node with id diff --git a/p2p/simulations/sim_events.go b/p2p/simulations/sim_events.go index 1a472ddff46d..e17091a22fb5 100644 --- a/p2p/simulations/sim_events.go +++ b/p2p/simulations/sim_events.go @@ -19,8 +19,9 @@ type SimData struct { type SimElement struct { Data *SimData `json:"data"` - Classes string `json:"classes,omitempty"` - Group string `json:"group"` + Classes string `json:"classes,omitempty"` + Group string `json:"group"` + Control bool `json:"control"` // selected: false, // whether the element is selected (default false) // selectable: true, // whether the selection state is mutable (default true) // locked: false, // when locked a node's position is immutable (default false) @@ -37,21 +38,65 @@ func NewSimUpdate(e *event.TypeMuxEvent) (*SimUpdate, error) { var update SimUpdate var el *SimElement entry := e.Data - var action string - if ev, ok := entry.(*NodeEvent); ok { - el = &SimElement{Group: "nodes", Data: &SimData{Id: ev.node.Id.String()}} - action = ev.Action - } else if ev, ok := entry.(*MsgEvent); ok { - msg := ev.msg + + switch entry.(type) { + case *NodeControlEvent, *NodeEvent: + var data *SimData + var control bool + nce, ok := entry.(*NodeControlEvent) + if ok { + data = &SimData{Id: nce.Node.Id.String()} + data.Up = nce.Up + control = true + } else { + ne := entry.(*NodeEvent) + data = &SimData{Id: ne.Node.Id.String()} + data.Up = ne.Up + control = false + } + el = &SimElement{Group: "nodes", Data: data} + el.Control = control + if el.Data.Up { + update.Add = append(update.Add, el) + } else { + update.Remove = append(update.Remove, el) + } + case *MsgControlEvent, *MsgEvent: + var control bool + var msg *Msg + mce, ok := entry.(*MsgControlEvent) + if ok { + msg = mce.Message + control = true + } else { + me := entry.(*MsgEvent) + msg = me.Message + control = false + } id := ConnLabel(msg.One, msg.Other) var source, target string source = msg.One.String() target = msg.Other.String() el = &SimElement{Group: "msgs", Data: &SimData{Id: id, Source: source, Target: target}} - action = ev.Action - } else if ev, ok := entry.(*ConnEvent); ok { + el.Data.Up = true + el.Control = control + update.Message = append(update.Message, el) + case *ConnControlEvent, *ConnEvent: + var control bool + var conn *Conn + var up bool + cce, ok := entry.(*ConnControlEvent) + if ok { + conn = cce.Connection + up = cce.Up + control = true + } else { + ce := entry.(*ConnEvent) + conn = ce.Connection + up = ce.Up + control = false + } // mutually exclusive directed edge (caller -> callee) - conn := ev.conn id := ConnLabel(conn.One, conn.Other) var source, target string if conn.Reverse { @@ -62,23 +107,15 @@ func NewSimUpdate(e *event.TypeMuxEvent) (*SimUpdate, error) { target = conn.Other.String() } el = &SimElement{Group: "edges", Data: &SimData{Id: id, Source: source, Target: target}} - action = ev.Action - } else { - return nil, fmt.Errorf("unknown event type: %T", entry) - } - - switch action { - case "up": - el.Data.Up = true - update.Add = append(update.Add, el) - case "down": - el.Data.Up = false - update.Remove = append(update.Remove, el) - case "msg": - el.Data.Up = true - update.Message = append(update.Message, el) + el.Control = control + el.Data.Up = up + if up { + update.Add = append(update.Add, el) + } else { + update.Remove = append(update.Remove, el) + } default: - return nil, fmt.Errorf("unknown action: %q", action) + return nil, fmt.Errorf("unknown event type: %T", entry) } return &update, nil diff --git a/p2p/simulations/simulation.go b/p2p/simulations/simulation.go index 41f955bc8cfd..b7fdbcf61a68 100644 --- a/p2p/simulations/simulation.go +++ b/p2p/simulations/simulation.go @@ -68,7 +68,7 @@ func (s *Simulation) Run(ctx context.Context, step *Step) (result *StepResult) { func (s *Simulation) watchNetwork(result *StepResult) func() { stop := make(chan struct{}) done := make(chan struct{}) - sub := s.network.Events().Subscribe(ConnectivityEvents...) + sub := s.network.Events().Subscribe(ConnectivityAllEvents...) go func() { defer close(done) defer sub.Unsubscribe()