Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/telemetry): implement telemetry message network_state #1618

Merged
merged 32 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fd1024c
refactor telemetry messages to map format
edwardmack May 31, 2021
d867d2f
add basic network state telemetry message
edwardmack Jun 1, 2021
17aba94
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
cd3abbb
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
74abc1b
refactor message sender to handle interface{} types
edwardmack Jun 11, 2021
6997fa1
refactor telemetry messages to be structs
edwardmack Jun 15, 2021
90506c0
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 15, 2021
9b4eb7a
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 16, 2021
7c00e1e
lint
edwardmack Jun 16, 2021
5ceddd9
go fmt
edwardmack Jun 16, 2021
266c3dd
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 18, 2021
529bac0
lint
edwardmack Jun 18, 2021
6eb7b63
move msg building logic outside msg sending loop
edwardmack Jun 18, 2021
4e1f92b
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 19, 2021
13024dd
make telemetry messages an interface
edwardmack Jun 19, 2021
7263912
Lookup transactions count from TransactionsState
edwardmack Jun 19, 2021
6f07942
address comments
edwardmack Jun 19, 2021
545c25c
fix mocks for tests
edwardmack Jun 21, 2021
17bd211
lint
edwardmack Jun 21, 2021
ac904b0
refactor TelemetryMessage to Message
edwardmack Jun 21, 2021
b0e43fa
update mock handler to return result
edwardmack Jun 22, 2021
3247f7e
add TransactionsCount to mockhandler
edwardmack Jun 22, 2021
ffc8428
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
cf87b93
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
9ab5339
move logic to build new network state message
edwardmack Jun 23, 2021
08fd38a
lint
edwardmack Jun 23, 2021
e5c4de9
fix interface
edwardmack Jun 23, 2021
02f03db
update mockhandler
edwardmack Jun 24, 2021
0428696
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a160f58
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a257def
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 28, 2021
0321eca
lint
edwardmack Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 1 addition & 32 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"math/big"
"os"
Expand Down Expand Up @@ -305,12 +304,6 @@ func (s *Service) logPeerCount() {
}
}

type peerInfo struct {
Roles byte `json:"roles"`
BestHash string `json:"bestHash"`
BestNumber uint64 `json:"bestNumber"`
}

func (s *Service) publishNetworkTelemetry(done chan interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering why this uses done chan instead of the service ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following suggestions from this thread: #1528 (comment)

ticker := time.NewTicker(s.telemetryInterval)
defer ticker.Stop()
Expand All @@ -324,35 +317,11 @@ main:
case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
err := telemetry.GetInstance().SendMessage(telemetry.NewBandwidthTM(o.RateIn, o.RateOut, s.host.peerCount()))

if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
netState := make(map[string]interface{})
netState["peerId"] = s.host.h.ID()
hostAddrs := []string{}
for _, v := range s.host.h.Addrs() {
hostAddrs = append(hostAddrs, v.String())
}
netState["externalAddressess"] = hostAddrs
listAddrs := []string{}
for _, v := range s.host.h.Network().ListenAddresses() {
listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, s.host.h.ID()))
}
netState["listenedAddressess"] = listAddrs

peers := make(map[string]interface{})
for _, v := range s.Peers() {
p := &peerInfo{
Roles: v.Roles,
BestHash: v.BestHash.String(),
BestNumber: v.BestNumber,
}
peers[v.PeerID] = *p
}
netState["connectedPeers"] = peers

err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(netState))
err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(s.host.h, s.Peers()))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ type Syncer interface {

// TransactionHandler is the interface used by the transactions sub-protocol
type TransactionHandler interface {
HandleTransactionMessage(*TransactionMessage) error
HandleTransactionMessage(*TransactionMessage) (bool, error)
TransactionsCount() int
}
2 changes: 1 addition & 1 deletion dot/network/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestDecodeTransactionMessage(t *testing.T) {
func TestHandleTransactionMessage(t *testing.T) {
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil)
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)

config := &Config{
Expand Down
38 changes: 35 additions & 3 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package telemetry
import (
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"time"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/ChainSafe/gossamer/lib/genesis"
log "github.com/ChainSafe/log15"
"github.com/gorilla/websocket"
libp2phost "github.com/libp2p/go-libp2p-core/host"
)

type telemetryConnection struct {
Expand Down Expand Up @@ -95,7 +97,7 @@ func (h *Handler) startListening() {
msg := <-h.msg
go func() {
msgBytes, err := h.msgToJSON(msg)
if err != nil || len(msgBytes) == 0 {
if err != nil {
h.log.Debug("issue decoding telemetry message", "error", err)
return
}
Expand Down Expand Up @@ -236,17 +238,47 @@ func (tm *SystemIntervalTM) messageType() string {
return tm.Msg
}

type peerInfo struct {
Roles byte `json:"roles"`
BestHash string `json:"bestHash"`
BestNumber uint64 `json:"bestNumber"`
}

// NetworkStateTM struct to hold network state telemetry messages
type NetworkStateTM struct {
Msg string `json:"msg"`
State map[string]interface{} `json:"state"`
}

// NewNetworkStateTM function to create new Network State Telemetry Message
func NewNetworkStateTM(state map[string]interface{}) *NetworkStateTM {
func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *NetworkStateTM {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *NetworkStateTM {
func NewNetworkStateTM(host libp2phost.Host, peerInfo []common.PeerInfo) *NetworkStateTM {

netState := make(map[string]interface{})
netState["peerId"] = host.ID()
hostAddrs := []string{}
for _, v := range host.Addrs() {
hostAddrs = append(hostAddrs, v.String())
}
netState["externalAddressess"] = hostAddrs
listAddrs := []string{}
for _, v := range host.Network().ListenAddresses() {
listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, host.ID()))
}
netState["listenedAddressess"] = listAddrs

peers := make(map[string]interface{})
for _, v := range peerInfos {
p := &peerInfo{
Roles: v.Roles,
BestHash: v.BestHash.String(),
BestNumber: v.BestNumber,
}
peers[v.PeerID] = *p
}
netState["connectedPeers"] = peers

return &NetworkStateTM{
Msg: "system.network_state",
State: state,
State: netState,
}
}
func (tm *NetworkStateTM) messageType() string {
Expand Down