Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC metrics to agent #1180

Merged
merged 4 commits into from
Nov 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,45 @@ Changes by Version

##### Breaking Changes

- Add gRPC metrics to agent ([#1180](https://github.com/jaegertracing/jaeger/pull/1180), [@pavolloffay](https://github.com/pavolloffay))

The following metrics:
```
jaeger_agent_tchannel_reporter_batch_size{format="jaeger"} 0
jaeger_agent_tchannel_reporter_batch_size{format="zipkin"} 0
jaeger_agent_tchannel_reporter_batches_failures{format="jaeger"} 0
jaeger_agent_tchannel_reporter_batches_failures{format="zipkin"} 0
jaeger_agent_tchannel_reporter_batches_submitted{format="jaeger"} 0
jaeger_agent_tchannel_reporter_batches_submitted{format="zipkin"} 0
jaeger_agent_tchannel_reporter_spans_failures{format="jaeger"} 0
jaeger_agent_tchannel_reporter_spans_failures{format="zipkin"} 0
jaeger_agent_tchannel_reporter_spans_submitted{format="jaeger"} 0
jaeger_agent_tchannel_reporter_spans_submitted{format="zipkin"} 0

jaeger_agent_collector_proxy{endpoint="baggage",result="err"} 0
jaeger_agent_collector_proxy{endpoint="baggage",result="ok"} 0
jaeger_agent_collector_proxy{endpoint="sampling",result="err"} 0
jaeger_agent_collector_proxy{endpoint="sampling",result="ok"} 0
```
have been renamed to:
```
jaeger_agent_reporter_batch_size{format="jaeger",protocol="tchannel"} 0
jaeger_agent_reporter_batch_size{format="zipkin",protocol="tchannel"} 0
jaeger_agent_reporter_batches_failures{format="jaeger",protocol="tchannel"} 0
jaeger_agent_reporter_batches_failures{format="zipkin",protocol="tchannel"} 0
jaeger_agent_reporter_batches_submitted{format="jaeger",protocol="tchannel"} 0
jaeger_agent_reporter_batches_submitted{format="zipkin",protocol="tchannel"} 0
jaeger_agent_reporter_spans_failures{format="jaeger",protocol="tchannel"} 0
jaeger_agent_reporter_spans_failures{format="zipkin",protocol="tchannel"} 0
jaeger_agent_reporter_spans_submitted{format="jaeger",protocol="tchannel"} 0
jaeger_agent_reporter_spans_submitted{format="zipkin",protocol="tchannel"} 0

jaeger_agent_collector_proxy{endpoint="baggage",protocol="tchannel",result="err"} 0
jaeger_agent_collector_proxy{endpoint="baggage",protocol="tchannel",result="ok"} 0
jaeger_agent_collector_proxy{endpoint="sampling",protocol="tchannel",result="err"} 0
jaeger_agent_collector_proxy{endpoint="sampling",protocol="tchannel",result="ok"} 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if we remove "EmitZipkinBatch" in #1193, we need to change the changelog since the new metrics won't be partitioned by the "format".

```

- Rename tcollector proxy metric in agent ([#1182](https://github.com/jaegertracing/jaeger/pull/1182), [@pavolloffay](https://github.com/pavolloffay))

The following metric:
Expand Down
34 changes: 3 additions & 31 deletions cmd/agent/app/httpserver/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package httpserver
import (
"time"

"github.com/uber/jaeger-lib/metrics"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"

Expand All @@ -28,29 +27,15 @@ import (
type collectorProxy struct {
samplingClient sampling.TChanSamplingManager
baggageClient baggage.TChanBaggageRestrictionManager
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
metrics struct {
// Number of successful sampling rate responses from collector
SamplingSuccess metrics.Counter `metric:"collector-proxy" tags:"result=ok,endpoint=sampling"`

// Number of failed sampling rate responses from collector
SamplingFailures metrics.Counter `metric:"collector-proxy" tags:"result=err,endpoint=sampling"`

// Number of successful baggage restriction responses from collector
BaggageSuccess metrics.Counter `metric:"collector-proxy" tags:"result=ok,endpoint=baggage"`

// Number of failed baggage restriction responses from collector
BaggageFailures metrics.Counter `metric:"collector-proxy" tags:"result=err,endpoint=baggage"`
}
}

// NewCollectorProxy implements Manager by proxying the requests to collector.
func NewCollectorProxy(svc string, channel *tchannel.Channel, mFactory metrics.Factory) ClientConfigManager {
func NewCollectorProxy(svc string, channel *tchannel.Channel) ClientConfigManager {
thriftClient := thrift.NewClient(channel, svc, nil)
res := &collectorProxy{
samplingClient: sampling.NewTChanSamplingManagerClient(thriftClient),
baggageClient: baggage.NewTChanBaggageRestrictionManagerClient(thriftClient),
}
metrics.Init(&res.metrics, mFactory, nil)
return res
}

Expand All @@ -59,24 +44,11 @@ func (c *collectorProxy) GetSamplingStrategy(serviceName string) (*sampling.Samp
defer cancel()

// TODO: enable tracer on the tchannel and get metrics for free (sampler can be off)
resp, err := c.samplingClient.GetSamplingStrategy(ctx, serviceName)
if err != nil {
c.metrics.SamplingFailures.Inc(1)
return nil, err
}
c.metrics.SamplingSuccess.Inc(1)
return resp, nil
return c.samplingClient.GetSamplingStrategy(ctx, serviceName)
}

func (c *collectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
ctx, cancel := tchannel.NewContextBuilder(time.Second).DisableTracing().Build()
defer cancel()

resp, err := c.baggageClient.GetBaggageRestrictions(ctx, serviceName)
if err != nil {
c.metrics.BaggageFailures.Inc(1)
return nil, err
}
c.metrics.BaggageSuccess.Inc(1)
return resp, nil
return c.baggageClient.GetBaggageRestrictions(ctx, serviceName)
}
21 changes: 2 additions & 19 deletions cmd/agent/app/httpserver/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ package httpserver
import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
mTestutils "github.com/uber/jaeger-lib/metrics/testutils"
"github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/cmd/agent/app/testutils"
Expand All @@ -31,7 +28,7 @@ import (
)

func TestCollectorProxy(t *testing.T) {
metricsFactory, collector := testutils.InitMockCollector(t)
_, collector := testutils.InitMockCollector(t)
defer collector.Close()

collector.AddSamplingStrategy("service1", &sampling.SamplingStrategyResponse{
Expand All @@ -43,7 +40,7 @@ func TestCollectorProxy(t *testing.T) {
{BaggageKey: "luggage", MaxValueLength: 10},
})

mgr := NewCollectorProxy("jaeger-collector", collector.Channel, metricsFactory)
mgr := NewCollectorProxy("jaeger-collector", collector.Channel)

sResp, err := mgr.GetSamplingStrategy("service1")
require.NoError(t, err)
Expand All @@ -57,28 +54,14 @@ func TestCollectorProxy(t *testing.T) {
require.Len(t, bResp, 1)
assert.Equal(t, "luggage", bResp[0].BaggageKey)
assert.EqualValues(t, 10, bResp[0].MaxValueLength)

// must emit metrics
mTestutils.AssertCounterMetrics(t, metricsFactory, []mTestutils.ExpectedMetric{
{Name: "collector-proxy", Tags: map[string]string{"result": "ok", "endpoint": "sampling"}, Value: 1},
{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "sampling"}, Value: 0},
{Name: "collector-proxy", Tags: map[string]string{"result": "ok", "endpoint": "baggage"}, Value: 1},
{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "baggage"}, Value: 0},
}...)
}

func TestTCollectorProxyClientErrorPropagates(t *testing.T) {
mFactory := metrics.NewLocalFactory(time.Minute)
proxy := &collectorProxy{samplingClient: &failingClient{}, baggageClient: &failingClient{}}
metrics.Init(&proxy.metrics, mFactory, nil)
_, err := proxy.GetSamplingStrategy("test")
require.EqualError(t, err, "error")
_, err = proxy.GetBaggageRestrictions("test")
require.EqualError(t, err, "error")
mTestutils.AssertCounterMetrics(t, mFactory, []mTestutils.ExpectedMetric{
{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "sampling"}, Value: 1},
{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "baggage"}, Value: 1},
}...)
}

type failingClient struct{}
Expand Down
72 changes: 72 additions & 0 deletions cmd/agent/app/httpserver/config_mgr_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"github.com/uber/jaeger-lib/metrics"

"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// configManagerMetrics holds the counters recorded for ClientConfigManager
// invocations. All four fields share the metric name "collector-proxy" and are
// told apart by their "result" (ok/err) and "endpoint" (sampling/baggage) tags.
// The struct is populated by metrics.Init, which reads the `metric` and `tags`
// struct tags below.
type configManagerMetrics struct {
// Number of successful sampling rate responses from collector
// (GetSamplingStrategy calls that returned a nil error).
SamplingSuccess metrics.Counter `metric:"collector-proxy" tags:"result=ok,endpoint=sampling"`

// Number of failed sampling rate responses from collector
// (GetSamplingStrategy calls that returned a non-nil error).
SamplingFailures metrics.Counter `metric:"collector-proxy" tags:"result=err,endpoint=sampling"`

// Number of successful baggage restriction responses from collector
// (GetBaggageRestrictions calls that returned a nil error).
BaggageSuccess metrics.Counter `metric:"collector-proxy" tags:"result=ok,endpoint=baggage"`

// Number of failed baggage restriction responses from collector
// (GetBaggageRestrictions calls that returned a non-nil error).
BaggageFailures metrics.Counter `metric:"collector-proxy" tags:"result=err,endpoint=baggage"`
}

// ManagerWithMetrics is a ClientConfigManager decorator with metrics
// integration: it forwards every call to the wrapped manager and counts the
// outcome (success or failure) of each invocation.
type ManagerWithMetrics struct {
// wrapped is the underlying manager that actually serves the requests.
wrapped ClientConfigManager
// metrics holds the per-endpoint success/failure counters.
metrics configManagerMetrics
}

// WrapWithMetrics decorates the given ClientConfigManager so that every
// invocation is counted. The counters are created from mFactory.
func WrapWithMetrics(manager ClientConfigManager, mFactory metrics.Factory) *ManagerWithMetrics {
	wrapper := &ManagerWithMetrics{wrapped: manager}
	// Initialize the counters in place from the struct tags of configManagerMetrics.
	metrics.Init(&wrapper.metrics, mFactory, nil)
	return wrapper
}

// GetSamplingStrategy delegates to the wrapped manager and records whether the
// call succeeded or failed. The wrapped manager's response and error are
// returned unchanged.
func (m *ManagerWithMetrics) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
	resp, err := m.wrapped.GetSamplingStrategy(serviceName)

	// Pick the counter matching the outcome, then bump it exactly once.
	outcome := m.metrics.SamplingSuccess
	if err != nil {
		outcome = m.metrics.SamplingFailures
	}
	outcome.Inc(1)

	return resp, err
}

// GetBaggageRestrictions delegates to the wrapped manager and records whether
// the call succeeded or failed. The wrapped manager's response and error are
// returned unchanged.
func (m *ManagerWithMetrics) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
	restrictions, err := m.wrapped.GetBaggageRestrictions(serviceName)

	// Pick the counter matching the outcome, then bump it exactly once.
	outcome := m.metrics.BaggageSuccess
	if err != nil {
		outcome = m.metrics.BaggageFailures
	}
	outcome.Inc(1)

	return restrictions, err
}
87 changes: 87 additions & 0 deletions cmd/agent/app/httpserver/config_mgr_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
mTestutils "github.com/uber/jaeger-lib/metrics/testutils"

"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// noopManager is a stateless test double for ClientConfigManager; its methods
// fail only when asked about the service named "failed".
type noopManager struct{}

// GetSamplingStrategy returns an error for the service name "failed" and a
// probabilistic sampling strategy response for any other name.
func (noopManager) GetSamplingStrategy(s string) (*sampling.SamplingStrategyResponse, error) {
	switch s {
	case "failed":
		return nil, errors.New("failed")
	default:
		return &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, nil
	}
}
// GetBaggageRestrictions returns an error for the service name "failed" and a
// single restriction with key "foo" for any other name.
func (noopManager) GetBaggageRestrictions(s string) ([]*baggage.BaggageRestriction, error) {
	switch s {
	case "failed":
		return nil, errors.New("failed")
	default:
		return []*baggage.BaggageRestriction{{BaggageKey: "foo"}}, nil
	}
}

// TestMetrics verifies that ManagerWithMetrics bumps the "ok" counters when the
// wrapped manager succeeds and the "err" counters when it fails, for both the
// sampling and the baggage endpoints.
func TestMetrics(t *testing.T) {
	// Counter values expected after one successful call per endpoint.
	afterSuccess := []mTestutils.ExpectedMetric{
		{Name: "collector-proxy", Tags: map[string]string{"result": "ok", "endpoint": "sampling"}, Value: 1},
		{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "sampling"}, Value: 0},
		{Name: "collector-proxy", Tags: map[string]string{"result": "ok", "endpoint": "baggage"}, Value: 1},
		{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "baggage"}, Value: 0},
	}
	// Counter values expected after one failed call per endpoint.
	afterFailure := []mTestutils.ExpectedMetric{
		{Name: "collector-proxy", Tags: map[string]string{"result": "ok", "endpoint": "sampling"}, Value: 0},
		{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "sampling"}, Value: 1},
		{Name: "collector-proxy", Tags: map[string]string{"result": "ok", "endpoint": "baggage"}, Value: 0},
		{Name: "collector-proxy", Tags: map[string]string{"result": "err", "endpoint": "baggage"}, Value: 1},
	}

	cases := []struct {
		expected []mTestutils.ExpectedMetric
		err      error
	}{
		{expected: afterSuccess},
		{expected: afterFailure, err: errors.New("failed")},
	}

	for _, tc := range cases {
		// A fresh factory per case keeps the counters independent.
		factory := metrics.NewLocalFactory(time.Microsecond)
		mgr := WrapWithMetrics(&noopManager{}, factory)

		if tc.err == nil {
			strategy, err := mgr.GetSamplingStrategy("")
			require.NoError(t, err)
			require.NotNil(t, strategy)
			restrictions, err := mgr.GetBaggageRestrictions("")
			require.NoError(t, err)
			require.NotNil(t, restrictions)
		} else {
			// The error text doubles as the service name that makes noopManager fail.
			failing := tc.err.Error()
			strategy, err := mgr.GetSamplingStrategy(failing)
			require.Nil(t, strategy)
			assert.EqualError(t, err, failing)
			restrictions, err := mgr.GetBaggageRestrictions(failing)
			require.Nil(t, restrictions)
			assert.EqualError(t, err, failing)
		}

		mTestutils.AssertCounterMetrics(t, factory, tc.expected...)
	}
}
6 changes: 3 additions & 3 deletions cmd/agent/app/processors/thrift_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TPr

