Skip to content

Commit

Permalink
Use app guid as the source_id for metrics
Browse files Browse the repository at this point in the history
When creating a desiredLRP we will now add metric tags with the
following information:

- source_id: App GUID
- process_id: Process GUID
- process_instance_id: Instance GUID - Generated by Diego
- instance_id: Instance Index - Generated by Diego

When reading metrics from LogCache we will now filter the envelopes by
the process guid to differentiate processes of the same app.

This change makes the TrafficController client return invalid results
when showing stats for an app with multiple processes.

[Finishes #162922843]

Signed-off-by: Andres Medina <[email protected]>
  • Loading branch information
Andy Brown authored and Andres Medina committed Jan 28, 2019
1 parent 3281fb5 commit fcd9d22
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 46 deletions.
11 changes: 10 additions & 1 deletion lib/cloud_controller/diego/app_recipe_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ module Diego
class AppRecipeBuilder
include ::Diego::ActionBuilder

METRIC_TAG_VALUE = ::Diego::Bbs::Models::MetricTagValue
METRIC_TAG_ENTRY = ::Diego::Bbs::Models::DesiredLRP::MetricTagsEntry

MONITORED_HEALTH_CHECK_TYPES = [HealthCheckTypes::PORT, HealthCheckTypes::HTTP, ''].map(&:freeze).freeze

def initialize(config:, process:, ssh_key: SSHKey.new)
Expand Down Expand Up @@ -51,7 +54,13 @@ def build_app_lrp
ports: ports,
log_source: LRP_LOG_SOURCE,
log_guid: process.app.guid,
metrics_guid: process.guid,
metrics_guid: process.app.guid,
metric_tags: [
METRIC_TAG_ENTRY.new(key: 'source_id', value: METRIC_TAG_VALUE.new(static: process.app.guid)),
METRIC_TAG_ENTRY.new(key: 'process_id', value: METRIC_TAG_VALUE.new(static: process.guid)),
METRIC_TAG_ENTRY.new(key: 'process_instance_id', value: METRIC_TAG_VALUE.new(dynamic: METRIC_TAG_VALUE::DynamicValue::INSTANCE_GUID)),
METRIC_TAG_ENTRY.new(key: 'instance_id', value: METRIC_TAG_VALUE.new(dynamic: METRIC_TAG_VALUE::DynamicValue::INDEX)),
],
annotation: process.updated_at.to_f.to_s,
egress_rules: generate_egress_rules,
cached_dependencies: desired_lrp_builder.cached_dependencies,
Expand Down
39 changes: 25 additions & 14 deletions lib/cloud_controller/diego/reporters/instances_stats_reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@ def stats_for_app(process)
formatted_current_time = Time.now.to_datetime.rfc3339

logger.debug('stats_for_app.fetching_container_metrics', process_guid: process.guid)
envelopes = @logstats_client.container_metrics(
source_guid: process.guid,
auth_token: VCAP::CloudController::SecurityContext.auth_token,
)
actual_lrps = bbs_instances_client.lrp_instances(process)
desired_lrp = bbs_instances_client.desired_lrp_instance(process)

stats = {}
envelopes.each do |envelope|
container_metrics = envelope.containerMetric
stats[container_metrics.instanceIndex] = {
time: formatted_current_time
}.merge(converted_container_metrics(container_metrics))
end
stats = envelopes(desired_lrp, process).
map { |e|
[
e.containerMetric.instanceIndex,
converted_container_metrics(e.containerMetric, formatted_current_time),
]
}.to_h

actual_lrps.each do |actual_lrp|
bbs_instances_client.lrp_instances(process).each do |actual_lrp|
next unless actual_lrp.actual_lrp_key.index < process.instances

info = {
Expand Down Expand Up @@ -72,25 +67,41 @@ def stats_for_app(process)

private

def envelopes(desired_lrp, process)
filter, source_guid = if desired_lrp.metric_tags.any? { |tag| tag.key == 'process_id' }
[->(e) { e.tags.any? { |key, value| key == 'process_id' && value == process.guid } }, process.app.guid]
else
[->(_) { true }, process.guid]
end

@logstats_client.container_metrics(
source_guid: source_guid,
auth_token: VCAP::CloudController::SecurityContext.auth_token,
logcache_filter: filter
)
end

attr_reader :bbs_instances_client

# Lazily-built, memoized Steno logger for this reporter's debug output.
def logger
@logger ||= Steno.logger('cc.diego.instances_reporter')
end

def converted_container_metrics(container_metrics)
def converted_container_metrics(container_metrics, formatted_current_time)
cpu = container_metrics.cpuPercentage
mem = container_metrics.memoryBytes
disk = container_metrics.diskBytes

if cpu.nil? || mem.nil? || disk.nil?
{
time: formatted_current_time,
cpu: 0,
mem: 0,
disk: 0
}
else
{
time: formatted_current_time,
cpu: cpu / 100,
mem: mem,
disk: disk
Expand Down
25 changes: 9 additions & 16 deletions lib/logcache/traffic_controller_decorator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(logcache_client)
@logcache_client = logcache_client
end

def container_metrics(auth_token: nil, source_guid:)
def container_metrics(auth_token: nil, source_guid:, logcache_filter:)
now = Time.now
start_time = TimeUtils.to_nanoseconds(now - 2.minutes)
end_time = TimeUtils.to_nanoseconds(now)
Expand All @@ -36,7 +36,7 @@ def container_metrics(auth_token: nil, source_guid:)
end

final_envelopes.
select { |e| has_container_metrics_fields?(e) }.
select { |e| has_container_metrics_fields?(e) && logcache_filter.call(e) }.
uniq(&:instance_id).
map { |e| convert_to_traffic_controller_envelope(source_guid, e) }
end
Expand Down Expand Up @@ -67,22 +67,15 @@ def has_container_metrics_fields?(envelope)
end

def convert_to_traffic_controller_envelope(source_guid, logcache_envelope)
new_envelope = {
TrafficController::Models::Envelope.new(
containerMetric: TrafficController::Models::ContainerMetric.new({
applicationId: source_guid,
instanceIndex: logcache_envelope.instance_id,
}

if (metrics = logcache_envelope.gauge.metrics)
gauge_values = {
cpuPercentage: metrics['cpu'].value,
memoryBytes: metrics['memory'].value,
diskBytes: metrics['disk'].value
}
new_envelope.merge!(gauge_values)
end

TrafficController::Models::Envelope.new(
containerMetric: TrafficController::Models::ContainerMetric.new(new_envelope)
cpuPercentage: logcache_envelope.gauge.metrics['cpu'].value,
memoryBytes: logcache_envelope.gauge.metrics['memory'].value,
diskBytes: logcache_envelope.gauge.metrics['disk'].value,
}),
tags: logcache_envelope.tags.map { |k, v| TrafficController::Models::Envelope::TagsEntry.new(key: k, value: v) },
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/traffic_controller/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(url:)
@url = url
end

def container_metrics(auth_token:, source_guid:)
def container_metrics(auth_token:, source_guid:, logcache_filter: nil)
response = with_request_error_handling do
client.get("/apps/#{source_guid}/containermetrics", nil, { 'Authorization' => auth_token })
end
Expand Down
110 changes: 104 additions & 6 deletions spec/logcache/traffic_controller_decorator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'utils/time_utils'

RSpec.describe Logcache::TrafficControllerDecorator do
subject { described_class.new(wrapped_logcache_client).container_metrics(source_guid: process_guid) }
subject { described_class.new(wrapped_logcache_client).container_metrics(source_guid: process_guid, logcache_filter: filter) }
let(:wrapped_logcache_client) { instance_double(Logcache::Client, container_metrics: logcache_response) }

let(:num_instances) { 11 }
Expand All @@ -23,7 +23,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 100 * i + 2),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 100 * i + 3),
}),
instance_id: (offset + i).to_s
instance_id: (offset + i).to_s,
tags: {
'source_id' => process.app.guid,
'process_id' => process.guid,
},
),
Loggregator::V2::Envelope.new(
timestamp: last_timestamp,
Expand All @@ -33,14 +37,20 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
'absolute_entitlement' => Loggregator::V2::GaugeValue.new(unit: 'nanoseconds', value: 100 * i + 2),
'container_age' => Loggregator::V2::GaugeValue.new(unit: 'nanoseconds', value: 100 * i + 3),
}),
instance_id: (offset + i).to_s
instance_id: (offset + i).to_s,
tags: {
'source_id' => process.app.guid,
'process_id' => process.guid,
},
)
]
end
Loggregator::V2::EnvelopeBatch.new(batch: batch)
end

