Skip to content

Commit

Permalink
plugins: Surface more decision log errors via status API
Browse files Browse the repository at this point in the history
Previously, in open-policy-agent#5732, we updated the decision log plugin to
surface errors via the Status API. However, in that change,
certain events, such as encoder errors and log drops due to
buffer size limits, had no metrics associated with them.
This change adds more metrics for these events so that they
can be surfaced via the Status API.

Fixes: open-policy-agent#5637

Signed-off-by: Ashutosh Narkar <[email protected]>
  • Loading branch information
ashutosh-narkar committed Mar 30, 2023
1 parent 2d1583e commit 9ba19f3
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 30 deletions.
2 changes: 1 addition & 1 deletion plugins/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,7 @@ func TestStatusMetricsForLogDrops(t *testing.T) {
}

builtInMet := e.Fields["metrics"].(map[string]interface{})["<built-in>"]
dropCount := builtInMet.(map[string]interface{})["counter_decision_logs_dropped"]
dropCount := builtInMet.(map[string]interface{})["counter_decision_logs_dropped_rate_limit_exceeded"]

actual, err := dropCount.(json.Number).Int64()
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions plugins/logs/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
)

const (
encHardLimitThreshold = 0.9
softLimitBaseFactor = 2
softLimitExponentScaleFactor = 0.2
encSoftLimitScaleUpCounterName = "enc_soft_limit_scale_up"
encSoftLimitScaleDownCounterName = "enc_soft_limit_scale_down"
encSoftLimitStableCounterName = "enc_soft_limit_stable"
encHardLimitThreshold = 0.9
softLimitBaseFactor = 2
softLimitExponentScaleFactor = 0.2
// encLogExUploadSizeLimitCounterName names the counter that chunkEncoder.Write
// increments when a single encoded event (plus 2 bytes) is larger than the
// encoder's limit and is therefore rejected with an error.
encLogExUploadSizeLimitCounterName = "enc_log_exceeded_upload_size_limit_bytes"
encSoftLimitScaleUpCounterName = "enc_soft_limit_scale_up"
encSoftLimitScaleDownCounterName = "enc_soft_limit_scale_down"
encSoftLimitStableCounterName = "enc_soft_limit_stable"
)

