Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-2840 Add basic monitoring for telemetry client #14661

Merged
merged 14 commits into from
Oct 8, 2024
5 changes: 5 additions & 0 deletions .changeset/giant-pillows-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Add prometheus metrics exposing health of telemetry client
34 changes: 34 additions & 0 deletions core/services/synchronization/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package synchronization

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
emate marked this conversation as resolved.
Show resolved Hide resolved
TelemetryClientConnectionStatus = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "telemetry_client_connection_status",
Help: "Status of the connection to the telemetry ingress server",
}, []string{"endpoint"})
)

var (
TelemetryClientMessagesSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "telemetry_client_messages_sent",
Help: "Number of telemetry messages sent to the telemetry ingress server",
}, []string{"endpoint", "telemetry_type"})
)

var (
TelemetryClientMessagesDropped = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "telemetry_client_messages_dropped",
Help: "Number of telemetry messages dropped",
}, []string{"endpoint", "worker_name"})
)

var (
TelemetryClientWorkers = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "telemetry_client_workers",
Help: "Number of telemetry workers",
}, []string{"endpoint"})
)
46 changes: 46 additions & 0 deletions core/services/synchronization/telemetry_ingress_batch_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

"github.com/smartcontractkit/wsrpc"
"github.com/smartcontractkit/wsrpc/examples/simple/keys"
"google.golang.org/grpc/connectivity"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -60,6 +61,8 @@
workersMutex sync.Mutex

useUniConn bool

healthMonitorCancel context.CancelFunc
}

// NewTelemetryIngressBatchClient returns a client backed by wsrpc that
Expand Down Expand Up @@ -127,14 +130,56 @@
}
tc.telemClient = telemPb.NewTelemClient(conn)
tc.closeFn = func() error { conn.Close(); return nil }
tc.startHealthMonitoring(ctx, conn)
}
}

return nil
}

// startHealthMonitoring starts a goroutine to monitor the connection state and update other relevant metrics every 5 seconds
func (tc *telemetryIngressBatchClient) startHealthMonitoring(ctx context.Context, conn *wsrpc.ClientConn) {
ctx, cancel := context.WithCancel(ctx)

Check failure on line 142 in core/services/synchronization/telemetry_ingress_batch_client.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to ctx (ineffassign)
emate marked this conversation as resolved.
Show resolved Hide resolved
tc.healthMonitorCancel = cancel

tc.eng.Go(func(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Check the connection state
state := conn.GetState()
if state == connectivity.Ready {
emate marked this conversation as resolved.
Show resolved Hide resolved
TelemetryClientConnectionStatus.WithLabelValues(tc.url.String()).Set(1)
tc.connected.Store(true)
} else {
TelemetryClientConnectionStatus.WithLabelValues(tc.url.String()).Set(0)
tc.connected.Store(false)
}

// Report number of workers
tc.workersMutex.Lock()
emate marked this conversation as resolved.
Show resolved Hide resolved
TelemetryClientWorkers.WithLabelValues(tc.url.String()).Set(float64(len(tc.workers)))
emate marked this conversation as resolved.
Show resolved Hide resolved

// Report number of dropped messages
for workerName, worker := range tc.workers {
TelemetryClientMessagesDropped.WithLabelValues(tc.url.String(), workerName).Add((float64(worker.dropMessageCount.Load())))
emate marked this conversation as resolved.
Show resolved Hide resolved
}
tc.workersMutex.Unlock()
case <-ctx.Done():
return
}
}
})
}

// Close disconnects the wsrpc client from the ingress server and waits for all workers to exit
func (tc *telemetryIngressBatchClient) close() error {
if tc.healthMonitorCancel != nil {
tc.healthMonitorCancel()
}
if (tc.useUniConn && tc.connected.Load()) || !tc.useUniConn {
return tc.closeFn()
}
Expand Down Expand Up @@ -172,6 +217,7 @@
select {
case worker.chTelemetry <- payload:
worker.dropMessageCount.Store(0)
TelemetryClientMessagesSent.WithLabelValues(tc.url.String(), string(payload.TelemType)).Inc()
emate marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -86,3 +87,46 @@ func TestTelemetryIngressBatchClient_HappyPath(t *testing.T) {
return []uint32{contractCounter1.Load(), contractCounter3.Load()}
}).Should(gomega.Equal([]uint32{3, 1}))
}

func TestTelemetryIngressBatchClient_startHealthMonitoring(t *testing.T) {
g := gomega.NewWithT(t)

// Create mocks
telemClient := mocks.NewTelemClient(t)
csaKeystore := new(ksmocks.CSA)

// Set mock handlers for keystore
key := cltest.DefaultCSAKey
keyList := []csakey.KeyV2{key}
csaKeystore.On("GetAll").Return(keyList, nil)

// Wire up the telem ingress client
url := &url.URL{}
serverPubKeyHex := "33333333333"
sendInterval := time.Millisecond * 5
telemIngressClient := synchronization.NewTestTelemetryIngressBatchClient(t, url, serverPubKeyHex, csaKeystore, false, telemClient, sendInterval, false)
servicetest.Run(t, telemIngressClient)

// Wait for the health monitoring to update metrics
time.Sleep(6 * time.Second)

// Check the connection status metric
g.Eventually(func() float64 {
return testutil.ToFloat64(synchronization.TelemetryClientConnectionStatus.WithLabelValues(url.String()))
}).Should(gomega.Equal(1.0))

// Check the number of workers metric
g.Eventually(func() float64 {
return testutil.ToFloat64(synchronization.TelemetryClientWorkers.WithLabelValues(url.String()))
}).Should(gomega.Equal(0.0))

// Check the number of dropped messages metric
g.Eventually(func() float64 {
return testutil.ToFloat64(synchronization.TelemetryClientMessagesDropped.WithLabelValues(url.String(), ""))
}).Should(gomega.Equal(0.0))

// Check the number of sent messages metric
g.Eventually(func() float64 {
return testutil.ToFloat64(synchronization.TelemetryClientMessagesSent.WithLabelValues(url.String(), ""))
}).Should(gomega.Equal(0.0))
}
Loading