Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/telemetry): implement telemetry message network_state #1618

Merged
merged 32 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fd1024c
refactor telemetry messages to map format
edwardmack May 31, 2021
d867d2f
add basic network state telemetry message
edwardmack Jun 1, 2021
17aba94
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
cd3abbb
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
74abc1b
refactor message sender to handle interface{} types
edwardmack Jun 11, 2021
6997fa1
refactor telemetry messages to be structs
edwardmack Jun 15, 2021
90506c0
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 15, 2021
9b4eb7a
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 16, 2021
7c00e1e
lint
edwardmack Jun 16, 2021
5ceddd9
go fmt
edwardmack Jun 16, 2021
266c3dd
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 18, 2021
529bac0
lint
edwardmack Jun 18, 2021
6eb7b63
move msg building logic outside msg sending loop
edwardmack Jun 18, 2021
4e1f92b
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 19, 2021
13024dd
make telemetry messages an interface
edwardmack Jun 19, 2021
7263912
Lookup transactions count from TransactionsState
edwardmack Jun 19, 2021
6f07942
address comments
edwardmack Jun 19, 2021
545c25c
fix mocks for tests
edwardmack Jun 21, 2021
17bd211
lint
edwardmack Jun 21, 2021
ac904b0
refactor TelemetryMessage to Message
edwardmack Jun 21, 2021
b0e43fa
update mock handler to return result
edwardmack Jun 22, 2021
3247f7e
add TransactionsCount to mockhandler
edwardmack Jun 22, 2021
ffc8428
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
cf87b93
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
9ab5339
move logic to build new network state message
edwardmack Jun 23, 2021
08fd38a
lint
edwardmack Jun 23, 2021
e5c4de9
fix interface
edwardmack Jun 23, 2021
02f03db
update mockhandler
edwardmack Jun 24, 2021
0428696
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a160f58
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a257def
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 28, 2021
0321eca
lint
edwardmack Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions chain/dev/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"id": "dev",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": null,
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/dev/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -32,4 +37,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
8 changes: 7 additions & 1 deletion chain/gssmr/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
"id": "gssmr",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/gssmr/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -40,4 +46,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
35 changes: 35 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ func (s *Service) logPeerCount() {
}
}

type peerInfo struct {
Roles byte `json:"roles"`
BestHash string `json:"bestHash"`
BestNumber uint64 `json:"bestNumber"`
}

func (s *Service) publishNetworkTelemetry(done chan interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering why this uses done chan instead of the service ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following suggestions from this thread: #1528 (comment)

ticker := time.NewTicker(s.telemetryInterval)
defer ticker.Stop()
Expand All @@ -321,6 +327,35 @@ main:
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
netState := make(map[string]interface{})
netState["peerId"] = s.host.h.ID()
hostAddrs := []string{}
for _, v := range s.host.h.Addrs() {
hostAddrs = append(hostAddrs, v.String())
}
netState["externalAddressess"] = hostAddrs
listAddrs := []string{}
for _, v := range s.host.h.Network().ListenAddresses() {
listAddrs = append(listAddrs, v.String())
}
netState["listenedAddressess"] = listAddrs

Copy link
Contributor

@noot noot Jun 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this expecting the multiaddress including peer ID? if so you will need to use something likenthis to create the listen address strings:

fmt.Sprintf("%s/p2p/%s", multiaddress, id)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

peers := make(map[string]interface{})
for _, v := range s.Peers() {
p := &peerInfo{
Roles: v.Roles,
BestHash: v.BestHash.String(),
BestNumber: v.BestNumber,
}
peers[v.PeerID] = *p
}
netState["connectedPeers"] = peers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this code be moved into NewNetworkStateTM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would make more sense there, moved.

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at the msgToBytes method it seems all that does is json.Marshal() the map[string]interface{}. So there's no real need to construct these messages as map[string]interface{}. You could essentially just pass in structs. So maybe in this case:

type netState struct {
    PeerID string `json:"peerId"`
    ExternalAddresses []string `json:"externalAddresses"`
    ListenedAddresses []string `json:"listenedAddresses"`  
}

You could create a constructor for this type as well to handle the population of the fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactor this to use structs instead of maps. Good idea.

telemetry.NewKeyValue("msg", "system.network_state"),
telemetry.NewKeyValue("state", netState)))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
}
}
}
Expand Down
14 changes: 2 additions & 12 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,9 @@ func (h *Handler) startListening() {
}
}

type response struct {
ID int `json:"id"`
Payload map[string]interface{} `json:"payload"`
Timestamp time.Time `json:"ts"`
}

func msgToBytes(message Message) []byte {
res := response{
ID: 1, // todo (ed) determine how this is used
Payload: message.values,
Timestamp: time.Now(),
}
resB, err := json.Marshal(res)
message.values["ts"] = time.Now()
resB, err := json.Marshal(message.values)
if err != nil {
return nil
}
Expand Down
19 changes: 15 additions & 4 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package telemetry

import (
"bytes"
"fmt"
"log"
"math/big"
"net/http"
Expand Down Expand Up @@ -89,10 +90,10 @@ func TestHandler_SendMulti(t *testing.T) {

wg.Wait()

expected1 := []byte(`{"id":1,"payload":{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1},"ts":`)
expected2 := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`)
expected3 := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`)
expected4 := []byte(`{"id":1,"payload":{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","txcount":2,"used_state_cache_size":1886357},"ts":`) // nolint
expected1 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`)
expected2 := []byte(`{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`)
expected3 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`)
expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint

expected := [][]byte{expected3, expected1, expected4, expected2}

Expand Down Expand Up @@ -139,6 +140,16 @@ func TestListenerConcurrency(t *testing.T) {
}
}

// TestInfiniteListener starts loop that print out data received on websocket ws://localhost:8001/
// this can be useful to see what data is sent to telemetry server
func TestInfiniteListener(t *testing.T) {
t.Skip()
resultCh = make(chan []byte)
for data := range resultCh {
fmt.Printf("Data %s\n", data)
}
}

func listen(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Expand Down