Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][gcs] - Added input metrics #41505

Merged
merged 18 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support to include AWS cloudwatch linked accounts when using log_group_name_prefix to define log group names. {pull}41206[41206]
- Improved Azure Blob Storage input documentation. {pull}41252[41252]
- Make ETW input GA. {pull}41389[41389]
- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/decoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDecoding(t *testing.T) {
}
defer f.Close()
p := &pub{t: t}
j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, log, false)
j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, nil, log, false)
j.src.ReaderConfig.Decoding = tc.config
err = j.decode(context.Background(), f, "test")
if err != nil {
Expand Down
72 changes: 58 additions & 14 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
src *Source
// publisher is used to publish a beat event to the output stream
publisher cursor.Publisher
// metrics used to track the errors and success of jobs
metrics *inputMetrics
// custom logger
log *logp.Logger
// flag used to denote if this object has previously failed without being processed at all.
Expand All @@ -53,8 +55,12 @@

// newJob, returns an instance of a job, which is a unit of work that can be assigned to a go routine
func newJob(bucket *storage.BucketHandle, object *storage.ObjectAttrs, objectURI string,
state *state, src *Source, publisher cursor.Publisher, log *logp.Logger, isFailed bool,
state *state, src *Source, publisher cursor.Publisher, metrics *inputMetrics, log *logp.Logger, isFailed bool,
) *job {
if metrics == nil {
// metrics are optional, initialize a stub if not provided
metrics = newInputMetrics("", nil)
}
return &job{
bucket: bucket,
object: object,
Expand All @@ -63,6 +69,7 @@
state: state,
src: src,
publisher: publisher,
metrics: metrics,
log: log,
isFailed: isFailed,
}
Expand All @@ -78,17 +85,33 @@

func (j *job) do(ctx context.Context, id string) {
var fields mapstr.M
// metrics & logging
j.log.Debug("begin gcs object processing.")
j.metrics.gcsObjectsRequestedTotal.Inc()
j.metrics.gcsObjectsInflight.Inc()
start := time.Now()
defer func() {
elapsed := time.Since(start)
j.metrics.gcsObjectsInflight.Dec()
j.metrics.gcsObjectProcessingTime.Update(elapsed.Nanoseconds())
j.log.Debugw("end gcs object processing.", "elapsed_time_ns", elapsed)
}()

if allowedContentTypes[j.object.ContentType] {
if j.object.ContentType == gzType || j.object.ContentEncoding == encodingGzip {
j.isCompressed = true
}
err := j.processAndPublishData(ctx, id)
if err != nil {
j.state.updateFailedJobs(j.object.Name)
j.state.updateFailedJobs(j.object.Name, j.metrics)
j.log.Errorw("job encountered an error while publishing data and has been added to a failed jobs list", "gcs.jobId", id, "error", err)
j.metrics.gcsFailedJobsTotal.Inc()
j.metrics.errorsTotal.Inc()
return
}
j.metrics.gcsObjectsPublishedTotal.Inc()
//nolint:gosec // object size cannot be negative hence this conversion is safe

Check failure on line 113 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

directive `//nolint:gosec // object size cannot be negative hence this conversion is safe` is unused for linter "gosec" (nolintlint)
j.metrics.gcsBytesProcessedTotal.Add(uint64(j.object.Size))

} else {
err := fmt.Errorf("job with jobId %s encountered an error: content-type %s not supported", id, j.object.ContentType)
Expand All @@ -104,6 +127,7 @@
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
j.metrics.errorsTotal.Inc()
}
// unlocks after data is saved and published
done()
Expand Down Expand Up @@ -133,11 +157,18 @@
defer func() {
err = reader.Close()
if err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("failed to close reader for object", "objectName", j.object.Name, "error", err)
}
}()

return j.decode(ctx, reader, id)
// calculate number of decode errors
if err := j.decode(ctx, reader, id); err != nil {
j.metrics.decodeErrorsTotal.Inc()
return fmt.Errorf("failed to decode object: %s, with error: %w", j.object.Name, err)
}

return nil
}

func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
Expand All @@ -163,7 +194,7 @@
var v mapstr.M
msg, v, err = dec.decodeValue()
if err != nil {
if err == io.EOF {

Check failure on line 197 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
Expand All @@ -172,7 +203,7 @@
} else {
msg, err = dec.decode()
if err != nil {
if err == io.EOF {

Check failure on line 206 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
Expand All @@ -188,7 +219,7 @@
for dec.next() {
msg, err := dec.decode()
if err != nil {
if err == io.EOF {

Check failure on line 222 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
Expand Down Expand Up @@ -241,17 +272,24 @@

// if expand_event_list_from_field is set, then split the event list
if j.src.ExpandEventListFromField != "" {
if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil {
if numEvents, err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, id); err != nil {
return err
} else {
j.metrics.gcsEventsPerObject.Update(int64(numEvents))
}
continue
} else {
j.metrics.gcsEventsPerObject.Update(1)
}

var parsedData []mapstr.M
if j.src.ParseJSON {
parsedData, err = decodeJSON(bytes.NewReader(item))
if err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
// since we do not want to stop processing the job here as this is purely cosmetic and optional, we log the error and continue
j.metrics.errorsTotal.Inc()
j.metrics.decodeErrorsTotal.Inc()
j.log.Errorw("job encountered an error during 'ParseJSON' op", "gcs.jobId", id, "error", err)
}
}
evt := j.createEvent(item, parsedData, offset)
Expand All @@ -265,27 +303,30 @@
// if this is the last object, then perform a complete state save
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(evt, cp); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
done()
return
}
// since we don't update the cursor checkpoint, lack of a lock here is not a problem
if err := j.publisher.Publish(evt, nil); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
}

// splitEventList splits the event list into individual events and publishes them
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error {
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, id string) (int, error) {
var jsonObject map[string]json.RawMessage
var evensPerObject int
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
if err := json.Unmarshal(raw, &jsonObject); err != nil {
return fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
return evensPerObject, fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
}

raw, found := jsonObject[key]
if !found {
return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
return evensPerObject, fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
}

dec := json.NewDecoder(bytes.NewReader(raw))
Expand All @@ -294,43 +335,46 @@

tok, err := dec.Token()
if err != nil {
return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
return evensPerObject, fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
}
delim, ok := tok.(json.Delim)
if !ok || delim != '[' {
return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
return evensPerObject, fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
}

for dec.More() {
arrayOffset := dec.InputOffset()

var item json.RawMessage
if err := dec.Decode(&item); err != nil {
return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
return evensPerObject, fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
}

data, err := item.MarshalJSON()
if err != nil {
return fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
return evensPerObject, fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
}
evt := j.createEvent(data, nil, offset+arrayOffset)

if !dec.More() {
// if this is the last object, then perform a complete state save
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(evt, cp); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
done()
} else {
// since we don't update the cursor checkpoint, lack of a lock here is not a problem
if err := j.publisher.Publish(evt, nil); err != nil {
j.metrics.errorsTotal.Inc()
j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
}
}
evensPerObject++
}

return nil
return evensPerObject, nil
}

// addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader)
Expand All @@ -338,7 +382,7 @@
// so the function can peek into the byte stream without consuming it. This makes it convenient for
// code executed after this function call to consume the stream if it wants.
func (j *job) addGzipDecoderIfNeeded(reader *bufio.Reader) (io.Reader, error) {
isStreamGzipped := false

Check failure on line 385 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

assigned to isStreamGzipped, but reassigned without using the value (wastedassign)
// check if stream is gziped or not
buf, err := reader.Peek(3)
if err != nil {
Expand Down Expand Up @@ -426,7 +470,7 @@
},
}
event.SetID(objectID(j.hash, offset))

j.metrics.gcsEventsCreatedTotal.Inc()
return event
}

Expand Down
72 changes: 72 additions & 0 deletions x-pack/filebeat/input/gcs/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gcs

import (
"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()
url *monitoring.String // URL of the input resource
errorsTotal *monitoring.Uint // number of errors encountered
decodeErrorsTotal *monitoring.Uint // number of decode errors encountered

ShourieG marked this conversation as resolved.
Show resolved Hide resolved
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
gcsObjectsRequestedTotal *monitoring.Uint // Number of GCS objects downloaded.
gcsObjectsPublishedTotal *monitoring.Uint // Number of GCS objects processed that were published.
gcsObjectsListedTotal *monitoring.Uint // Number of GCS objects returned by list operations.
gcsBytesProcessedTotal *monitoring.Uint // Number of GCS bytes processed.
gcsEventsCreatedTotal *monitoring.Uint // Number of events created from processing GCS data.
gcsFailedJobsTotal *monitoring.Uint // Number of failed jobs.
gcsExpiredFailedJobsTotal *monitoring.Uint // Number of expired failed jobs that could not be recovered.
gcsObjectsInflight *monitoring.Uint // Number of GCS objects inflight (gauge).
gcsObjectProcessingTime metrics.Sample // Histogram of the elapsed GCS object processing times in nanoseconds (start of download to completion of parsing).
gcsObjectSizeInBytes metrics.Sample // Histogram of processed GCS object size in bytes
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
gcsEventsPerObject metrics.Sample // Histogram of events in an individual GCS object
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
gcsJobsScheduledAfterValidation metrics.Sample // Number of jobs scheduled after validation.
}

func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)
out := &inputMetrics{
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
unregister: unreg,
url: monitoring.NewString(reg, "url"),
errorsTotal: monitoring.NewUint(reg, "errors_total"),
decodeErrorsTotal: monitoring.NewUint(reg, "decode_errors_total"),

gcsObjectsRequestedTotal: monitoring.NewUint(reg, "gcs_objects_requested_total"),
gcsObjectsPublishedTotal: monitoring.NewUint(reg, "gcs_objects_published_total"),
gcsObjectsListedTotal: monitoring.NewUint(reg, "gcs_objects_listed_total"),
gcsBytesProcessedTotal: monitoring.NewUint(reg, "gcs_bytes_processed_total"),
gcsEventsCreatedTotal: monitoring.NewUint(reg, "gcs_events_created_total"),
gcsFailedJobsTotal: monitoring.NewUint(reg, "gcs_failed_jobs_total"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a gauge.

gcs_failed_jobs_gauge

Copy link
Contributor Author

@ShourieG ShourieG Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be a gauge, I'll remove the Dec(). FailedJobsTotal - ExpiredFailedJobsTotal = Recovered Jobs, at any given point of time.

gcsExpiredFailedJobsTotal: monitoring.NewUint(reg, "gcs_expired_failed_jobs_total"),
gcsObjectsInflight: monitoring.NewUint(reg, "gcs_objects_inflight"),
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
gcsObjectProcessingTime: metrics.NewUniformSample(1024),
gcsObjectSizeInBytes: metrics.NewUniformSample(1024),
gcsEventsPerObject: metrics.NewUniformSample(1024),
gcsJobsScheduledAfterValidation: metrics.NewUniformSample(1024),
}

adapter.NewGoMetrics(reg, "gcs_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.gcsObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "gcs_object_size_in_bytes", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.gcsObjectSizeInBytes)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "gcs_events_per_object", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.gcsEventsPerObject)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "gcs_jobs_scheduled_after_validation", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.gcsJobsScheduledAfterValidation)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.

return out
}

func (m *inputMetrics) Close() {
m.unregister()
}
62 changes: 62 additions & 0 deletions x-pack/filebeat/input/gcs/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gcs

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/monitoring"
)

// TestInputMetricsClose asserts that metrics registered by this input are
// removed after Close() is called. This is important because an input with
// the same ID could be re-registered, and that ID cannot exist in the
// monitoring registry.
func TestInputMetricsClose(t *testing.T) {
reg := monitoring.NewRegistry()

metrics := newInputMetrics("gcs-cl-bucket.cloudflare_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg)
metrics.Close()

reg.Do(monitoring.Full, func(s string, _ interface{}) {
t.Errorf("registry should be empty, but found %v", s)
})
}

// TestNewInputMetricsInstance asserts that all the metrics are initialized
// when a newInputMetrics method is invoked. This avoids nil hit panics when
// a getter is invoked on any uninitialized metric.
func TestNewInputMetricsInstance(t *testing.T) {
reg := monitoring.NewRegistry()
metrics := newInputMetrics("gcs-new-metric-test", reg)

assert.NotNil(t, metrics.errorsTotal,
metrics.decodeErrorsTotal,
metrics.gcsObjectsRequestedTotal,
metrics.gcsObjectsPublishedTotal,
metrics.gcsObjectsListedTotal,
metrics.gcsBytesProcessedTotal,
metrics.gcsEventsCreatedTotal,
metrics.gcsFailedJobsTotal,
metrics.gcsExpiredFailedJobsTotal,
metrics.gcsObjectsInflight,
metrics.gcsObjectProcessingTime,
metrics.gcsObjectSizeInBytes,
metrics.gcsEventsPerObject,
metrics.gcsJobsScheduledAfterValidation)

assert.Equal(t, uint64(0x0), metrics.errorsTotal.Get())
assert.Equal(t, uint64(0x0), metrics.decodeErrorsTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsObjectsRequestedTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsObjectsPublishedTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsObjectsListedTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsBytesProcessedTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsEventsCreatedTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsFailedJobsTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsExpiredFailedJobsTotal.Get())
assert.Equal(t, uint64(0x0), metrics.gcsObjectsInflight.Get())
}
Loading
Loading