// chunkEncoder implements log buffer chunking and compression. Log events are
Expand Down Expand Up @@ -65,6 +66,9 @@ func (enc *chunkEncoder) Write(event EventV1) (result [][]byte, err error) {
if len(bs) == 0 {
return nil, nil
} else if int64(len(bs)+2) > enc.limit {
if enc.metrics != nil {
enc.metrics.Counter(encLogExUploadSizeLimitCounterName).Incr()
}
return nil, fmt.Errorf("upload chunk size (%d) exceeds upload_size_limit_bytes (%d)",
int64(len(bs)+2), enc.limit)
}
Expand Down
38 changes: 23 additions & 15 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,18 @@ func roundtripJSONToAST(x interface{}) (ast.Value, error) {

const (
// min amount of time to wait following a failure
minRetryDelay = time.Millisecond * 100
defaultMinDelaySeconds = int64(300)
defaultMaxDelaySeconds = int64(600)
defaultUploadSizeLimitBytes = int64(32768) // 32KB limit
defaultBufferSizeLimitBytes = int64(0) // unlimited
defaultMaskDecisionPath = "/system/log/mask"
defaultDropDecisionPath = "/system/log/drop"
logDropCounterName = "decision_logs_dropped"
logNDBDropCounterName = "decision_logs_nd_builtin_cache_dropped"
defaultResourcePath = "/logs"
minRetryDelay = time.Millisecond * 100
defaultMinDelaySeconds = int64(300)
defaultMaxDelaySeconds = int64(600)
defaultUploadSizeLimitBytes = int64(32768) // 32KB limit
defaultBufferSizeLimitBytes = int64(0) // unlimited
defaultMaskDecisionPath = "/system/log/mask"
defaultDropDecisionPath = "/system/log/drop"
// counter incremented in encodeAndBufferEvent when the rate limiter rejects an event
logRateLimitExDropCounterName = "decision_logs_dropped_rate_limit_exceeded"
logNDBDropCounterName = "decision_logs_nd_builtin_cache_dropped"
// counter incremented in bufferChunk when pushing a chunk causes the buffer to drop chunks
logBufferSizeLimitExDropCounterName = "decision_logs_dropped_buffer_size_limit_bytes_exceeded"
// counter incremented in encodeAndBufferEvent whenever writing an event to the encoder returns an error
logEncodingFailureCounterName = "decision_logs_encoding_failure"
defaultResourcePath = "/logs"
)

// ReportingConfig represents configuration for the plugin's reporting behaviour.
Expand Down Expand Up @@ -778,10 +780,6 @@ func (p *Plugin) doOneShot(ctx context.Context) error {

p.status.SetError(err)

if p.metrics != nil {
p.status.Metrics = p.metrics
}

if s := status.Lookup(p.manager); s != nil {
s.UpdateDecisionLogsStatus(*p.status)
}
Expand Down Expand Up @@ -873,7 +871,7 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
if p.limiter != nil {
if !p.limiter.Allow() {
if p.metrics != nil {
p.metrics.Counter(logDropCounterName).Incr()
p.metrics.Counter(logRateLimitExDropCounterName).Incr()
}

p.logger.Error("Decision log dropped as rate limit exceeded. Reduce reporting interval or increase rate limit.")
Expand All @@ -889,6 +887,10 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
// TODO(tsandall): revisit this now that we have an API that
// can return an error. Should the default behaviour be to
// fail-closed as we do for plugins?

if p.metrics != nil {
p.metrics.Counter(logEncodingFailureCounterName).Incr()
}
p.logger.Error("Log encoding failed: %v.", err)
return
}
Expand All @@ -899,6 +901,9 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {

result, err = p.enc.Write(newEvent)
if err != nil {
if p.metrics != nil {
p.metrics.Counter(logEncodingFailureCounterName).Incr()
}
p.logger.Error("Log encoding failed: %v.", err)
return
}
Expand All @@ -916,6 +921,9 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
// bufferChunk pushes the encoded chunk bs onto buffer. If the push reports
// that chunks were dropped, the buffer-size-limit drop counter is incremented
// (when metrics are configured) and an error is logged with the drop count.
//
// NOTE(review): the counter is incremented once per push that drops anything,
// not once per dropped chunk, so it can be lower than the total reported in
// the log messages.
func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) {
	numDropped := buffer.Push(bs)
	if numDropped <= 0 {
		// Nothing was evicted to make room; no metric or log needed.
		return
	}

	if p.metrics != nil {
		p.metrics.Counter(logBufferSizeLimitExDropCounterName).Incr()
	}

	p.logger.Error("Dropped %v chunks from buffer. Reduce reporting interval or increase buffer size.", numDropped)
}
Expand Down
223 changes: 215 additions & 8 deletions plugins/logs/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,209 @@ func TestPluginStatusUpdateHTTPError(t *testing.T) {
}
}

func TestPluginStatusUpdate(t *testing.T) {
// TestPluginStatusUpdateEncodingFailure logs a single event against a plugin
// whose upload size limit is 1 byte, checks that nothing was written into the
// encoder, and then verifies that the status update emitted after a one-shot
// upload reports both the plugin-level encoding failure counter and the
// encoder-level "exceeded upload size limit" counter.
func TestPluginStatusUpdateEncodingFailure(t *testing.T) {
	ctx := context.Background()
	testLogger := test.New()

	ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
	if err != nil {
		t.Fatal(err)
	}

	fixture := newTestFixture(t, testFixtureOptions{
		ConsoleLogger:                 testLogger,
		ReportingUploadSizeLimitBytes: 1,
	})
	defer fixture.server.stop()

	// The plugin and its encoder share one metrics instance so that both
	// counters show up in the same status update.
	m := metrics.New()
	fixture.plugin.metrics = m
	fixture.plugin.enc.metrics = m

	var input interface{} = map[string]interface{}{"method": "GET"}
	var result interface{} = false

	event := &server.Info{
		DecisionID: "abc",
		Path:       "foo/bar",
		Input:      &input,
		Results:    &result,
		RemoteAddr: "test-1",
		Timestamp:  ts,
	}

	if err := fixture.plugin.Log(ctx, event); err != nil {
		t.Fatal(err)
	}

	// Read the encoder state under the lock, but release it before any
	// t.Fatal so the test never exits while still holding the plugin mutex.
	fixture.plugin.mtx.Lock()
	bytesWritten := fixture.plugin.enc.bytesWritten
	fixture.plugin.mtx.Unlock()

	if bytesWritten != 0 {
		t.Fatal("Expected no event to be written into the encoder")
	}

	// Create a status plugin that logs to console
	pluginConfig := []byte(`{
		"console": true
	}`)

	config, err := status.ParseConfig(pluginConfig, fixture.manager.Services(), nil)
	if err != nil {
		t.Fatal(err)
	}

	p := status.New(config, fixture.manager).WithMetrics(fixture.plugin.metrics)

	fixture.manager.Register(status.Name, p)
	if err := fixture.manager.Start(ctx); err != nil {
		t.Fatal(err)
	}

	// Trigger a status update
	fixture.server.expCode = 200
	if err := fixture.plugin.doOneShot(ctx); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Give the logger / console some time to process and print the events
	time.Sleep(10 * time.Millisecond)
	p.Stop(ctx)

	entries := testLogger.Entries()
	if len(entries) == 0 {
		t.Fatal("Expected log entries but got none")
	}

	// Pick the last entry as it should have the decision log metrics
	e := entries[len(entries)-1]

	if _, ok := e.Fields["metrics"]; !ok {
		t.Fatal("Expected metrics field in status update")
	}

	exp := map[string]interface{}{
		"<built-in>": map[string]interface{}{
			"counter_decision_logs_encoding_failure":           json.Number("1"),
			"counter_enc_log_exceeded_upload_size_limit_bytes": json.Number("1"),
		},
	}

	if !reflect.DeepEqual(e.Fields["metrics"], exp) {
		t.Fatalf("Expected %v but got %v", exp, e.Fields["metrics"])
	}
}

// TestPluginStatusUpdateBufferSizeExceeded logs three events against a plugin
// configured with a 200 byte buffer limit and a 300 byte upload limit, and
// then verifies that the status update emitted after a one-shot upload
// reports exactly one buffer-size-limit drop in its metrics.
func TestPluginStatusUpdateBufferSizeExceeded(t *testing.T) {
	ctx := context.Background()
	testLogger := test.New()

	ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
	if err != nil {
		t.Fatal(err)
	}

	fixture := newTestFixture(t, testFixtureOptions{
		ConsoleLogger:                 testLogger,
		ReportingBufferSizeLimitBytes: 200,
		ReportingUploadSizeLimitBytes: 300,
	})
	defer fixture.server.stop()

	fixture.server.ch = make(chan []EventV1, 1)

	fixture.plugin.metrics = metrics.New()

	var input interface{} = map[string]interface{}{"method": "GET"}
	var result interface{} = false

	event1 := &server.Info{
		DecisionID: "abc",
		Path:       "foo/bar",
		Input:      &input,
		Results:    &result,
		RemoteAddr: "test-1",
		Timestamp:  ts,
	}

	event2 := &server.Info{
		DecisionID: "def",
		Path:       "foo/baz",
		Input:      &input,
		Results:    &result,
		RemoteAddr: "test-2",
		Timestamp:  ts,
	}

	event3 := &server.Info{
		DecisionID: "ghi",
		Path:       "foo/aux",
		Input:      &input,
		Results:    &result,
		RemoteAddr: "test-3",
		Timestamp:  ts,
	}

	// write event 1 and 2 into the encoder and check the chunk is inserted into the buffer
	if err := fixture.plugin.Log(ctx, event1); err != nil {
		t.Fatal(err)
	}
	if err := fixture.plugin.Log(ctx, event2); err != nil {
		t.Fatal(err)
	}

	// Snapshot the encoder and buffer state under the lock, but release it
	// before any t.Fatal so the test never exits while holding the plugin mutex.
	fixture.plugin.mtx.Lock()
	bytesWritten := fixture.plugin.enc.bytesWritten
	bufferLen := fixture.plugin.buffer.Len()
	fixture.plugin.mtx.Unlock()

	if bytesWritten == 0 {
		t.Fatal("Expected event to be written into the encoder")
	}

	if bufferLen == 0 {
		t.Fatal("Expected one chunk to be written into the buffer")
	}

	// write event 3 into the encoder and then flush the encoder which will result in the event being
	// written to the buffer. But given the buffer size it won't be able to hold this event and will
	// drop the existing chunk
	if err := fixture.plugin.Log(ctx, event3); err != nil {
		t.Fatal(err)
	}

	// Create a status plugin that logs to console
	pluginConfig := []byte(`{
		"console": true
	}`)

	config, err := status.ParseConfig(pluginConfig, fixture.manager.Services(), nil)
	if err != nil {
		t.Fatal(err)
	}

	p := status.New(config, fixture.manager).WithMetrics(fixture.plugin.metrics)

	fixture.manager.Register(status.Name, p)
	if err := fixture.manager.Start(ctx); err != nil {
		t.Fatal(err)
	}

	// Trigger a status update
	fixture.server.expCode = 200
	if err := fixture.plugin.doOneShot(ctx); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Wait for the test server to hand over the uploaded events.
	<-fixture.server.ch

	// Give the logger / console some time to process and print the events
	time.Sleep(10 * time.Millisecond)
	p.Stop(ctx)

	entries := testLogger.Entries()
	if len(entries) == 0 {
		t.Fatal("Expected log entries but got none")
	}

	// Pick the last entry as it should have the decision log metrics
	e := entries[len(entries)-1]

	if _, ok := e.Fields["metrics"]; !ok {
		t.Fatal("Expected metrics field in status update")
	}

	exp := map[string]interface{}{
		"<built-in>": map[string]interface{}{
			"counter_decision_logs_dropped_buffer_size_limit_bytes_exceeded": json.Number("1"),
		},
	}

	if !reflect.DeepEqual(e.Fields["metrics"], exp) {
		t.Fatalf("Expected %v but got %v", exp, e.Fields["metrics"])
	}
}

func TestPluginStatusUpdateRateLimitExceeded(t *testing.T) {
ctx := context.Background()
testLogger := test.New()

Expand Down Expand Up @@ -889,17 +1091,17 @@ func TestPluginStatusUpdate(t *testing.T) {
t.Fatal("Expected log entries but got none")
}

// Pick the last entry as it should have the decision log update
// Pick the last entry as it should have the decision log metrics
e := entries[len(entries)-1]

if _, ok := e.Fields["decision_logs"]; !ok {
t.Fatal("Expected decision_log status update")
if _, ok := e.Fields["metrics"]; !ok {
t.Fatal("Expected metrics field in status update")
}

exp := map[string]interface{}{"metrics": map[string]interface{}{"counter_decision_logs_dropped": json.Number("2")}}
exp := map[string]interface{}{"<built-in>": map[string]interface{}{"counter_decision_logs_dropped_rate_limit_exceeded": json.Number("2")}}

if !reflect.DeepEqual(e.Fields["decision_logs"], exp) {
t.Fatalf("Expected %v but got %v", exp, e.Fields["decision_logs"])
if !reflect.DeepEqual(e.Fields["metrics"], exp) {
t.Fatalf("Expected %v but got %v", exp, e.Fields["metrics"])
}
}

Expand Down Expand Up @@ -1059,7 +1261,7 @@ func TestPluginRateLimitDropCountStatus(t *testing.T) {
t.Fatal("Expected metrics")
}

exp := map[string]interface{}{"<built-in>": map[string]interface{}{"counter_decision_logs_dropped": json.Number("2")}}
exp := map[string]interface{}{"<built-in>": map[string]interface{}{"counter_decision_logs_dropped_rate_limit_exceeded": json.Number("2")}}

if !reflect.DeepEqual(e.Fields["metrics"], exp) {
t.Fatalf("Expected %v but got %v", exp, e.Fields["metrics"])
Expand Down Expand Up @@ -2040,6 +2242,7 @@ type testFixtureOptions struct {
ConsoleLogger *test.Logger
ReportingUploadSizeLimitBytes int64
ReportingMaxDecisionsPerSecond float64
ReportingBufferSizeLimitBytes int64
Resource *string
TestServerPath *string
PartitionName *string
Expand Down Expand Up @@ -2150,6 +2353,10 @@ func newTestFixture(t *testing.T, opts ...testFixtureOptions) testFixture {
config.Reporting.UploadSizeLimitBytes = &options.ReportingUploadSizeLimitBytes
}

if options.ReportingBufferSizeLimitBytes != 0 {
config.Reporting.BufferSizeLimitBytes = &options.ReportingBufferSizeLimitBytes
}

if s, ok := manager.PluginStatus()[Name]; ok {
t.Fatalf("Unexpected status found in plugin manager for %s: %+v", Name, s)
}
Expand Down

0 comments on commit 9ba19f3

Please sign in to comment.