Skip to content

Commit

Permalink
Response status in telemetry (#828)
Browse files Browse the repository at this point in the history
### Description of change
This introduces `aperture.response_status` column in telemetry. It
mirrors the implementation of `response_status` label for metrics.
This also extends above logic to include `1xx`, `2xx`, and `3xx` codes
as OK instead of only `2xx` codes.

Besides this, some cleanup is done:
1. Above logic is moved from `FluxMeter` to OTEL package. **This changes
FluxMeter interface!**
2. A log of logic is moved from `metricsprocessor` to
`metricsprocessor/internal` for better visibility and easier separation
of functions which are called directly in metricsprocessor and helpers,
3. The above made creating UT much easier, so this PR also includes
some.

Ref: https://github.com/fluxninja/cloud/issues/6788

##### Checklist

- [x] Tested in playground or other setup
- [x] Tests and/or benchmarks are included
- [x] Breaking changes
  • Loading branch information
kwapik authored Oct 28, 2022
1 parent ed72581 commit 65bab78
Show file tree
Hide file tree
Showing 17 changed files with 719 additions and 315 deletions.
6 changes: 6 additions & 0 deletions pkg/otelcollector/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ const (
ApertureClassifiersLabel = "aperture.classifiers"
// ApertureClassifierErrorsLabel describes encountered classifier errors for specified policy.
ApertureClassifierErrorsLabel = "aperture.classifier_errors"
// ApertureResponseStatusLabel label to denote OK or Error across all protocols.
ApertureResponseStatusLabel = "aperture.response_status"
// ApertureResponseStatusOK OK response across all protocols.
ApertureResponseStatusOK = ApertureFeatureStatusOK
// ApertureResponseStatusError Error response across all protocols.
ApertureResponseStatusError = ApertureFeatureStatusError

/* HTTP Specific labels. */

Expand Down
143 changes: 143 additions & 0 deletions pkg/otelcollector/metricsprocessor/internal/check_response_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package internal

import (
"fmt"
"strings"

flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/v1"
"github.com/rs/zerolog"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/fluxninja/aperture/pkg/log"
"github.com/fluxninja/aperture/pkg/metrics"
"github.com/fluxninja/aperture/pkg/otelcollector"
)

// AddCheckResponseBasedLabels adds the following labels:
// * otelcollector.ApertureProcessingDurationLabel
// * otelcollector.ApertureServicesLabel
// * otelcollector.ApertureControlPointLabel
// * otelcollector.ApertureRateLimitersLabel
// * otelcollector.ApertureDroppingRateLimitersLabel
// * otelcollector.ApertureConcurrencyLimitersLabel
// * otelcollector.ApertureDroppingConcurrencyLimitersLabel
// * otelcollector.ApertureWorkloadsLabel
// * otelcollector.ApertureDroppingWorkloadsLabel
// * otelcollector.ApertureFluxMetersLabel
// * otelcollector.ApertureFlowLabelKeysLabel
// * otelcollector.ApertureClassifiersLabel
// * otelcollector.ApertureClassifierErrorsLabel
// * otelcollector.ApertureDecisionTypeLabel
// * otelcollector.ApertureRejectReasonLabel
// * otelcollector.ApertureErrorLabel,
// * dynamic flow labels.
func AddCheckResponseBasedLabels(attributes pcommon.Map, checkResponse *flowcontrolv1.CheckResponse, sourceStr string) {
// Aperture Processing Duration
startTime := checkResponse.GetStart().AsTime()
endTime := checkResponse.GetEnd().AsTime()
if !startTime.IsZero() && !endTime.IsZero() {
attributes.PutDouble(otelcollector.ApertureProcessingDurationLabel, float64(endTime.Sub(startTime).Milliseconds()))
} else {
log.Sample(zerolog.Sometimes).Warn().Msgf("Aperture processing duration not found in %s access logs", sourceStr)
}
// Services
servicesValue := pcommon.NewValueSlice()
for _, service := range checkResponse.Services {
servicesValue.Slice().AppendEmpty().SetStr(service)
}
servicesValue.CopyTo(attributes.PutEmpty(otelcollector.ApertureServicesLabel))

// Control Point
attributes.PutString(otelcollector.ApertureControlPointLabel, checkResponse.GetControlPointInfo().String())

labels := map[string]pcommon.Value{
otelcollector.ApertureRateLimitersLabel: pcommon.NewValueSlice(),
otelcollector.ApertureDroppingRateLimitersLabel: pcommon.NewValueSlice(),
otelcollector.ApertureConcurrencyLimitersLabel: pcommon.NewValueSlice(),
otelcollector.ApertureDroppingConcurrencyLimitersLabel: pcommon.NewValueSlice(),
otelcollector.ApertureWorkloadsLabel: pcommon.NewValueSlice(),
otelcollector.ApertureDroppingWorkloadsLabel: pcommon.NewValueSlice(),
otelcollector.ApertureFluxMetersLabel: pcommon.NewValueSlice(),
otelcollector.ApertureFlowLabelKeysLabel: pcommon.NewValueSlice(),
otelcollector.ApertureClassifiersLabel: pcommon.NewValueSlice(),
otelcollector.ApertureClassifierErrorsLabel: pcommon.NewValueSlice(),
otelcollector.ApertureDecisionTypeLabel: pcommon.NewValueStr(checkResponse.DecisionType.String()),
otelcollector.ApertureRejectReasonLabel: pcommon.NewValueStr(checkResponse.GetRejectReason().String()),
otelcollector.ApertureErrorLabel: pcommon.NewValueStr(checkResponse.GetError().String()),
}
for _, decision := range checkResponse.LimiterDecisions {
if decision.GetRateLimiterInfo() != nil {
rawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()),
fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()),
fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()),
}
value := strings.Join(rawValue, ",")
labels[otelcollector.ApertureRateLimitersLabel].Slice().AppendEmpty().SetStr(value)
if decision.Dropped {
labels[otelcollector.ApertureDroppingRateLimitersLabel].Slice().AppendEmpty().SetStr(value)
}
}
if cl := decision.GetConcurrencyLimiterInfo(); cl != nil {
rawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()),
fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()),
fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()),
}
value := strings.Join(rawValue, ",")
labels[otelcollector.ApertureConcurrencyLimitersLabel].Slice().AppendEmpty().SetStr(value)
if decision.Dropped {
labels[otelcollector.ApertureDroppingConcurrencyLimitersLabel].Slice().AppendEmpty().SetStr(value)
}

workloadsRawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()),
fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()),
fmt.Sprintf("%s:%v", metrics.WorkloadIndexLabel, cl.GetWorkloadIndex()),
fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()),
}
value = strings.Join(workloadsRawValue, ",")
labels[otelcollector.ApertureWorkloadsLabel].Slice().AppendEmpty().SetStr(value)
if decision.Dropped {
labels[otelcollector.ApertureDroppingWorkloadsLabel].Slice().AppendEmpty().SetStr(value)
}
}
}
for _, fluxMeter := range checkResponse.FluxMeterInfos {
value := fluxMeter.GetFluxMeterName()
labels[otelcollector.ApertureFluxMetersLabel].Slice().AppendEmpty().SetStr(value)
}

for _, flowLabelKey := range checkResponse.GetFlowLabelKeys() {
labels[otelcollector.ApertureFlowLabelKeysLabel].Slice().AppendEmpty().SetStr(flowLabelKey)
}

for key, value := range checkResponse.GetTelemetryFlowLabels() {
pcommon.NewValueStr(value).CopyTo(attributes.PutEmpty(key))
}

for _, classifier := range checkResponse.ClassifierInfos {
rawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, classifier.PolicyName),
fmt.Sprintf("%s:%v", metrics.ClassifierIndexLabel, classifier.ClassifierIndex),
}
value := strings.Join(rawValue, ",")
labels[otelcollector.ApertureClassifiersLabel].Slice().AppendEmpty().SetStr(value)

// add errors as attributes as well
if classifier.Error != flowcontrolv1.ClassifierInfo_ERROR_NONE {
errorsValue := []string{
classifier.Error.String(),
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, classifier.PolicyName),
fmt.Sprintf("%s:%v", metrics.ClassifierIndexLabel, classifier.ClassifierIndex),
fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, classifier.PolicyHash),
}
joinedValue := strings.Join(errorsValue, ",")
labels[otelcollector.ApertureClassifierErrorsLabel].Slice().AppendEmpty().SetStr(joinedValue)
}
}

