Skip to content

Commit

Permalink
Collect metrics from ES bulk service (#688)
Browse files Browse the repository at this point in the history
* Collect ES bulk metrics

Signed-off-by: Pavol Loffay <[email protected]>

* Fix tests

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Feb 8, 2018
1 parent dc6c4a7 commit ef7a367
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
23 changes: 21 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package config
import (
"bytes"
"context"
"sync"
"time"

"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/pkg/es"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
Expand All @@ -43,23 +46,39 @@ type Configuration struct {

// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient(logger *zap.Logger) (es.Client, error)
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
}

// NewClient creates a new ElasticSearch client
func (c *Configuration) NewClient(logger *zap.Logger) (es.Client, error) {
func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)
if err != nil {
return nil, err
}

sm := storageMetrics.NewWriteMetrics(metricsFactory, "BulkIndex")
m := sync.Map{}

service, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
start, ok := m.Load(id)
if !ok {
return
}
m.Delete(id)

duration := time.Since(start.(time.Time))
sm.Emit(err, duration)

if err != nil {
var buffer bytes.Buffer
for i, r := range requests {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.primaryConfig.NewClient(logger)
primaryClient, err := f.primaryConfig.NewClient(logger, metricsFactory)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type mockClientBuilder struct {
err error
}

func (m *mockClientBuilder) NewClient(logger *zap.Logger) (es.Client, error) {
func (m *mockClientBuilder) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
if m.err == nil {
return &mocks.Client{}, nil
}
Expand Down

0 comments on commit ef7a367

Please sign in to comment.