describe 'converting from Logcache to TrafficController' do
let(:filter) { ->(_) { true } }

before do
allow(wrapped_logcache_client).to receive(:container_metrics).and_return(logcache_response)
end
Expand All @@ -53,6 +63,54 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
)
end

context 'filters' do
let(:envelopes) {
Loggregator::V2::EnvelopeBatch.new(
batch: [
Loggregator::V2::Envelope.new(
source_id: process_guid,
gauge: Loggregator::V2::Gauge.new(metrics: {
'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10),
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12),
}),
instance_id: '1'
),
Loggregator::V2::Envelope.new(
source_id: process_guid,
gauge: Loggregator::V2::Gauge.new(metrics: {
'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 13),
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10),
}),
instance_id: '2'
),
Loggregator::V2::Envelope.new(
source_id: process_guid,
gauge: Loggregator::V2::Gauge.new(metrics: {
'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10),
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 9),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 8),
}),
instance_id: '1'
),
]
)
}
let(:filter) { ->(e) { e.gauge.metrics['cpu'].value == 10 } }

it 'filters envelopes' do
subject

expect(subject).to have(1).items
expect(subject.first.containerMetric.applicationId).to eq(process_guid)
expect(subject.first.containerMetric.instanceIndex).to eq(1)
expect(subject.first.containerMetric.cpuPercentage).to eq(10)
expect(subject.first.containerMetric.memoryBytes).to eq(11)
expect(subject.first.containerMetric.diskBytes).to eq(12)
end
end

