diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index ebd009c7ffa9..73182bbc5011 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -331,6 +331,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Add support to include AWS cloudwatch linked accounts when using log_group_name_prefix to define log group names. {pull}41206[41206]
 - Improved Azure Blob Storage input documentation. {pull}41252[41252]
 - Make ETW input GA. {pull}41389[41389]
+- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]
 - Add support for Okta entity analytics provider to collect role and factor data for users. {pull}41460[41460]
 
 *Auditbeat*

diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc
index eae7158c78df..23ac0e021c6a 100644
--- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -462,5 +462,38 @@ filebeat.inputs:
 In this configuration even though we have specified `max_workers = 10`, `poll = true` and `poll_interval = 15s` at the root level, both the buckets will override these values with their own respective values which are defined as part of their sub attributes.
 
+[float]
+=== Metrics
+
+This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
+These metrics are exposed under the `/inputs` path. They can be used to
+observe the activity of the input.
+
+[options="header"]
+|=======
+| Metric | Description
+| `url` | URL of the input resource.
+| `errors_total` | Total number of errors encountered by the input.
+| `decode_errors_total` | Total number of decode errors encountered by the input.
+| `gcs_objects_requested_total` | Total number of GCS objects downloaded.
+| `gcs_objects_published_total` | Total number of GCS objects processed that were published.
+| `gcs_objects_listed_total` | Total number of GCS objects returned by list operations.
+| `gcs_bytes_processed_total` | Total number of GCS bytes processed.
+| `gcs_events_created_total` | Total number of events created from processing GCS data.
+| `gcs_failed_jobs_total` | Total number of failed jobs.
+| `gcs_expired_failed_jobs_total` | Total number of expired failed jobs that could not be recovered.
+| `gcs_objects_tracked_gauge` | Number of objects currently tracked in the state registry (gauge).
+| `gcs_objects_inflight_gauge` | Number of GCS objects inflight (gauge).
+| `gcs_jobs_scheduled_after_validation` | Histogram of the number of jobs scheduled after validation.
+| `gcs_object_processing_time` | Histogram of the elapsed GCS object processing times in nanoseconds (start of download to completion of parsing).
+| `gcs_object_size_in_bytes` | Histogram of processed GCS object size in bytes.
+| `gcs_events_per_object` | Histogram of event count per GCS object.
+| `source_lag_time` | Histogram of the time between the source (Updated) timestamp and the time the object was read, in nanoseconds.
+|=======
+
+==== Common input options
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
+
 NOTE: Any feedback is welcome which will help us further optimize this input. Please feel free to open a github issue for any bugs or feature requests.
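The metrics documented above are served by the monitoring endpoint's `/inputs` path. Below is a minimal sketch (not part of this change) of one way to spot-check them while Filebeat is running; it assumes Filebeat was started with `http.enabled: true` on the default `localhost:5066` address, and that `/inputs` returns a JSON array with one object per running input that carries an `input` name field alongside the metrics listed above — everything beyond the documented metric names is an assumption of this sketch.

[source,go]
----
// Hypothetical stand-alone probe (not part of the PR) for inspecting the new GCS
// input metrics over Filebeat's HTTP monitoring endpoint. Assumes http.enabled: true
// and the default endpoint address localhost:5066.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:5066/inputs")
	if err != nil {
		log.Fatalf("failed to query monitoring endpoint: %v", err)
	}
	defer resp.Body.Close()

	// Decode into generic maps so no exact response schema is assumed.
	var inputs []map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&inputs); err != nil {
		log.Fatalf("failed to decode /inputs response: %v", err)
	}

	for _, in := range inputs {
		if in["input"] == "gcs" {
			// Print a few of the counters documented in the table above.
			fmt.Println(in["url"], in["gcs_objects_published_total"], in["errors_total"])
		}
	}
}
----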
diff --git a/x-pack/filebeat/input/gcs/decoding_test.go b/x-pack/filebeat/input/gcs/decoding_test.go
index 0a2ee5e3f0d7..a57fe62a5ed6 100644
--- a/x-pack/filebeat/input/gcs/decoding_test.go
+++ b/x-pack/filebeat/input/gcs/decoding_test.go
@@ -79,7 +79,7 @@ func TestDecoding(t *testing.T) {
 			}
 			defer f.Close()
 			p := &pub{t: t}
-			j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, log, false)
+			j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, nil, log, false)
 			j.src.ReaderConfig.Decoding = tc.config
 			err = j.decode(context.Background(), f, "test")
 			if err != nil {

diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go
index a2ecf2c28afc..cc0e9ad74bbb 100644
--- a/x-pack/filebeat/input/gcs/input.go
+++ b/x-pack/filebeat/input/gcs/input.go
@@ -152,9 +152,15 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
 	log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
 	log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
 
+	// create a new inputMetrics instance
+	metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
+	metrics.url.Set("gs://" + currentSource.BucketName)
+	defer metrics.Close()
+
 	var cp *Checkpoint
 	if !cursor.IsNew() {
 		if err := cursor.Unpack(&cp); err != nil {
+			metrics.errorsTotal.Inc()
 			return err
 		}
 
@@ -169,6 +175,7 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
 
 	client, err := fetchStorageClient(ctx, input.config, log)
 	if err != nil {
+		metrics.errorsTotal.Inc()
 		return err
 	}
 	bucket := client.Bucket(currentSource.BucketName).Retryer(
@@ -180,7 +187,7 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
 		// Since we are only reading, the operation is always idempotent
 		storage.WithPolicy(storage.RetryAlways),
 	)
-	scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, log)
+	scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, metrics, log)
 
 	return scheduler.schedule(ctx)
 }

diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go
index 3cdeb3794739..f56f7f35bc55 100644
--- a/x-pack/filebeat/input/gcs/input_stateless.go
+++ b/x-pack/filebeat/input/gcs/input_stateless.go
@@ -49,6 +49,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
 	pub := statelessPublisher{wrapped: publisher}
 	var source cursor.Source
 	var g errgroup.Group
+
 	for _, b := range in.config.Buckets {
 		bucket := tryOverrideOrDefault(in.config, b)
 		source = &Source{
@@ -68,6 +69,9 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
 		st := newState()
 		currentSource := source.(*Source)
 		log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
+		metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
+		defer metrics.Close()
+		metrics.url.Set("gs://" + currentSource.BucketName)
 
 		ctx, cancel := context.WithCancel(context.Background())
 		go func() {
@@ -85,7 +89,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
 			storage.WithPolicy(storage.RetryAlways),
 		)
 
-		scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log)
+		scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, metrics, log)
 		// allows multiple containers to be scheduled concurrently while testing
 		// the stateless input is triggered only while testing and till now it did not mimic
 		// the real world concurrent execution of multiple containers. This fix allows it to do so.

diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go
index 64a548afd8c3..8accb774f384 100644
--- a/x-pack/filebeat/input/gcs/input_test.go
+++ b/x-pack/filebeat/input/gcs/input_test.go
@@ -6,7 +6,9 @@ package gcs
 
 import (
 	"context"
+	"crypto/rand"
 	"crypto/tls"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"net/http"
@@ -547,7 +549,7 @@ func Test_StorageClient(t *testing.T) {
 			chanClient := beattest.NewChanClient(len(tt.expected))
 			t.Cleanup(func() { _ = chanClient.Close() })
 
-			ctx, cancel := newV2Context()
+			ctx, cancel := newV2Context(t)
 			t.Cleanup(cancel)
 
 			var g errgroup.Group
@@ -607,11 +609,23 @@ func Test_StorageClient(t *testing.T) {
 	}
 }
 
-func newV2Context() (v2.Context, func()) {
+func newV2Context(t *testing.T) (v2.Context, func()) {
 	ctx, cancel := context.WithCancel(context.Background())
+	id, err := generateRandomID(8)
+	if err != nil {
+		t.Fatalf("failed to generate random id: %v", err)
+	}
 	return v2.Context{
 		Logger:      logp.NewLogger("gcs_test"),
-		ID:          "test_id",
+		ID:          "gcs_test-" + id,
 		Cancelation: ctx,
 	}, cancel
 }
+
+func generateRandomID(length int) (string, error) {
+	bytes := make([]byte, length)
+	if _, err := rand.Read(bytes); err != nil {
+		return "", err
+	}
+	return hex.EncodeToString(bytes), nil
+}
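The test change above gives every test context a unique ID because input metric registries are keyed by the input ID, so a duplicate ID cannot be registered while another instance still holds it. Below is a sketch (not part of this change, written as a hypothetical package-internal test named TestPerBucketMetricsRegistration) that mirrors the pattern used in input.go — one `<input ID>:<bucket>` registry per bucket — and the pattern used in the new metrics_test.go to verify that Close() releases the registrations.

[source,go]
----
// Hypothetical addition to package gcs (x-pack/filebeat/input/gcs); assumes the same
// imports as the new metrics_test.go ("testing" and elastic-agent-libs/monitoring).
// Distinct per-bucket IDs coexist under one parent registry, and Close() releases
// them so the IDs can be reused later.
func TestPerBucketMetricsRegistration(t *testing.T) {
	reg := monitoring.NewRegistry()

	// Mirrors input.go: inputCtx.ID + ":" + bucketName
	a := newInputMetrics("gcs_test-1:bucket-a", reg)
	b := newInputMetrics("gcs_test-1:bucket-b", reg)
	a.url.Set("gs://bucket-a")
	b.url.Set("gs://bucket-b")

	a.Close()
	b.Close()

	// After both Close calls the parent registry should be empty again.
	reg.Do(monitoring.Full, func(s string, _ interface{}) {
		t.Errorf("registry should be empty, but found %v", s)
	})
}
----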
diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go
index 403555311e9d..114200021599 100644
--- a/x-pack/filebeat/input/gcs/job.go
+++ b/x-pack/filebeat/input/gcs/job.go
@@ -45,6 +45,8 @@ type job struct {
 	src *Source
 	// publisher is used to publish a beat event to the output stream
 	publisher cursor.Publisher
+	// metrics used to track the errors and success of jobs
+	metrics *inputMetrics
 	// custom logger
 	log *logp.Logger
 	// flag used to denote if this object has previously failed without being processed at all.
@@ -53,8 +55,12 @@ type job struct {
 
 // newJob, returns an instance of a job, which is a unit of work that can be assigned to a go routine
 func newJob(bucket *storage.BucketHandle, object *storage.ObjectAttrs, objectURI string,
-	state *state, src *Source, publisher cursor.Publisher, log *logp.Logger, isFailed bool,
+	state *state, src *Source, publisher cursor.Publisher, metrics *inputMetrics, log *logp.Logger, isFailed bool,
 ) *job {
+	if metrics == nil {
+		// metrics are optional, initialize a stub if not provided
+		metrics = newInputMetrics("", nil)
+	}
 	return &job{
 		bucket:    bucket,
 		object:    object,
@@ -63,6 +69,7 @@ func newJob(bucket *storage.BucketHandle, object *storage.ObjectAttrs, objectURI
 		state:     state,
 		src:       src,
 		publisher: publisher,
+		metrics:   metrics,
 		log:       log,
 		isFailed:  isFailed,
 	}
@@ -78,6 +85,17 @@ func gcsObjectHash(src *Source, object *storage.ObjectAttrs) string {
 
 func (j *job) do(ctx context.Context, id string) {
 	var fields mapstr.M
+	// metrics & logging
+	j.log.Debug("begin gcs object processing.")
+	j.metrics.gcsObjectsRequestedTotal.Inc()
+	j.metrics.gcsObjectsInflight.Inc()
+	start := time.Now()
+	defer func() {
+		elapsed := time.Since(start)
+		j.metrics.gcsObjectsInflight.Dec()
+		j.metrics.gcsObjectProcessingTime.Update(elapsed.Nanoseconds())
+		j.log.Debugw("end gcs object processing.", "elapsed_time_ns", elapsed)
+	}()
 
 	if allowedContentTypes[j.object.ContentType] {
 		if j.object.ContentType == gzType || j.object.ContentEncoding == encodingGzip {
@@ -85,10 +103,15 @@ func (j *job) do(ctx context.Context, id string) {
 		}
 		err := j.processAndPublishData(ctx, id)
 		if err != nil {
-			j.state.updateFailedJobs(j.object.Name)
+			j.state.updateFailedJobs(j.object.Name, j.metrics)
 			j.log.Errorw("job encountered an error while publishing data and has been added to a failed jobs list", "gcs.jobId", id, "error", err)
+			j.metrics.gcsFailedJobsTotal.Inc()
+			j.metrics.errorsTotal.Inc()
 			return
 		}
+		j.metrics.gcsObjectsPublishedTotal.Inc()
+		//nolint:gosec // object size cannot be negative hence this conversion is safe
+		j.metrics.gcsBytesProcessedTotal.Add(uint64(j.object.Size))
 	} else {
 		err := fmt.Errorf("job with jobId %s encountered an error: content-type %s not supported", id, j.object.ContentType)
@@ -101,9 +124,10 @@ func (j *job) do(ctx context.Context, id string) {
 		}
 		event.SetID(objectID(j.hash, 0))
 		// locks while data is being saved and published to avoid concurrent map read/writes
-		cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
+		cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
 		if err := j.publisher.Publish(event, cp); err != nil {
 			j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
+			j.metrics.errorsTotal.Inc()
 		}
 		// unlocks after data is saved and published
 		done()
@@ -133,11 +157,21 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error {
 	defer func() {
 		err = reader.Close()
 		if err != nil {
+			j.metrics.errorsTotal.Inc()
 			j.log.Errorw("failed to close reader for object", "objectName", j.object.Name, "error", err)
 		}
 	}()
 
-	return j.decode(ctx, reader, id)
+	// update the source lag time metric
+	j.metrics.sourceLagTime.Update(time.Since(j.object.Updated).Nanoseconds())
+
+	// track the number of decode errors
+	if err := j.decode(ctx, reader, id); err != nil {
+		j.metrics.decodeErrorsTotal.Inc()
+		return fmt.Errorf("failed to decode object: %s, with error: %w", j.object.Name, err)
+	}
+
+	return nil
 }
 
 func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
@@ -241,17 +275,24 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
 
 		// if expand_event_list_from_field is set, then split the event list
 		if j.src.ExpandEventListFromField != "" {
-			if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil {
+			if numEvents, err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, id); err != nil {
 				return err
+			} else {
+				j.metrics.gcsEventsPerObject.Update(int64(numEvents))
 			}
 			continue
+		} else {
+			j.metrics.gcsEventsPerObject.Update(1)
 		}
 
 		var parsedData []mapstr.M
 		if j.src.ParseJSON {
 			parsedData, err = decodeJSON(bytes.NewReader(item))
 			if err != nil {
-				j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
+				// since we do not want to stop processing the job here, as this is purely cosmetic and optional, we log the error and continue
+				j.metrics.errorsTotal.Inc()
+				j.metrics.decodeErrorsTotal.Inc()
+				j.log.Errorw("job encountered an error during 'ParseJSON' op", "gcs.jobId", id, "error", err)
 			}
 		}
 		evt := j.createEvent(item, parsedData, offset)
@@ -263,8 +304,9 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
 func (j *job) publish(evt beat.Event, last bool, id string) {
 	if last {
 		// if this is the last object, then perform a complete state save
-		cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
+		cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
 		if err := j.publisher.Publish(evt, cp); err != nil {
+			j.metrics.errorsTotal.Inc()
 			j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
 		}
 		done()
@@ -272,20 +314,22 @@ func (j *job) publish(evt beat.Event, last bool, id string) {
 	}
 	// since we don't update the cursor checkpoint, lack of a lock here is not a problem
 	if err := j.publisher.Publish(evt, nil); err != nil {
+		j.metrics.errorsTotal.Inc()
 		j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
 	}
 }
 
 // splitEventList splits the event list into individual events and publishes them
-func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error {
+func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, id string) (int, error) {
 	var jsonObject map[string]json.RawMessage
+	var eventsPerObject int
 	if err := json.Unmarshal(raw, &jsonObject); err != nil {
-		return fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
+		return eventsPerObject, fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err)
 	}
 
 	raw, found := jsonObject[key]
 	if !found {
-		return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
+		return eventsPerObject, fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
 	}
 
 	dec := json.NewDecoder(bytes.NewReader(raw))
@@ -294,11 +338,11 @@ func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objH
 
 	tok, err := dec.Token()
 	if err != nil {
-		return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
+		return eventsPerObject, fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err)
 	}
 	delim, ok := tok.(json.Delim)
 	if !ok || delim != '[' {
-		return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
+		return eventsPerObject, fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
 	}
 
 	for dec.More() {
@@ -306,31 +350,34 @@ func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objH
 		var item json.RawMessage
 		if err := dec.Decode(&item); err != nil {
-			return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
+			return eventsPerObject, fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err)
 		}
 		data, err := item.MarshalJSON()
 		if err != nil {
-			return fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
+			return eventsPerObject, fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err)
 		}
 		evt := j.createEvent(data, nil, offset+arrayOffset)
 
 		if !dec.More() {
 			// if this is the last object, then perform a complete state save
-			cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
+			cp, done := j.state.saveForTx(j.object.Name, j.object.Updated, j.metrics)
 			if err := j.publisher.Publish(evt, cp); err != nil {
+				j.metrics.errorsTotal.Inc()
 				j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
 			}
 			done()
 		} else {
 			// since we don't update the cursor checkpoint, lack of a lock here is not a problem
 			if err := j.publisher.Publish(evt, nil); err != nil {
+				j.metrics.errorsTotal.Inc()
 				j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err)
 			}
 		}
+		eventsPerObject++
 	}
 
-	return nil
+	return eventsPerObject, nil
 }
 
 // addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader)
@@ -426,7 +473,7 @@ func (j *job) createEvent(message []byte, data []mapstr.M, offset int64) beat.Ev
 		},
 	}
 	event.SetID(objectID(j.hash, offset))
-
+	j.metrics.gcsEventsCreatedTotal.Inc()
 	return event
 }

diff --git a/x-pack/filebeat/input/gcs/metrics.go b/x-pack/filebeat/input/gcs/metrics.go
new file mode 100644
index 000000000000..58b5e3c02570
--- /dev/null
+++ b/x-pack/filebeat/input/gcs/metrics.go
@@ -0,0 +1,78 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package gcs
+
+import (
+	"github.com/rcrowley/go-metrics"
+
+	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
+	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/elastic/elastic-agent-libs/monitoring/adapter"
+)
+
+// inputMetrics handles the input's metric reporting.
+type inputMetrics struct {
+	unregister func()
+	url        *monitoring.String // URL of the input resource.
+
+	errorsTotal       *monitoring.Uint // Number of errors encountered.
+	decodeErrorsTotal *monitoring.Uint // Number of decode errors encountered.
+
+	gcsObjectsTracked         *monitoring.Uint // Number of objects currently tracked in the state registry (gauge).
+	gcsObjectsRequestedTotal  *monitoring.Uint // Number of GCS objects downloaded.
+	gcsObjectsPublishedTotal  *monitoring.Uint // Number of GCS objects processed that were published.
+	gcsObjectsListedTotal     *monitoring.Uint // Number of GCS objects returned by list operations.
+	gcsBytesProcessedTotal    *monitoring.Uint // Number of GCS bytes processed.
+	gcsEventsCreatedTotal     *monitoring.Uint // Number of events created from processing GCS data.
+	gcsFailedJobsTotal        *monitoring.Uint // Number of failed jobs.
+	gcsExpiredFailedJobsTotal *monitoring.Uint // Number of expired failed jobs that could not be recovered.
+	gcsObjectsInflight        *monitoring.Uint // Number of GCS objects inflight (gauge).
+
+	gcsObjectProcessingTime         metrics.Sample // Histogram of the elapsed GCS object processing times in nanoseconds (start of download to completion of parsing).
+	gcsObjectSizeInBytes            metrics.Sample // Histogram of processed GCS object size in bytes.
+	gcsEventsPerObject              metrics.Sample // Histogram of event count per GCS object.
+	gcsJobsScheduledAfterValidation metrics.Sample // Histogram of number of jobs scheduled after validation.
+	sourceLagTime                   metrics.Sample // Histogram of the time between the source (Updated) timestamp and the time the object was read.
+}
+
+func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
+	reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)
+	out := &inputMetrics{
+		unregister:        unreg,
+		url:               monitoring.NewString(reg, "url"),
+		errorsTotal:       monitoring.NewUint(reg, "errors_total"),
+		decodeErrorsTotal: monitoring.NewUint(reg, "decode_errors_total"),
+
+		gcsObjectsTracked:         monitoring.NewUint(reg, "gcs_objects_tracked_gauge"),
+		gcsObjectsRequestedTotal:  monitoring.NewUint(reg, "gcs_objects_requested_total"),
+		gcsObjectsPublishedTotal:  monitoring.NewUint(reg, "gcs_objects_published_total"),
+		gcsObjectsListedTotal:     monitoring.NewUint(reg, "gcs_objects_listed_total"),
+		gcsBytesProcessedTotal:    monitoring.NewUint(reg, "gcs_bytes_processed_total"),
+		gcsEventsCreatedTotal:     monitoring.NewUint(reg, "gcs_events_created_total"),
+		gcsFailedJobsTotal:        monitoring.NewUint(reg, "gcs_failed_jobs_total"),
+		gcsExpiredFailedJobsTotal: monitoring.NewUint(reg, "gcs_expired_failed_jobs_total"),
+		gcsObjectsInflight:        monitoring.NewUint(reg, "gcs_objects_inflight_gauge"),
+
+		gcsObjectProcessingTime:         metrics.NewUniformSample(1024),
+		gcsObjectSizeInBytes:            metrics.NewUniformSample(1024),
+		gcsEventsPerObject:              metrics.NewUniformSample(1024),
+		gcsJobsScheduledAfterValidation: metrics.NewUniformSample(1024),
+		sourceLagTime:                   metrics.NewUniformSample(1024),
+	}
+
+	adapter.NewGoMetrics(reg, "gcs_object_processing_time", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.gcsObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+	adapter.NewGoMetrics(reg, "gcs_object_size_in_bytes", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.gcsObjectSizeInBytes)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+	adapter.NewGoMetrics(reg, "gcs_events_per_object", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.gcsEventsPerObject)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+	adapter.NewGoMetrics(reg, "gcs_jobs_scheduled_after_validation", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.gcsJobsScheduledAfterValidation)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+	adapter.NewGoMetrics(reg, "source_lag_time", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.sourceLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+
+	return out
+}
+
+func (m *inputMetrics) Close() {
+	m.unregister()
+}
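The histogram metrics above are go-metrics uniform samples (size 1024) bridged into the monitoring registry through adapter.NewGoMetrics. The sketch below (not part of this change, a hypothetical package-internal test) shows how job.go-style call sites feed them: raw nanosecond observations go into the sample via Update, and the registered adapter exposes the aggregated histogram on `/inputs`.

[source,go]
----
// Hypothetical addition to package gcs; assumes the existing test imports
// ("testing" and elastic-agent-libs/monitoring). Observations are recorded
// directly on the go-metrics sample, exactly as job.go does for
// source_lag_time and gcs_object_processing_time.
func TestHistogramSampleSketch(t *testing.T) {
	reg := monitoring.NewRegistry()
	m := newInputMetrics("gcs-histogram-sketch", reg)
	defer m.Close()

	// Record a few lag observations in nanoseconds.
	for _, lagNs := range []int64{1_000_000, 2_000_000, 4_000_000} {
		m.sourceLagTime.Update(lagNs)
	}

	if m.sourceLagTime.Count() != 3 {
		t.Errorf("expected 3 observations, got %d", m.sourceLagTime.Count())
	}
	// Mean over the values currently held by the uniform sample.
	if got := m.sourceLagTime.Mean(); got < 1_000_000 || got > 4_000_000 {
		t.Errorf("unexpected mean lag: %f", got)
	}
}
----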
diff --git a/x-pack/filebeat/input/gcs/metrics_test.go b/x-pack/filebeat/input/gcs/metrics_test.go
new file mode 100644
index 000000000000..3398a1a8daa5
--- /dev/null
+++ b/x-pack/filebeat/input/gcs/metrics_test.go
@@ -0,0 +1,67 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package gcs
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/elastic-agent-libs/monitoring"
+)
+
+// TestInputMetricsClose asserts that metrics registered by this input are
+// removed after Close() is called. This is important because an input with
+// the same ID could be re-registered, and that ID cannot exist in the
+// monitoring registry.
+func TestInputMetricsClose(t *testing.T) {
+	reg := monitoring.NewRegistry()
+
+	metrics := newInputMetrics("gcs-cl-bucket.cloudflare_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg)
+	metrics.Close()
+
+	reg.Do(monitoring.Full, func(s string, _ interface{}) {
+		t.Errorf("registry should be empty, but found %v", s)
+	})
+}
+
+// TestNewInputMetricsInstance asserts that all the metrics are initialized
+// when a newInputMetrics method is invoked. This avoids nil pointer panics when
+// a getter is invoked on any uninitialized metric.
+func TestNewInputMetricsInstance(t *testing.T) {
+	reg := monitoring.NewRegistry()
+	metrics := newInputMetrics("gcs-new-metric-test", reg)
+
+	assert.NotNil(t, metrics.errorsTotal,
+		metrics.decodeErrorsTotal,
+		metrics.gcsObjectsTracked,
+		metrics.gcsObjectsRequestedTotal,
+		metrics.gcsObjectsPublishedTotal,
+		metrics.gcsObjectsListedTotal,
+		metrics.gcsBytesProcessedTotal,
+		metrics.gcsEventsCreatedTotal,
+		metrics.gcsFailedJobsTotal,
+		metrics.gcsExpiredFailedJobsTotal,
+		metrics.gcsObjectsInflight,
+		metrics.gcsObjectProcessingTime,
+		metrics.gcsObjectSizeInBytes,
+		metrics.gcsEventsPerObject,
+		metrics.gcsJobsScheduledAfterValidation,
+		metrics.sourceLagTime,
+	)
+
+	assert.Equal(t, uint64(0x0), metrics.errorsTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.decodeErrorsTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsObjectsTracked.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsObjectsRequestedTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsObjectsPublishedTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsObjectsListedTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsBytesProcessedTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsEventsCreatedTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsFailedJobsTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsExpiredFailedJobsTotal.Get())
+	assert.Equal(t, uint64(0x0), metrics.gcsObjectsInflight.Get())
+
+}

diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go
index ef1bebd083d9..3f7a1d833c9f 100644
--- a/x-pack/filebeat/input/gcs/scheduler.go
+++ b/x-pack/filebeat/input/gcs/scheduler.go
@@ -36,12 +36,17 @@ type scheduler struct {
 	state     *state
 	log       *logp.Logger
 	limiter   *limiter
+	metrics   *inputMetrics
 }
 
 // newScheduler, returns a new scheduler instance
 func newScheduler(publisher cursor.Publisher, bucket *storage.BucketHandle, src *Source, cfg *config,
-	state *state, log *logp.Logger,
+	state *state, metrics *inputMetrics, log *logp.Logger,
 ) *scheduler {
+	if metrics == nil {
+		// metrics are optional, initialize a stub if not provided
+		metrics = newInputMetrics("", nil)
+	}
 	return &scheduler{
 		publisher: publisher,
 		bucket:    bucket,
@@ -50,6 +55,7 @@ func newScheduler(publisher cursor.Publisher, bucket *storage.BucketHandle, src
 		state:     state,
 		log:       log,
 		limiter:   &limiter{limit: make(chan struct{}, src.MaxWorkers)},
+		metrics:   metrics,
 	}
 }
 
@@ -96,11 +102,13 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error {
 		var objects []*storage.ObjectAttrs
 		nextPageToken, err := pager.NextPage(&objects)
 		if err != nil {
+			s.metrics.errorsTotal.Inc()
 			return err
 		}
 		numObs += len(objects)
 		jobs := s.createJobs(objects, s.log)
 		s.log.Debugf("scheduler: %d objects fetched for current batch", len(objects))
+		s.metrics.gcsObjectsListedTotal.Add(uint64(len(objects)))
 
 		// If previous checkpoint was saved then look up starting point for new jobs
 		if !s.state.checkpoint().LatestEntryTime.IsZero() {
@@ -110,6 +118,7 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error {
 			}
 		}
 		s.log.Debugf("scheduler: %d jobs scheduled for current batch", len(jobs))
+		s.metrics.gcsJobsScheduledAfterValidation.Update(int64(len(jobs)))
 
 		// distributes jobs among workers with the help of a limiter
 		for i, job := range jobs {
@@ -165,7 +174,7 @@ func (s *scheduler) createJobs(objects []*storage.ObjectAttrs, log *logp.Logger)
 		}
 
 		objectURI := "gs://" + s.src.BucketName + "/" + obj.Name
-		job := newJob(s.bucket, obj, objectURI, s.state, s.src, s.publisher, log, false)
+		job := newJob(s.bucket, obj, objectURI, s.state, s.src, s.publisher, s.metrics, log, false)
 		jobs = append(jobs, job)
 	}
 
@@ -201,7 +210,6 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
 
 func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job {
 	jobMap := make(map[string]bool)
-
 	for _, j := range jobs {
 		jobMap[j.Name()] = true
 	}
@@ -215,19 +223,19 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job {
 		if err != nil {
 			if errors.Is(err, storage.ErrObjectNotExist) {
 				// if the object is not found in the bucket, then remove it from the failed job list
-				s.state.deleteFailedJob(name)
+				s.state.deleteFailedJob(name, s.metrics)
 				s.log.Debugf("scheduler: failed job %s not found in bucket %s", name, s.src.BucketName)
 			} else {
 				// if there is an error while validating the object,
 				// then update the failed job retry count and work towards natural removal
-				s.state.updateFailedJobs(name)
+				s.state.updateFailedJobs(name, s.metrics)
 				s.log.Errorf("scheduler: adding failed job %s to job list caused an error: %v", name, err)
 			}
 			continue
 		}
 
 		objectURI := "gs://" + s.src.BucketName + "/" + obj.Name
-		job := newJob(s.bucket, obj, objectURI, s.state, s.src, s.publisher, s.log, true)
+		job := newJob(s.bucket, obj, objectURI, s.state, s.src, s.publisher, s.metrics, s.log, true)
 		jobs = append(jobs, job)
 		s.log.Debugf("scheduler: adding failed job number %d with name %s to job current list", fj, job.Name())
 		fj++

diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go
index ea04edcae908..af2ab43cec0a 100644
--- a/x-pack/filebeat/input/gcs/state.go
+++ b/x-pack/filebeat/input/gcs/state.go
@@ -44,7 +44,7 @@ func newState() *state {
 // and returns an unlock function, done. The caller must call done when
 // s and cp are no longer needed in a locked state. done may not be called
 // more than once.
-func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
+func (s *state) saveForTx(name string, lastModifiedOn time.Time, metrics *inputMetrics) (cp *Checkpoint, done func()) {
 	s.mu.Lock()
 	if _, ok := s.cp.FailedJobs[name]; !ok {
 		if len(s.cp.ObjectName) == 0 {
@@ -61,6 +61,7 @@ func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint
 	} else {
 		// clear entry if this is a failed job
 		delete(s.cp.FailedJobs, name)
+		metrics.gcsObjectsTracked.Dec()
 	}
 	return s.cp, func() { s.mu.Unlock() }
 }
@@ -70,20 +71,29 @@ func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint
 // move ahead in timestamp & objectName due to successful operations from other workers.
 // A failed job will be re-tried a maximum of 3 times after which the
 // entry is removed from the map
-func (s *state) updateFailedJobs(jobName string) {
+func (s *state) updateFailedJobs(jobName string, metrics *inputMetrics) {
 	s.mu.Lock()
+	if _, ok := s.cp.FailedJobs[jobName]; !ok {
+		// increment stored state object count & failed job count
+		metrics.gcsObjectsTracked.Inc()
+		metrics.gcsFailedJobsTotal.Inc()
+	}
 	s.cp.FailedJobs[jobName]++
 	if s.cp.FailedJobs[jobName] > maxFailedJobRetries {
 		delete(s.cp.FailedJobs, jobName)
+		metrics.gcsExpiredFailedJobsTotal.Inc()
+		metrics.gcsObjectsTracked.Dec()
 	}
 	s.mu.Unlock()
 }
 
 // deleteFailedJob, deletes a failed job from the failedJobs map
 // this is used when a job no longer exists in the bucket or gets expired
-func (s *state) deleteFailedJob(jobName string) {
+func (s *state) deleteFailedJob(jobName string, metrics *inputMetrics) {
 	s.mu.Lock()
 	delete(s.cp.FailedJobs, jobName)
+	metrics.gcsExpiredFailedJobsTotal.Inc()
+	metrics.gcsObjectsTracked.Dec()
 	s.mu.Unlock()
 }
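The state changes above are what drive `gcs_objects_tracked_gauge`, `gcs_failed_jobs_total`, and `gcs_expired_failed_jobs_total`: an object becomes tracked on its first failure and, once it exceeds `maxFailedJobRetries`, it is dropped from state and counted as expired. The sketch below (not part of this change, a hypothetical package-internal test) walks that lifecycle against the updated `updateFailedJobs`.

[source,go]
----
// Hypothetical addition to package gcs; assumes the existing test imports
// ("testing" and elastic-agent-libs/monitoring) plus the package-level
// newState() and maxFailedJobRetries already used by state.go.
func TestFailedJobMetricsLifecycle(t *testing.T) {
	reg := monitoring.NewRegistry()
	m := newInputMetrics("gcs-failed-job-sketch", reg)
	defer m.Close()

	st := newState()

	// First failure: the object becomes tracked and counts as a failed job.
	st.updateFailedJobs("object-1", m)
	if m.gcsObjectsTracked.Get() != 1 || m.gcsFailedJobsTotal.Get() != 1 {
		t.Errorf("expected object-1 to be tracked after its first failure")
	}

	// Exceed the retry limit: the entry expires, is removed from state, and
	// the tracked-objects gauge drops back to zero.
	for i := 0; i < maxFailedJobRetries; i++ {
		st.updateFailedJobs("object-1", m)
	}
	if m.gcsExpiredFailedJobsTotal.Get() != 1 || m.gcsObjectsTracked.Get() != 0 {
		t.Errorf("expected object-1 to expire after exceeding the retry limit")
	}
}
----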