Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept prefix as metric_types for stackdriver metricset in GCP #19345

Merged
merged 7 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add memory metrics into compute googlecloud. {pull}18802[18802]
- Add new fields to HAProxy module. {issue}18523[18523]
- Add Tomcat overview dashboard {pull}14026[14026]
- Accept prefix as metric_types config parameter in googlecloud stackdriver metricset. {pull}19345[19345]
- Update Couchbase to version 6.5 {issue}18595[18595] {pull}19055[19055]
- Add dashboards for googlecloud load balancing metricset. {pull}18369[18369]
- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@ call.
[float]
== Metricset config and parameters

* *metric_types*: Required, a list of metric type strings. Each call of the
* *metric_types*: Required, a list of metric type strings, or a list of metric
type prefixes. For example, `instance/cpu` is the prefix for metric type
`instance/cpu/usage_time`, `instance/cpu/utilization` etc Each call of the
`ListTimeSeries` API can return any number of time series from a single metric
type. Metric type is to used for identifying a specific time series.

* *aligner*: A single string with which aggregation operation need to be applied
onto time series data for ListTimeSeries API. If it's not given, default aligner
is set to be `ALIGN_NONE`. Google Cloud also supports `ALIGN_DELTA`, `ALIGN_RATE`,
`ALIGN_MIN`, `ALIGN_MAX`, `ALIGN_MEAN`, `ALIGN_COUNT`, `ALIGN_SUM` and etc.
is `ALIGN_NONE`. Google Cloud also supports `ALIGN_DELTA`, `ALIGN_RATE`,
`ALIGN_MIN`, `ALIGN_MAX`, `ALIGN_MEAN`, `ALIGN_COUNT`, `ALIGN_SUM` etc.
Please see
https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#aligner[Aggregation Aligner]
for the full list of aligners.


[float]
=== Example Configuration
* `stackdriver` metricset is enabled to collect metrics from all zones under
Expand All @@ -37,7 +38,7 @@ are specified: first one is to collect CPU usage time and utilization with
aggregation aligner ALIGN_MEAN; second one is to collect uptime with aggregation
aligner ALIGN_SUM. These metric types all have 240 seconds ingest delay time and
60 seconds sample period. With `period` specified as `300s` in the config below,
Metricbeat will collect compute metrics from googlecloud every 5-minute with
Metricbeat will collect compute metrics from Google Cloud every 5-minute with
given aggregation aligner applied for each metric type.
+
[source,yaml]
Expand Down Expand Up @@ -69,7 +70,7 @@ are specified: first one is to collect CPU usage time and utilization with
aggregation aligner ALIGN_MEAN; second one is to collect uptime with aggregation
aligner ALIGN_SUM. These metric types all have 240 seconds ingest delay time and
60 seconds sample period. With `period` specified as `60s` in the config below,
Metricbeat will collect compute metrics from googlecloud every minute with no
Metricbeat will collect compute metrics from Google Cloud every minute with no
aggregation. This case, the aligners specified in the configuration will be
ignored.
+
Expand All @@ -94,3 +95,32 @@ ignored.
metric_types:
- "instance/uptime"
----

* `stackdriver` metricset is enabled to collect metrics from all zones under
`europe-west1-c` region in `elastic-observability` project. One set of metrics
will be collected: metric types that starts with `instance/cpu` under `compute`
service with aligner ALIGN_NONE. These metric types all have 240 seconds ingest
delay time and 60 seconds sample period. With `period` specified as `1m` in
the config below, Metricbeat will collect compute metrics from Google Cloud
every minute with no aggregation. The metric types in `compute` service with
`instance/cpu` prefix are: `instance/cpu/reserved_cores`,
`instance/cpu/scheduler_wait_time`, `instance/cpu/usage_time`, and
`instance/cpu/utilization`.