func initCollectorAndReporter(t *testing.T) (*metrics.LocalFactory, *testutils.MockTCollector, reporter.Reporter) {
metricsFactory, collector := testutils.InitMockCollector(t)
reporter := tchreporter.New("jaeger-collector", collector.Channel, nil, metricsFactory, zap.NewNop())
reporter := reporter.WrapWithMetrics(tchreporter.New("jaeger-collector", collector.Channel, nil, zap.NewNop()), metricsFactory)
return metricsFactory, collector, reporter
}

Expand Down Expand Up @@ -219,8 +219,8 @@ func assertProcessorCorrectness(

// agentReporter must emit metrics
mTestutils.AssertCounterMetrics(t, metricsFactory, []mTestutils.ExpectedMetric{
{Name: "tchannel-reporter.batches.submitted", Tags: map[string]string{"format": format}, Value: 1},
{Name: "tchannel-reporter.spans.submitted", Tags: map[string]string{"format": format}, Value: 1},
{Name: "reporter.batches.submitted", Tags: map[string]string{"format": format}, Value: 1},
{Name: "reporter.spans.submitted", Tags: map[string]string{"format": format}, Value: 1},
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
{Name: "thrift.udp.server.packets.processed", Value: 1},
}...)
}
12 changes: 6 additions & 6 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpc
import (
"errors"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
Expand All @@ -35,13 +36,10 @@ type ProxyBuilder struct {
}

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
if len(o.CollectorHostPort) == 0 {
return nil, errors.New("could not create collector proxy, address is missing")
}

// It does not return error if the collector is not running
// a way to fail immediately is to call WithBlock and WithTimeout
var conn *grpc.ClientConn
if len(o.CollectorHostPort) > 1 {
r, _ := manual.GenerateAndRegisterManualResolver()
Expand All @@ -52,12 +50,14 @@ func NewCollectorProxy(o *Options, logger *zap.Logger) (*ProxyBuilder, error) {
r.InitialAddrs(resolvedAddrs)
conn, _ = grpc.Dial(r.Scheme()+":///round_robin", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
} else {
// grpc.Dial is non-blocking by default, so it does not return an error if the collector is not running
conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure())
}
grpcMetrics := mFactory.Namespace("", map[string]string{"protocol": "grpc"})
return &ProxyBuilder{
conn: conn,
reporter: NewReporter(conn, logger),
manager: NewSamplingManager(conn)}, nil
reporter: aReporter.WrapWithMetrics(NewReporter(conn, logger), grpcMetrics),
manager: httpserver.WrapWithMetrics(NewSamplingManager(conn), grpcMetrics)}, nil
}

// GetReporter returns Reporter
Expand Down
Loading