context 'when given an empty envelope batch' do
let(:envelopes) { Loggregator::V2::EnvelopeBatch.new }

Expand Down Expand Up @@ -170,6 +228,29 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
end
end

context 'when the envelope does not have tags' do
let(:envelopes) {
Loggregator::V2::EnvelopeBatch.new(
batch: [
Loggregator::V2::Envelope.new(
source_id: process_guid,
gauge: Loggregator::V2::Gauge.new(metrics: {
'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10),
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12),
}),
instance_id: '1',
tags: {},
),
]
)
}

it 'returns an envelope with an empty array of tags' do
expect(subject.first.tags).to be_empty
end
end

context 'when given multiple envelopes back' do
let(:envelopes) {
Loggregator::V2::EnvelopeBatch.new(
Expand All @@ -181,7 +262,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12),
}),
instance_id: '1'
instance_id: '1',
tags: {
'source_id' => process.app.guid,
'process_id' => process.guid,
},
),
Loggregator::V2::Envelope.new(
source_id: process_guid,
Expand All @@ -190,7 +275,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 21),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 22),
}),
instance_id: '2'
instance_id: '2',
tags: {
'source_id' => process.app.guid,
'process_id' => process.guid,
},
),
Loggregator::V2::Envelope.new(
source_id: process_guid,
Expand All @@ -199,7 +288,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 31),
'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 32),
}),
instance_id: '3'
instance_id: '3',
tags: {
'source_id' => process.app.guid,
'process_id' => process.guid,
},
)
]
)
Expand All @@ -213,6 +306,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim
expect(subject.first.containerMetric.memoryBytes).to eq(11)
expect(subject.first.containerMetric.diskBytes).to eq(12)

expect(subject.first.tags[0].key).to eq('source_id')
expect(subject.first.tags[0].value).to eq(process.app.guid)
expect(subject.first.tags[1].key).to eq('process_id')
expect(subject.first.tags[1].value).to eq(process.guid)

expect(subject.second.containerMetric.applicationId).to eq(process_guid)
expect(subject.second.containerMetric.instanceIndex).to eq(2)
expect(subject.second.containerMetric.cpuPercentage).to eq(20)
Expand Down
22 changes: 20 additions & 2 deletions spec/unit/lib/cloud_controller/diego/app_recipe_builder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,16 @@ module Diego
expect(lrp.log_source).to eq(LRP_LOG_SOURCE)
expect(lrp.max_pids).to eq(100)
expect(lrp.memory_mb).to eq(128)
expect(lrp.metrics_guid).to eq(process.guid)
expect(lrp.metrics_guid).to eq(process.app.guid)
expect(lrp.metric_tags.size).to eq(4)
expect(lrp.metric_tags[0].key).to eq('source_id')
expect(lrp.metric_tags[0].value.static).to eq(process.app.guid)
expect(lrp.metric_tags[1].key).to eq('process_id')
expect(lrp.metric_tags[1].value.static).to eq(process.guid)
expect(lrp.metric_tags[2].key).to eq('process_instance_id')
expect(lrp.metric_tags[2].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INSTANCE_GUID)
expect(lrp.metric_tags[3].key).to eq('instance_id')
expect(lrp.metric_tags[3].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INDEX)
expect(lrp.monitor).to eq(expected_monitor_action)
expect(lrp.network).to eq(expected_network)
expect(lrp.ports).to eq([4444, 5555])
Expand Down Expand Up @@ -941,7 +950,16 @@ module Diego
expect(lrp.log_guid).to eq(process.app.guid)
expect(lrp.max_pids).to eq(100)
expect(lrp.memory_mb).to eq(128)
expect(lrp.metrics_guid).to eq(process.guid)
expect(lrp.metrics_guid).to eq(process.app.guid)
expect(lrp.metric_tags.size).to eq(4)
expect(lrp.metric_tags[0].key).to eq('source_id')
expect(lrp.metric_tags[0].value.static).to eq(process.app.guid)
expect(lrp.metric_tags[1].key).to eq('process_id')
expect(lrp.metric_tags[1].value.static).to eq(process.guid)
expect(lrp.metric_tags[2].key).to eq('process_instance_id')
expect(lrp.metric_tags[2].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INSTANCE_GUID)
expect(lrp.metric_tags[3].key).to eq('instance_id')
expect(lrp.metric_tags[3].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INDEX)
expect(lrp.monitor).to eq(expected_monitor_action)
expect(lrp.network).to eq(expected_network)
expect(lrp.ports).to eq([4444, 5555])
Expand Down
Loading

0 comments on commit fcd9d22

Please sign in to comment.