Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix2640: Distributor refactoring to assert ingestion rate limits as early as possible #2703

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## main / unreleased

* [FEATURE] New encoding vParquet3 with support for dedicated attribute columns (@mapno, @stoewer) [#2649](https://github.com/grafana/tempo/pull/2649)
* [ENHANCEMENT] Assert ingestion rate limits as early as possible [#2640](https://github.com/grafana/tempo/pull/2703) (@mghildiy)

## v2.2.0-rc0 / 2023-07-21

Expand Down
66 changes: 37 additions & 29 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,48 @@ func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}

func (d *Distributor) checkForRateLimits(tracesSize, spanCount int, userID string) error {
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, tracesSize) {
overrides.RecordDiscardedSpans(spanCount, reasonRateLimited, userID)
return status.Errorf(codes.ResourceExhausted,
"%s ingestion rate limit (%d bytes) exceeded while adding %d bytes",
overrides.ErrorPrefixRateLimited,
int(d.ingestionRateLimiter.Limit(now, userID)),
tracesSize)
}

return nil
}

func (d *Distributor) extractBasicInfo(ctx context.Context, traces ptrace.Traces) (userID string, spanCount, tracesSize int, err error) {
user, e := user.ExtractOrgID(ctx)
mghildiy marked this conversation as resolved.
Show resolved Hide resolved
if e != nil {
return "", 0, 0, e
}

return user, traces.SpanCount(), (&ptrace.ProtoMarshaler{}).TracesSize(traces), nil
}

// PushTraces pushes a batch of traces
func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "distributor.PushTraces")
defer span.Finish()

userID, spanCount, size, err := d.extractBasicInfo(ctx, traces)
if err != nil {
// can't record discarded spans here b/c there's no tenant
return nil, err
}
if spanCount == 0 {
return &tempopb.PushResponse{}, nil
}
// check limits
err = d.checkForRateLimits(size, spanCount, userID)
if err != nil {
return nil, err
}

// Convert to bytes and back. This is unfortunate for efficiency, but it works
// around the otel-collector internalization of otel-proto which Tempo also uses.
convert, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces)
Expand All @@ -290,12 +327,6 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te

batches := trace.Batches

userID, err := user.ExtractOrgID(ctx)
if err != nil {
// can't record discarded spans here b/c there's no tenant
return nil, err
}

if d.cfg.LogReceivedSpans.Enabled || d.cfg.LogReceivedTraces {
if d.cfg.LogReceivedSpans.IncludeAllAttributes {
logSpansWithAllAttributes(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
Expand All @@ -304,32 +335,9 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
}
}

// metric size
size := 0
spanCount := 0
for _, b := range batches {
size += b.Size()
for _, ils := range b.ScopeSpans {
spanCount += len(ils.Spans)
}
}
if spanCount == 0 {
return &tempopb.PushResponse{}, nil
}
metricBytesIngested.WithLabelValues(userID).Add(float64(size))
metricSpansIngested.WithLabelValues(userID).Add(float64(spanCount))

// check limits
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, size) {
overrides.RecordDiscardedSpans(spanCount, reasonRateLimited, userID)
return nil, status.Errorf(codes.ResourceExhausted,
"%s ingestion rate limit (%d bytes) exceeded while adding %d bytes",
overrides.ErrorPrefixRateLimited,
int(d.ingestionRateLimiter.Limit(now, userID)),
size)
}

keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
Expand Down
40 changes: 40 additions & 0 deletions modules/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,46 @@ func TestLogSpans(t *testing.T) {
}
}

func TestRateLimitRespected(t *testing.T) {
// prepare test data
limits := overrides.Limits{
IngestionRateStrategy: overrides.LocalIngestionRateStrategy,
IngestionRateLimitBytes: 400,
IngestionBurstSizeBytes: 200,
}
buf := &bytes.Buffer{}
logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf))
d := prepare(t, &limits, nil, logger)
batches := []*v1.ResourceSpans{
makeResourceSpans("test-service", []*v1.ScopeSpans{
makeScope(
makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span1", nil,
makeAttribute("tag1", "value1")),
makeSpan("e3210a2b38097332d1fe43083ea93d29", "6c21c48da4dbd1a7", "Test Span2", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR},
makeAttribute("tag1", "value1"),
makeAttribute("tag2", "value2"))),
makeScope(
makeSpan("bb42ec04df789ff04b10ea5274491685", "1b3a296034f4031e", "Test Span3", nil)),
}, makeAttribute("resource_attribute1", "value1")),
makeResourceSpans("test-service2", []*v1.ScopeSpans{
makeScope(
makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})),
}, makeAttribute("resource_attribute2", "value2")),
}
traces := batchesToTraces(t, batches)

// invoke unit
_, err := d.PushTraces(ctx, traces)

// validations
if err == nil {
t.Fatal("Expected error")
}
status, ok := status.FromError(err)
assert.True(t, ok)
assert.True(t, status.Code() == codes.ResourceExhausted, "Wrong status code")
}

type logSpan struct {
Msg string `json:"msg"`
Level string `json:"level"`
Expand Down