for key, value := range labels {
value.CopyTo(attributes.PutEmpty(key))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package internal_test

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.opentelemetry.io/collector/pdata/pcommon"
"google.golang.org/protobuf/types/known/timestamppb"

flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/v1"
"github.com/fluxninja/aperture/pkg/otelcollector"
"github.com/fluxninja/aperture/pkg/otelcollector/metricsprocessor/internal"
)

var _ = DescribeTable("Check Response labels", func(checkResponse *flowcontrolv1.CheckResponse, after map[string]interface{}) {
attributes := pcommon.NewMap()
internal.AddCheckResponseBasedLabels(attributes, checkResponse, "source")
for k, v := range after {
Expect(attributes.AsRaw()).To(HaveKeyWithValue(k, v))
}
},
Entry("Sets processing duration",
&flowcontrolv1.CheckResponse{
Start: timestamppb.New(time.Date(1969, time.Month(7), 20, 17, 0, 0, 0, time.UTC)),
End: timestamppb.New(time.Date(1969, time.Month(7), 20, 17, 0, 1, 0, time.UTC)),
},
map[string]interface{}{otelcollector.ApertureProcessingDurationLabel: float64(1000)},
),

Entry("Sets services",
&flowcontrolv1.CheckResponse{
Services: []string{"svc1", "svc2"},
},
map[string]interface{}{otelcollector.ApertureServicesLabel: []interface{}{"svc1", "svc2"}},
),

Entry("Sets control point",
&flowcontrolv1.CheckResponse{
ControlPointInfo: &flowcontrolv1.ControlPointInfo{
Type: flowcontrolv1.ControlPointInfo_TYPE_INGRESS,
},
},
map[string]interface{}{otelcollector.ApertureControlPointLabel: "type:TYPE_INGRESS"},
),

Entry("Sets rate limiters",
&flowcontrolv1.CheckResponse{
LimiterDecisions: []*flowcontrolv1.LimiterDecision{
{
PolicyName: "foo",
PolicyHash: "foo-hash",
ComponentIndex: 2,
Dropped: true,
Details: &flowcontrolv1.LimiterDecision_RateLimiterInfo_{
RateLimiterInfo: &flowcontrolv1.LimiterDecision_RateLimiterInfo{
Remaining: 1,
Current: 1,
Label: "test",
},
},
},
},
},
map[string]interface{}{
otelcollector.ApertureRateLimitersLabel: []interface{}{"policy_name:foo,component_index:2,policy_hash:foo-hash"},
otelcollector.ApertureDroppingRateLimitersLabel: []interface{}{"policy_name:foo,component_index:2,policy_hash:foo-hash"},
},
),

Entry("Sets concurrency limiters",
&flowcontrolv1.CheckResponse{
LimiterDecisions: []*flowcontrolv1.LimiterDecision{
{
PolicyName: "foo",
PolicyHash: "foo-hash",
ComponentIndex: 1,
Dropped: true,
Details: &flowcontrolv1.LimiterDecision_ConcurrencyLimiterInfo_{
ConcurrencyLimiterInfo: &flowcontrolv1.LimiterDecision_ConcurrencyLimiterInfo{
WorkloadIndex: "0",
},
},
},
},
},
map[string]interface{}{
otelcollector.ApertureConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.ApertureDroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
},
),

Entry("Sets flux meters",
&flowcontrolv1.CheckResponse{
FluxMeterInfos: []*flowcontrolv1.FluxMeterInfo{
{FluxMeterName: "foo"},
{FluxMeterName: "bar"},
},
},
map[string]interface{}{otelcollector.ApertureFluxMetersLabel: []interface{}{"foo", "bar"}},
),

Entry("Sets flow labels",
&flowcontrolv1.CheckResponse{
FlowLabelKeys: []string{"someLabel", "otherLabel"},
},
map[string]interface{}{otelcollector.ApertureFlowLabelKeysLabel: []interface{}{"someLabel", "otherLabel"}},
),

Entry("Sets telemetry flow labels",
&flowcontrolv1.CheckResponse{
TelemetryFlowLabels: map[string]string{
"someLabel": "someValue",
"otherLabel": "otherValue",
},
},
map[string]interface{}{
"someLabel": "someValue",
"otherLabel": "otherValue",
},
),

Entry("Sets classifiers",
&flowcontrolv1.CheckResponse{
ClassifierInfos: []*flowcontrolv1.ClassifierInfo{
{
PolicyName: "foo",
PolicyHash: "bar",
ClassifierIndex: 42,
LabelKey: "timing",
Error: flowcontrolv1.ClassifierInfo_ERROR_MULTI_EXPRESSION,
},
},
},
map[string]interface{}{
otelcollector.ApertureClassifiersLabel: []interface{}{"policy_name:foo,classifier_index:42"},
otelcollector.ApertureClassifierErrorsLabel: []interface{}{"ERROR_MULTI_EXPRESSION,policy_name:foo,classifier_index:42,policy_hash:bar"},
},
),
)
30 changes: 30 additions & 0 deletions pkg/otelcollector/metricsprocessor/internal/envoy_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package internal

