Skip to content

Commit

Permalink
Refactor tenancy checking from gRPC to gRPC batch consumer (#3718)
Browse files Browse the repository at this point in the history
* Refactor tenancy checking from gRPC to gRPC batch consumer

Signed-off-by: Ed Snible <[email protected]>

* Pseudo-constructor to initialize batchConsumer

Signed-off-by: Ed Snible <[email protected]>
  • Loading branch information
esnible authored Jun 2, 2022
1 parent 3499c88 commit 2530903
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 34 deletions.
57 changes: 34 additions & 23 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,51 +33,62 @@ import (
type GRPCHandler struct {
logger *zap.Logger
batchConsumer batchConsumer
tenancyConfig *tenancy.TenancyConfig
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
return &GRPCHandler{
logger: logger,
batchConsumer: batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
},
},
tenancyConfig: tenancyConfig,
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.GRPCTransport,
processor.ProtoSpanFormat,
tenancyConfig),
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
tenant, err := g.validateTenant(ctx)
if err != nil {
g.logger.Error("rejecting spans (tenancy)", zap.Error(err))
return nil, err
}

batch := &r.Batch
err = g.batchConsumer.consume(batch, tenant)
err := g.batchConsumer.consume(ctx, batch)
return &api_v2.PostSpansResponse{}, err
}

type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
tenancyConfig tenancy.TenancyConfig
}

func (c *batchConsumer) consume(batch *model.Batch, tenant string) error {
func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer {
if tenancyConfig == nil {
tenancyConfig = &tenancy.TenancyConfig{}
}
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: transport,
SpanFormat: spanFormat,
},
tenancyConfig: *tenancyConfig,
}
}

func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
tenant, err := c.validateTenant(ctx)
if err != nil {
c.logger.Error("rejecting spans (tenancy)", zap.Error(err))
return err
}

for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = batch.Process
}
}
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
Tenant: tenant,
Expand All @@ -92,8 +103,8 @@ func (c *batchConsumer) consume(batch *model.Batch, tenant string) error {
return nil
}

func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
if !g.tenancyConfig.Enabled {
func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
if !c.tenancyConfig.Enabled {
return "", nil
}

Expand All @@ -102,14 +113,14 @@ func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[g.tenancyConfig.Header]
tenants := md[c.tenancyConfig.Header]
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !g.tenancyConfig.Valid(tenants[0]) {
if !c.tenancyConfig.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func TestGetTenant(t *testing.T) {
}))
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tenant, err := handler.validateTenant(test.ctx)
tenant, err := handler.batchConsumer.validateTenant(test.ctx)
if test.mustFail {
require.Error(t, err)
} else {
Expand Down
17 changes: 7 additions & 10 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,11 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting {

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate {
return &consumerDelegate{
batchConsumer: batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
SpanFormat: processor.OTLPSpanFormat,
InboundTransport: processor.UnknownTransport, // could be gRPC or HTTP
},
},
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
nil),
protoFromTraces: otlp2jaeger.ProtoFromTraces,
}
}
Expand All @@ -156,13 +153,13 @@ type consumerDelegate struct {
protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error)
}

func (c *consumerDelegate) consume(_ context.Context, td ptrace.Traces) error {
func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches, err := c.protoFromTraces(td)
if err != nil {
return err
}
for _, batch := range batches {
err := c.batchConsumer.consume(batch, "")
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
return err
}
Expand Down

0 comments on commit 2530903

Please sign in to comment.