diff --git a/cmd/agent/app/reporter/connect_metrics.go b/cmd/agent/app/reporter/connect_metrics.go
new file mode 100644
index 00000000000..6efe555a6ba
--- /dev/null
+++ b/cmd/agent/app/reporter/connect_metrics.go
@@ -0,0 +1,75 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reporter
+
+import (
+	"github.com/uber/jaeger-lib/metrics"
+	"go.uber.org/zap"
+)
+
+// connectMetrics groups the metrics that describe the agent's connection to
+// the collector.
+type connectMetrics struct {
+	// Reconnects is incremented on every transition into the connected state,
+	// so it reflects how stable the current connection is.
+	Reconnects metrics.Counter `metric:"collector_reconnects" help:"Number of successful connections (including reconnects) to the collector."`
+
+	// Status is the state of the connection from jaeger-agent to
+	// jaeger-collector: 1 is connected, 0 is disconnected.
+	Status metrics.Gauge `metric:"collector_connected" help:"Status of connection between the agent and the collector; 1 is connected, 0 is disconnected"`
+}
+
+// ConnectMetrics wraps connectMetrics together with the parameters needed to
+// create it. The metrics can only be modified through the ConnectMetrics API.
+type ConnectMetrics struct {
+	Logger         *zap.Logger     // required
+	MetricsFactory metrics.Factory // required
+	connectMetrics *connectMetrics
+}
+
+// NewConnectMetrics initializes the connection metrics under the
+// "connection_status" namespace of MetricsFactory.
+//
+// The namespaced factory is kept local, so MetricsFactory still refers to the
+// factory the caller supplied. A nil MetricsFactory is replaced by
+// metrics.NullFactory instead of causing a nil-interface panic.
+func (r *ConnectMetrics) NewConnectMetrics() {
+	if r.MetricsFactory == nil {
+		r.MetricsFactory = metrics.NullFactory
+	}
+	factory := r.MetricsFactory.Namespace(metrics.NSOptions{Name: "connection_status"})
+	cm := new(connectMetrics)
+	metrics.MustInit(cm, factory, nil)
+	r.connectMetrics = cm
+}
+
+// OnConnectionStatusChange records the new connection status: the status gauge
+// is set to 1 when connected and to 0 when disconnected, and every change to
+// connected also increments the reconnects counter.
+//
+// For a quick view of the status, query
+// `sum(jaeger_agent_connection_status_collector_connected{}) by (instance) > bool 0`.
+func (r *ConnectMetrics) OnConnectionStatusChange(connected bool) {
+	if r.connectMetrics == nil {
+		// NewConnectMetrics was not called yet; initialize lazily rather than panic.
+		r.NewConnectMetrics()
+	}
+	if !connected {
+		r.connectMetrics.Status.Update(0)
+		return
+	}
+	r.connectMetrics.Status.Update(1)
+	r.connectMetrics.Reconnects.Inc(1)
+}
diff --git a/cmd/agent/app/reporter/connect_metrics_test.go b/cmd/agent/app/reporter/connect_metrics_test.go
new file mode 100644
index 00000000000..09bec30987e
--- /dev/null
+++ b/cmd/agent/app/reporter/connect_metrics_test.go
@@ -0,0 +1,81 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reporter
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/uber/jaeger-lib/metrics/metricstest"
+)
+
+// connectMetricsTest exposes the in-memory factory that backs the metrics under test.
+type connectMetricsTest struct {
+	mf *metricstest.Factory
+}
+
+// testConnectMetrics runs fn against a zero-value ConnectMetrics.
+func testConnectMetrics(fn func(tr *connectMetricsTest, r *ConnectMetrics)) {
+	testConnectMetricsWithParams(new(ConnectMetrics), fn)
+}
+
+// testConnectMetricsWithParams attaches a fresh in-memory factory to cm, initializes it and runs fn.
+func testConnectMetricsWithParams(cm *ConnectMetrics, fn func(tr *connectMetricsTest, r *ConnectMetrics)) {
+	factory := metricstest.NewFactory(time.Hour)
+	cm.MetricsFactory = factory
+	cm.NewConnectMetrics()
+	fn(&connectMetricsTest{mf: factory}, cm)
+}
+
+func testCollectorConnected(r *ConnectMetrics) {
+	r.OnConnectionStatusChange(true)
+}
+
+func testCollectorAborted(r *ConnectMetrics) {
+	r.OnConnectionStatusChange(false)
+}
+
+// TestConnectMetrics checks the status gauge and reconnects counter across connection changes.
+func TestConnectMetrics(t *testing.T) {
+	const (
+		statusKey     = "connection_status.collector_connected"
+		reconnectsKey = "connection_status.collector_reconnects"
+	)
+	testConnectMetrics(func(tr *connectMetricsTest, r *ConnectMetrics) {
+		gauge := func(name string) int64 {
+			_, gauges := tr.mf.Snapshot()
+			return gauges[name]
+		}
+		counter := func(name string) int64 {
+			counters, _ := tr.mf.Snapshot()
+			return counters[name]
+		}
+
+		// An aborted connection reports 0.
+		testCollectorAborted(r)
+		assert.EqualValues(t, 0, gauge(statusKey))
+
+		// An established connection reports 1 and is counted once.
+		testCollectorConnected(r)
+		assert.EqualValues(t, 1, gauge(statusKey))
+		assert.EqualValues(t, 1, counter(reconnectsKey))
+
+		// Every reconnect is counted again.
+		testCollectorAborted(r)
+		testCollectorConnected(r)
+		assert.EqualValues(t, 2, counter(reconnectsKey))
+	})
+}
diff --git a/cmd/agent/app/reporter/grpc/builder.go b/cmd/agent/app/reporter/grpc/builder.go
index d1a2a04ec04..91dc6171efa 100644
--- a/cmd/agent/app/reporter/grpc/builder.go
+++ b/cmd/agent/app/reporter/grpc/builder.go
@@ -17,16 +17,20 @@ package grpc
 import (
 	"context"
 	"errors"
+	"expvar"
 	"fmt"
 	"strings"
 
-	grpc_retry
"github.com/grpc-ecosystem/go-grpc-middleware/retry"
+	"github.com/grpc-ecosystem/go-grpc-middleware/retry"
+	"github.com/uber/jaeger-lib/metrics"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
 
+	"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
 	"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
 	"github.com/jaegertracing/jaeger/pkg/discovery"
 	"github.com/jaegertracing/jaeger/pkg/discovery/grpcresolver"
@@ -43,6 +47,9 @@ type ConnBuilder struct {
 	DiscoveryMinPeers int
 	Notifier          discovery.Notifier
 	Discoverer        discovery.Discoverer
+
+	// ConnectMetrics can be injected by callers (e.g. unit tests); when nil, CreateConnection creates one.
+	ConnectMetrics *reporter.ConnectMetrics
 }
 
 // NewConnBuilder creates a new grpc connection builder.
@@ -51,7 +58,7 @@ func NewConnBuilder() *ConnBuilder {
 }
 
 // CreateConnection creates the gRPC connection
-func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, error) {
+func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) {
 	var dialOptions []grpc.DialOption
 	var dialTarget string
 	if b.TLS.Enabled { // user requested a secure connection
@@ -97,14 +104,55 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
 		return nil, err
 	}
 
-	go func(cc *grpc.ClientConn) {
+	if b.ConnectMetrics == nil {
+		if mFactory == nil {
+			// Fall back to a no-op factory instead of panicking on a nil interface.
+			mFactory = metrics.NullFactory
+		}
+		cm := reporter.ConnectMetrics{
+			Logger:         logger,
+			MetricsFactory: mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}),
+		}
+		cm.NewConnectMetrics()
+		b.ConnectMetrics = &cm
+	}
+
+	// Watch the connection state in the background and mirror it into the
+	// connection metrics until the connection is shut down.
+	go func(cc *grpc.ClientConn, cm *reporter.ConnectMetrics) {
 		logger.Info("Checking connection to collector")
+		// Expose the connected target through expvar. Reuse the variable if it is
+		// already published, because expvar panics when a name is registered twice.
+		var egt *expvar.String
+		switch v := expvar.Get("gRPCTarget").(type) {
+		case nil:
+			egt = expvar.NewString("gRPCTarget")
+		case *expvar.String:
+			egt = v
+		default:
+			// The name is taken by a variable of another type; record the target
+			// in an unpublished variable rather than panic on the type assertion.
+			egt = new(expvar.String)
+		}
+
 		for {
 			s := cc.GetState()
+			if s == connectivity.Ready {
+				cm.OnConnectionStatusChange(true)
+				egt.Set(cc.Target())
+			} else {
+				cm.OnConnectionStatusChange(false)
+			}
+
 			logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
+			if s == connectivity.Shutdown {
+				// A closed connection never leaves Shutdown, so waiting for another
+				// state change would block this goroutine forever.
+				return
+			}
 			cc.WaitForStateChange(context.Background(), s)
 		}
-	}(conn)
+	}(conn, b.ConnectMetrics)
 
 	return conn, nil
 }