import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/fluxninja/aperture/pkg/otelcollector"
)

// AddEnvoySpecificLabels adds labels specific to Envoy data source.
func AddEnvoySpecificLabels(attributes pcommon.Map) {
treatAsZero := []string{otelcollector.EnvoyMissingAttributeValue}
// Retrieve request length
requestLength, _ := otelcollector.GetFloat64(attributes, otelcollector.EnvoyBytesSentLabel, treatAsZero)
attributes.PutDouble(otelcollector.HTTPRequestContentLength, requestLength)
// Retrieve response lengths
responseLength, _ := otelcollector.GetFloat64(attributes, otelcollector.EnvoyBytesReceivedLabel, treatAsZero)
attributes.PutDouble(otelcollector.HTTPResponseContentLength, responseLength)

// Compute durations
responseDuration, responseDurationExists := otelcollector.GetFloat64(attributes, otelcollector.EnvoyResponseDurationLabel, treatAsZero)
authzDuration, authzDurationExists := otelcollector.GetFloat64(attributes, otelcollector.EnvoyAuthzDurationLabel, treatAsZero)

if responseDurationExists {
attributes.PutDouble(otelcollector.FlowDurationLabel, responseDuration)
}

if responseDurationExists && authzDurationExists {
attributes.PutDouble(otelcollector.WorkloadDurationLabel, responseDuration-authzDuration)
}
}
43 changes: 43 additions & 0 deletions pkg/otelcollector/metricsprocessor/internal/envoy_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package internal_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/fluxninja/aperture/pkg/otelcollector"
"github.com/fluxninja/aperture/pkg/otelcollector/metricsprocessor/internal"
)

var _ = DescribeTable("Envoy labels", func(before, after map[string]float64) {
attributes := pcommon.NewMap()
for k, v := range before {
attributes.PutDouble(k, v)
}
internal.AddEnvoySpecificLabels(attributes)
for k, v := range after {
rawOut, exists := attributes.Get(k)
Expect(exists).To(BeTrue())
Expect(rawOut.Double()).To(Equal(v))
}
},
Entry("Sets request content length",
map[string]float64{otelcollector.EnvoyBytesSentLabel: 123},
map[string]float64{otelcollector.HTTPRequestContentLength: 123},
),
Entry("Sets response content length",
map[string]float64{otelcollector.EnvoyBytesReceivedLabel: 123},
map[string]float64{otelcollector.HTTPResponseContentLength: 123},
),
Entry("Sets flow duration",
map[string]float64{otelcollector.EnvoyResponseDurationLabel: 123},
map[string]float64{otelcollector.FlowDurationLabel: 123},
),
Entry("Sets workload duration",
map[string]float64{
otelcollector.EnvoyResponseDurationLabel: 123,
otelcollector.EnvoyAuthzDurationLabel: 23,
},
map[string]float64{otelcollector.WorkloadDurationLabel: 100},
),
)
13 changes: 13 additions & 0 deletions pkg/otelcollector/metricsprocessor/internal/internal_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package internal_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestInternal(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Internal Suite")
}
Loading

0 comments on commit 65bab78

Please sign in to comment.