Skip to content

Commit

Permalink
Rollup based on metrics (#821)
Browse files Browse the repository at this point in the history
Closes: GH-515
  • Loading branch information
DariaKunoichi authored Oct 26, 2022
1 parent ef59326 commit 6e635fd
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 22 deletions.
24 changes: 20 additions & 4 deletions pkg/otelcollector/rollupprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ func initRollupsLog() []*Rollup {
{
FromField: otelcollector.WorkloadDurationLabel,
TreatAsZero: []string{},
Datasketch: true,
},
{
FromField: otelcollector.FlowDurationLabel,
TreatAsZero: []string{},
Datasketch: true,
},
{
FromField: otelcollector.ApertureProcessingDurationLabel,
TreatAsZero: []string{},
Datasketch: true,
},
{
FromField: otelcollector.HTTPRequestContentLength,
Expand All @@ -59,6 +62,11 @@ func _initRollupsPerType(rollupsInit []*Rollup, rollupTypes []RollupType) []*Rol
var rollups []*Rollup
for _, rollupInit := range rollupsInit {
for _, rollupType := range rollupTypes {

if rollupType == RollupDatasketch && !rollupInit.Datasketch {
continue
}

rollups = append(rollups, &Rollup{
FromField: rollupInit.FromField,
ToField: AggregateField(rollupInit.FromField, rollupType),
Expand Down Expand Up @@ -120,13 +128,16 @@ func (rp *rollupProcessor) Shutdown(context.Context) error {
// ConsumeLogs implements LogsProcessor.
func (rp *rollupProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
applyCardinalityLimits(ld, rp.cfg.AttributeCardinalityLimit)

rollupData := make(map[string]pcommon.Map)
datasketches := make(map[string]map[string]*sketches.HeapDoublesSketch)

err := otelcollector.IterateLogRecords(ld, func(logRecord plog.LogRecord) error {
key := rp.key(logRecord.Attributes(), rollupsLog)
attributes := logRecord.Attributes()
key := rp.key(attributes, rollupsLog)
_, exists := rollupData[key]
if !exists {
rollupData[key] = logRecord.Attributes()
rollupData[key] = attributes
rollupData[key].PutInt(RollupCountKey, 0)
}
_, exists = datasketches[key]
Expand All @@ -135,7 +146,7 @@ func (rp *rollupProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error
}
rawCount, _ := rollupData[key].Get(RollupCountKey)
rollupData[key].PutInt(RollupCountKey, rawCount.Int()+1)
rp.rollupAttributes(datasketches[key], rollupData[key], logRecord.Attributes(), rollupsLog)
rp.rollupAttributes(datasketches[key], rollupData[key], attributes, rollupsLog)
return nil
})
if err != nil {
Expand Down Expand Up @@ -188,7 +199,12 @@ func applyCardinalityLimits(ld plog.Logs, limit int) {
})
}

func (rp *rollupProcessor) rollupAttributes(datasketches map[string]*sketches.HeapDoublesSketch, baseAttributes, attributes pcommon.Map, rollups []*Rollup) {
func (rp *rollupProcessor) rollupAttributes(
datasketches map[string]*sketches.HeapDoublesSketch,
baseAttributes,
attributes pcommon.Map,
rollups []*Rollup,
) {
// TODO tgill: need to track latest timestamp from attributes as the timestamp in baseAttributes
for _, rollup := range rollups {
switch rollup.Type {
Expand Down
37 changes: 19 additions & 18 deletions pkg/otelcollector/rollupprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ var _ = Describe("Rollup processor", func() {
ScopeLogs().AppendEmpty().
LogRecords()
logRecord := logs.AppendEmpty()
logRecord.Attributes().PutString("fizz", "buzz")
logRecord.Attributes().PutString(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[0]))
logRecord.Attributes().PutStr("fizz", "buzz")
logRecord.Attributes().PutStr(otelcollector.ApertureSourceLabel, otelcollector.ApertureSourceEnvoy)
logRecord.Attributes().PutStr(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[0]))

err = logsProcessor.ConsumeLogs(context.TODO(), input)
Expect(err).NotTo(HaveOccurred())

Expect(testConsumer.receivedLogs).To(HaveLen(1))
attributes := testConsumer.receivedLogs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()
Expect(attributes).To(HaveLen(7))
Expect(attributes).To(HaveLen(8))
Expect(attributes).To(HaveKeyWithValue(RollupCountKey, int64(1)))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupDatasketch), expectedSerializedDatasketch))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupSum), float64(5)))
Expand All @@ -80,42 +81,42 @@ var _ = Describe("Rollup processor", func() {
Expect(err).NotTo(HaveOccurred())

input := plog.NewLogs()
logs := input.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords()
logs := input.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
logRecord := logs.AppendEmpty()
logRecord.Attributes().PutString(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[0]))
logRecord.Attributes().PutStr(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[0]))
logRecord = logs.AppendEmpty()
logRecord.Attributes().PutStr(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[1]))
logRecord = logs.AppendEmpty()
logRecord.Attributes().PutString(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[1]))
logRecord.Attributes().PutStr(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[2]))
logRecord = logs.AppendEmpty()
logRecord.Attributes().PutString(otelcollector.WorkloadDurationLabel, strconv.Itoa(attributeValues[2]))
logRecord.Attributes().PutStr(otelcollector.HTTPRequestContentLength, strconv.Itoa(1234))

err = logsProcessor.ConsumeLogs(context.TODO(), input)
Expect(err).NotTo(HaveOccurred())

Expect(testConsumer.receivedLogs).To(HaveLen(1))
attributes := testConsumer.receivedLogs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()
Expect(attributes).To(HaveLen(6))
Expect(attributes).To(HaveKeyWithValue(RollupCountKey, int64(3)))
Expect(attributes).To(HaveLen(10))
Expect(attributes).To(HaveKeyWithValue(RollupCountKey, int64(4)))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupDatasketch), expectedSerializedDatasketch))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupSum), float64(18)))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupMin), float64(5)))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupMax), float64(7)))
Expect(attributes).To(HaveKeyWithValue(AggregateField(otelcollector.WorkloadDurationLabel, RollupSumOfSquares), float64(110)))
Expect(attributes).NotTo(HaveKey(AggregateField(otelcollector.HTTPRequestContentLength, RollupDatasketch)))
})