diff --git a/cmd/agent/app/reporter/grpc/builder_test.go b/cmd/agent/app/reporter/grpc/builder_test.go
index 31949754a2a..4df5c395ba4 100644
--- a/cmd/agent/app/reporter/grpc/builder_test.go
+++ b/cmd/agent/app/reporter/grpc/builder_test.go
@@ -59,7 +59,7 @@ func TestBuilderFromConfig(t *testing.T) {
 		t,
 		[]string{"127.0.0.1:14268", "127.0.0.1:14269"},
 		cfg.CollectorHostPorts)
-	r, err := cfg.CreateConnection(zap.NewNop())
+	r, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
 	require.NoError(t, err)
 	assert.NotNil(t, r)
 }
@@ -149,7 +149,7 @@ func TestBuilderWithCollectors(t *testing.T) {
 			cfg.Notifier = test.notifier
 			cfg.Discoverer = test.discoverer
 
-			conn, err := cfg.CreateConnection(zap.NewNop())
+			conn, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
 			if test.expectedError == "" {
 				require.NoError(t, err)
 				require.NotNil(t, conn)
diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go
index 6c5b19f4e65..2d9dc94783f 100644
--- a/cmd/agent/app/reporter/grpc/collector_proxy.go
+++ b/cmd/agent/app/reporter/grpc/collector_proxy.go
@@ -37,7 +37,7 @@ type ProxyBuilder struct {
 
 // NewCollectorProxy creates ProxyBuilder
 func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
-	conn, err := builder.CreateConnection(logger)
+	conn, err := builder.CreateConnection(logger, mFactory)
 	if err != nil {
 		return nil, err
 	}