+
[source,yaml]
----
- module: googlecloud
metricsets:
- stackdriver
zone: "europe-west1-c"
project_id: elastic-observability
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 1m
metrics:
- aligner: ALIGN_NONE
service: compute
metric_types:
- "instance/cpu"
----
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,16 @@ func (r *stackdriverMetricsRequester) Metrics(ctx context.Context, sdc stackDriv
results := make([]timeSeriesWithAligner, 0)

aligner := sdc.Aligner
serviceName := sdc.ServiceName
for _, mt := range sdc.MetricTypes {
for mt, meta := range metricsMeta {
wg.Add(1)

metricMeta := meta
go func(mt string) {
defer wg.Done()

metricMeta := metricsMeta[mt]
r.logger.Debugf("For metricType %s, metricMeta = %s", mt, metricMeta)
interval, aligner := getTimeIntervalAligner(metricMeta.ingestDelay, metricMeta.samplePeriod, r.config.period, aligner)
ts := r.Metric(ctx, serviceName+".googleapis.com/"+mt, interval, aligner)
ts := r.Metric(ctx, mt, interval, aligner)
lock.Lock()
defer lock.Unlock()
results = append(results, ts)
Expand Down
72 changes: 42 additions & 30 deletions x-pack/metricbeat/module/googlecloud/stackdriver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"fmt"
"time"

"github.com/golang/protobuf/ptypes/duration"

monitoring "cloud.google.com/go/monitoring/apiv3"

"github.com/golang/protobuf/ptypes/duration"
"github.com/pkg/errors"

"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -216,41 +215,54 @@ func (mc *stackDriverConfig) Validate() error {
// (sample period and ingest delay) of each given metric type
func (m *MetricSet) metricDescriptor(ctx context.Context, client *monitoring.MetricClient) (map[string]metricMeta, error) {
metricsWithMeta := make(map[string]metricMeta, 0)
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: "projects/" + m.config.ProjectID,
}

for _, sdc := range m.stackDriverConfig {
for _, mt := range sdc.MetricTypes {
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: "projects/" + m.config.ProjectID,
Filter: fmt.Sprintf(`metric.type = "%s"`, sdc.ServiceName+".googleapis.com/"+mt),
}

req.Filter = fmt.Sprintf(`metric.type = starts_with("%s")`, sdc.ServiceName+".googleapis.com/"+mt)
it := client.ListMetricDescriptors(ctx, req)
out, err := it.Next()
if err != nil {
err = errors.Errorf("Could not make ListMetricDescriptors request: %s: %v", mt, err)
m.Logger().Error(err)
return metricsWithMeta, err
}

// Set samplePeriod default to 60 seconds and ingestDelay default to 0.
meta := metricMeta{
samplePeriod: 60 * time.Second,
ingestDelay: 0 * time.Second,
for {
out, err := it.Next()
if err != nil && err != iterator.Done {
err = errors.Errorf("Could not make ListMetricDescriptors request for metric type %s: %v", mt, err)
m.Logger().Error(err)
return metricsWithMeta, err
}

if out != nil {
metricsWithMeta = m.getMetadata(out, metricsWithMeta)
}

if err == iterator.Done {
break
}
}
}
}

if out.Metadata.SamplePeriod != nil {
m.Logger().Debugf("For metric type %s: sample period = %s", mt, out.Metadata.SamplePeriod)
meta.samplePeriod = time.Duration(out.Metadata.SamplePeriod.Seconds) * time.Second
}
return metricsWithMeta, nil
}

if out.Metadata.IngestDelay != nil {
m.Logger().Debugf("For metric type %s: ingest delay = %s", mt, out.Metadata.IngestDelay)
meta.ingestDelay = time.Duration(out.Metadata.IngestDelay.Seconds) * time.Second
}
func (m *MetricSet) getMetadata(out *metric.MetricDescriptor, metricsWithMeta map[string]metricMeta) map[string]metricMeta {
// Set samplePeriod default to 60 seconds and ingestDelay default to 0.
meta := metricMeta{
samplePeriod: 60 * time.Second,
ingestDelay: 0 * time.Second,
}

metricsWithMeta[mt] = meta
}
if out.Metadata.SamplePeriod != nil {
m.Logger().Debugf("For metric type %s: sample period = %s", out.Type, out.Metadata.SamplePeriod)
meta.samplePeriod = time.Duration(out.Metadata.SamplePeriod.Seconds) * time.Second
}

return metricsWithMeta, nil
if out.Metadata.IngestDelay != nil {
m.Logger().Debugf("For metric type %s: ingest delay = %s", out.Type, out.Metadata.IngestDelay)
meta.ingestDelay = time.Duration(out.Metadata.IngestDelay.Seconds) * time.Second
}

metricsWithMeta[out.Type] = meta
return metricsWithMeta
}