It("applies cardinality limits", func() {
input := plog.NewLogs()
logs := input.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords()
logs := input.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()

for i := 0; i < 30; i++ {
logRecord := logs.AppendEmpty()
logRecord.Attributes().PutString(otelcollector.WorkloadDurationLabel, strconv.Itoa(i))
logRecord.Attributes().PutString("low-cardinality", strconv.Itoa(i%2))
logRecord.Attributes().PutString("almost-high-cardinality", strconv.Itoa(i%10))
logRecord.Attributes().PutString("high-cardinality", strconv.Itoa(i))
logRecord.Attributes().PutStr(otelcollector.WorkloadDurationLabel, strconv.Itoa(i))
logRecord.Attributes().PutStr(otelcollector.ApertureSourceLabel, otelcollector.ApertureSourceEnvoy)
logRecord.Attributes().PutStr("low-cardinality", strconv.Itoa(i%2))
logRecord.Attributes().PutStr("almost-high-cardinality", strconv.Itoa(i%10))
logRecord.Attributes().PutStr("high-cardinality", strconv.Itoa(i))
}

err := logsProcessor.ConsumeLogs(context.TODO(), input)
Expand Down
1 change: 1 addition & 0 deletions pkg/otelcollector/rollupprocessor/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Rollup struct {
ToField string `mapstructure:"to"`
Type RollupType `mapstructure:"type"`
TreatAsZero []string `mapstructure:"treat_as_zero"`
Datasketch bool `mapstructure:"datasketch"`
}

// GetFromFieldValue returns value of `FromField` from attributes as float64.
Expand Down

0 comments on commit 6e635fd

Please sign in to comment.