Skip to content

Commit

Permalink
Refactored Jaeger exporter to move connection into start for upcoming…
Browse files Browse the repository at this point in the history
… Auth changes. (#3299)

* Refactored Jaeger exporter to move connection into start

* added more thorough cleznup

* fixed linting
  • Loading branch information
pavankrish123 authored May 26, 2021
1 parent 76d1c70 commit 66c4908
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 40 deletions.
62 changes: 30 additions & 32 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
Expand All @@ -40,24 +41,7 @@ import (
// The exporter name is the name to be used in the observability of the exporter.
// The collectorEndpoint should be of the form "hostname:14250" (a gRPC target).
func newTracesExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter, error) {
opts, err := cfg.GRPCClientSettings.ToDialOptions()
if err != nil {
return nil, err
}

conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
if err != nil {
return nil, err
}

collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
s := newProtoGRPCSender(logger,
cfg.ID().String(),
collectorServiceClient,
metadata.New(cfg.GRPCClientSettings.Headers),
cfg.WaitForReady,
conn,
)
s := newProtoGRPCSender(cfg, logger)
return exporterhelper.NewTracesExporter(
cfg, logger, s.pushTraceData,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
Expand All @@ -82,23 +66,21 @@ type protoGRPCSender struct {
connStateReporterInterval time.Duration
stateChangeCallbacks []func(connectivity.State)

stopCh chan struct{}
stopped bool
stopLock sync.Mutex
stopCh chan struct{}
stopped bool
stopLock sync.Mutex
clientSettings *configgrpc.GRPCClientSettings
}

func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender {
func newProtoGRPCSender(cfg *Config, logger *zap.Logger) *protoGRPCSender {
s := &protoGRPCSender{
name: name,
logger: logger,
client: cl,
metadata: md,
waitForReady: waitForReady,

conn: conn,
name: cfg.ID().String(),
logger: logger,
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
waitForReady: cfg.WaitForReady,
connStateReporterInterval: time.Second,

stopCh: make(chan struct{}),
stopCh: make(chan struct{}),
clientSettings: &cfg.GRPCClientSettings,
}
s.AddStateChangeCallback(s.onStateChange)
return s
Expand Down Expand Up @@ -144,7 +126,23 @@ func (s *protoGRPCSender) shutdown(context.Context) error {
return nil
}

func (s *protoGRPCSender) start(context.Context, component.Host) error {
func (s *protoGRPCSender) start(_ context.Context, host component.Host) error {
if s.clientSettings == nil {
return fmt.Errorf("client settings not found")
}
opts, err := s.clientSettings.ToDialOptions()
if err != nil {
return err
}

conn, err := grpc.Dial(s.clientSettings.Endpoint, opts...)
if err != nil {
return err
}

s.client = jaegerproto.NewCollectorServiceClient(conn)
s.conn = conn

go s.startConnectionStatusReporter()
return nil
}
Expand Down
51 changes: 43 additions & 8 deletions exporter/jaegerexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,19 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newTracesExporter(&tt.config, zap.NewNop())
if (err != nil) != tt.wantErr {
t.Errorf("newTracesExporter() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got == nil {
assert.NoError(t, err)
assert.NotNil(t, got)
t.Cleanup(func() {
require.NoError(t, got.Shutdown(context.Background()))
})

err = got.Start(context.Background(), componenttest.NewNopHost())
if tt.wantErr {
assert.Error(t, err)
return
}

require.NoError(t, err)
// This is expected to fail.
err = got.ConsumeTraces(context.Background(), testdata.GenerateTracesNoLibraries())
assert.Error(t, err)
Expand Down Expand Up @@ -220,7 +225,7 @@ func TestMutualTLS(t *testing.T) {
}
exporter, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
err = exporter.Start(context.Background(), nil)
err = exporter.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, exporter.Shutdown(context.Background())) })

Expand Down Expand Up @@ -256,6 +261,15 @@ func TestConnectionStateChange(t *testing.T) {
stopCh: make(chan struct{}),
conn: sr,
connStateReporterInterval: 10 * time.Millisecond,
clientSettings: &configgrpc.GRPCClientSettings{
Headers: nil,
Endpoint: "foo.bar",
Compression: "",
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
Keepalive: nil,
},
}

wg.Add(1)
Expand All @@ -264,8 +278,20 @@ func TestConnectionStateChange(t *testing.T) {
wg.Done()
})

require.NoError(t, sender.start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, sender.shutdown(context.Background())) })
done := make(chan struct{})
go func() {
sender.startConnectionStatusReporter()
done <- struct{}{}
}()

t.Cleanup(func() {
// set the stopped flag, and wait for statusReporter to finish and signal back
sender.stopLock.Lock()
sender.stopped = true
sender.stopLock.Unlock()
<-done
})

wg.Wait() // wait for the initial state to be propagated

// test
Expand All @@ -287,6 +313,15 @@ func TestConnectionReporterEndsOnStopped(t *testing.T) {
stopCh: make(chan struct{}),
conn: sr,
connStateReporterInterval: 10 * time.Millisecond,
clientSettings: &configgrpc.GRPCClientSettings{
Headers: nil,
Endpoint: "foo.bar",
Compression: "",
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
Keepalive: nil,
},
}

wg := sync.WaitGroup{}
Expand Down

0 comments on commit 66c4908

Please sign in to comment.