From 2530903c84230a5a68bf616fd8dde45e23e79dea Mon Sep 17 00:00:00 2001 From: Ed Snible Date: Thu, 2 Jun 2022 11:26:58 -0400 Subject: [PATCH] Refactor tenancy checking from gRPC to gRPC batch consumer (#3718) * Refactor tenancy checking from gRPC to gRPC batch consumer Signed-off-by: Ed Snible * Pseudo-constructor to initialize batchConsumer Signed-off-by: Ed Snible --- cmd/collector/app/handler/grpc_handler.go | 57 +++++++++++-------- .../app/handler/grpc_handler_test.go | 2 +- cmd/collector/app/handler/otlp_receiver.go | 17 +++--- 3 files changed, 42 insertions(+), 34 deletions(-) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 095bb679a19..30a047900d3 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -33,35 +33,24 @@ import ( type GRPCHandler struct { logger *zap.Logger batchConsumer batchConsumer - tenancyConfig *tenancy.TenancyConfig } // NewGRPCHandler registers routes for this handler on the given router. func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler { return &GRPCHandler{ logger: logger, - batchConsumer: batchConsumer{ - logger: logger, - spanProcessor: spanProcessor, - spanOptions: processor.SpansOptions{ - InboundTransport: processor.GRPCTransport, - SpanFormat: processor.ProtoSpanFormat, - }, - }, - tenancyConfig: tenancyConfig, + batchConsumer: newBatchConsumer(logger, + spanProcessor, + processor.GRPCTransport, + processor.ProtoSpanFormat, + tenancyConfig), } } // PostSpans implements gRPC CollectorService. func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) { - tenant, err := g.validateTenant(ctx) - if err != nil { - g.logger.Error("rejecting spans (tenancy)", zap.Error(err)) - return nil, err - } - batch := &r.Batch - err = g.batchConsumer.consume(batch, tenant) + err := g.batchConsumer.consume(ctx, batch) return &api_v2.PostSpansResponse{}, err } @@ -69,15 +58,37 @@ type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor spanOptions processor.SpansOptions + tenancyConfig tenancy.TenancyConfig } -func (c *batchConsumer) consume(batch *model.Batch, tenant string) error { +func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer { + if tenancyConfig == nil { + tenancyConfig = &tenancy.TenancyConfig{} + } + return batchConsumer{ + logger: logger, + spanProcessor: spanProcessor, + spanOptions: processor.SpansOptions{ + InboundTransport: transport, + SpanFormat: spanFormat, + }, + tenancyConfig: *tenancyConfig, + } +} + +func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { + tenant, err := c.validateTenant(ctx) + if err != nil { + c.logger.Error("rejecting spans (tenancy)", zap.Error(err)) + return err + } + for _, span := range batch.Spans { if span.GetProcess() == nil { span.Process = batch.Process } } - _, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{ + _, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{ InboundTransport: processor.GRPCTransport, SpanFormat: processor.ProtoSpanFormat, Tenant: tenant, @@ -92,8 +103,8 @@ func (c *batchConsumer) consume(batch *model.Batch, tenant string) error { return nil } -func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) { - if !g.tenancyConfig.Enabled { +func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) { + if !c.tenancyConfig.Enabled { return "", nil } @@ -102,14 +113,14 @@ func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) { return "", status.Errorf(codes.PermissionDenied, "missing tenant header") } - tenants := md[g.tenancyConfig.Header] + tenants := md[c.tenancyConfig.Header] if len(tenants) < 1 { return "", status.Errorf(codes.PermissionDenied, "missing tenant header") } else if len(tenants) > 1 { return "", status.Errorf(codes.PermissionDenied, "extra tenant header") } - if !g.tenancyConfig.Valid(tenants[0]) { + if !c.tenancyConfig.Valid(tenants[0]) { return "", status.Errorf(codes.PermissionDenied, "unknown tenant") } diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 0b248f36016..f4d3393ce39 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -350,7 +350,7 @@ func TestGetTenant(t *testing.T) { })) for _, test := range tests { t.Run(test.name, func(t *testing.T) { - tenant, err := handler.validateTenant(test.ctx) + tenant, err := handler.batchConsumer.validateTenant(test.ctx) if test.mustFail { require.Error(t, err) } else { diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 69854cd32d4..5fcd69640ce 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -139,14 +139,11 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting { func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate { return &consumerDelegate{ - batchConsumer: batchConsumer{ - logger: logger, - spanProcessor: spanProcessor, - spanOptions: processor.SpansOptions{ - SpanFormat: processor.OTLPSpanFormat, - InboundTransport: processor.UnknownTransport, // could be gRPC or HTTP - }, - }, + batchConsumer: newBatchConsumer(logger, + spanProcessor, + processor.UnknownTransport, // could be gRPC or HTTP + processor.OTLPSpanFormat, + nil), protoFromTraces: otlp2jaeger.ProtoFromTraces, } } @@ -156,13 +153,13 @@ type consumerDelegate struct { protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) } -func (c *consumerDelegate) consume(_ context.Context, td ptrace.Traces) error { +func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error { batches, err := c.protoFromTraces(td) if err != nil { return err } for _, batch := range batches { - err := c.batchConsumer.consume(batch, "") + err := c.batchConsumer.consume(ctx, batch) if err != nil { return err }