diff --git a/lib/cloud_controller/diego/app_recipe_builder.rb b/lib/cloud_controller/diego/app_recipe_builder.rb index 04151b337a3..5f041a2fe2f 100644 --- a/lib/cloud_controller/diego/app_recipe_builder.rb +++ b/lib/cloud_controller/diego/app_recipe_builder.rb @@ -13,6 +13,9 @@ module Diego class AppRecipeBuilder include ::Diego::ActionBuilder + METRIC_TAG_VALUE = ::Diego::Bbs::Models::MetricTagValue + METRIC_TAG_ENTRY = ::Diego::Bbs::Models::DesiredLRP::MetricTagsEntry + MONITORED_HEALTH_CHECK_TYPES = [HealthCheckTypes::PORT, HealthCheckTypes::HTTP, ''].map(&:freeze).freeze def initialize(config:, process:, ssh_key: SSHKey.new) @@ -51,7 +54,13 @@ def build_app_lrp ports: ports, log_source: LRP_LOG_SOURCE, log_guid: process.app.guid, - metrics_guid: process.guid, + metrics_guid: process.app.guid, + metric_tags: [ + METRIC_TAG_ENTRY.new(key: 'source_id', value: METRIC_TAG_VALUE.new(static: process.app.guid)), + METRIC_TAG_ENTRY.new(key: 'process_id', value: METRIC_TAG_VALUE.new(static: process.guid)), + METRIC_TAG_ENTRY.new(key: 'process_instance_id', value: METRIC_TAG_VALUE.new(dynamic: METRIC_TAG_VALUE::DynamicValue::INSTANCE_GUID)), + METRIC_TAG_ENTRY.new(key: 'instance_id', value: METRIC_TAG_VALUE.new(dynamic: METRIC_TAG_VALUE::DynamicValue::INDEX)), + ], annotation: process.updated_at.to_f.to_s, egress_rules: generate_egress_rules, cached_dependencies: desired_lrp_builder.cached_dependencies, diff --git a/lib/cloud_controller/diego/reporters/instances_stats_reporter.rb b/lib/cloud_controller/diego/reporters/instances_stats_reporter.rb index 05eba23a5ad..d99d1b69914 100644 --- a/lib/cloud_controller/diego/reporters/instances_stats_reporter.rb +++ b/lib/cloud_controller/diego/reporters/instances_stats_reporter.rb @@ -18,22 +18,17 @@ def stats_for_app(process) formatted_current_time = Time.now.to_datetime.rfc3339 logger.debug('stats_for_app.fetching_container_metrics', process_guid: process.guid) - envelopes = @logstats_client.container_metrics( - source_guid: process.guid, - auth_token: VCAP::CloudController::SecurityContext.auth_token, - ) - actual_lrps = bbs_instances_client.lrp_instances(process) desired_lrp = bbs_instances_client.desired_lrp_instance(process) - stats = {} - envelopes.each do |envelope| - container_metrics = envelope.containerMetric - stats[container_metrics.instanceIndex] = { - time: formatted_current_time - }.merge(converted_container_metrics(container_metrics)) - end + stats = envelopes(desired_lrp, process). + map { |e| + [ + e.containerMetric.instanceIndex, + converted_container_metrics(e.containerMetric, formatted_current_time), + ] + }.to_h - actual_lrps.each do |actual_lrp| + bbs_instances_client.lrp_instances(process).each do |actual_lrp| next unless actual_lrp.actual_lrp_key.index < process.instances info = { @@ -72,25 +67,41 @@ def stats_for_app(process) private + def envelopes(desired_lrp, process) + filter, source_guid = if desired_lrp.metric_tags.any? { |tag| tag.key == 'process_id' } + [->(e) { e.tags.any? { |key, value| key == 'process_id' && value == process.guid } }, process.app.guid] + else + [->(_) { true }, process.guid] + end + + @logstats_client.container_metrics( + source_guid: source_guid, + auth_token: VCAP::CloudController::SecurityContext.auth_token, + logcache_filter: filter + ) + end + attr_reader :bbs_instances_client def logger @logger ||= Steno.logger('cc.diego.instances_reporter') end - def converted_container_metrics(container_metrics) + def converted_container_metrics(container_metrics, formatted_current_time) cpu = container_metrics.cpuPercentage mem = container_metrics.memoryBytes disk = container_metrics.diskBytes if cpu.nil? || mem.nil? || disk.nil? { + time: formatted_current_time, cpu: 0, mem: 0, disk: 0 } else { + time: formatted_current_time, cpu: cpu / 100, mem: mem, disk: disk diff --git a/lib/logcache/traffic_controller_decorator.rb b/lib/logcache/traffic_controller_decorator.rb index f33ea2e3f70..61f88f0ffee 100644 --- a/lib/logcache/traffic_controller_decorator.rb +++ b/lib/logcache/traffic_controller_decorator.rb @@ -9,7 +9,7 @@ def initialize(logcache_client) @logcache_client = logcache_client end - def container_metrics(auth_token: nil, source_guid:) + def container_metrics(auth_token: nil, source_guid:, logcache_filter:) now = Time.now start_time = TimeUtils.to_nanoseconds(now - 2.minutes) end_time = TimeUtils.to_nanoseconds(now) @@ -36,7 +36,7 @@ def container_metrics(auth_token: nil, source_guid:) end final_envelopes. - select { |e| has_container_metrics_fields?(e) }. + select { |e| has_container_metrics_fields?(e) && logcache_filter.call(e) }. uniq(&:instance_id). map { |e| convert_to_traffic_controller_envelope(source_guid, e) } end @@ -67,22 +67,15 @@ def has_container_metrics_fields?(envelope) end def convert_to_traffic_controller_envelope(source_guid, logcache_envelope) - new_envelope = { + TrafficController::Models::Envelope.new( + containerMetric: TrafficController::Models::ContainerMetric.new({ applicationId: source_guid, instanceIndex: logcache_envelope.instance_id, - } - - if (metrics = logcache_envelope.gauge.metrics) - gauge_values = { - cpuPercentage: metrics['cpu'].value, - memoryBytes: metrics['memory'].value, - diskBytes: metrics['disk'].value - } - new_envelope.merge!(gauge_values) - end - - TrafficController::Models::Envelope.new( - containerMetric: TrafficController::Models::ContainerMetric.new(new_envelope) + cpuPercentage: logcache_envelope.gauge.metrics['cpu'].value, + memoryBytes: logcache_envelope.gauge.metrics['memory'].value, + diskBytes: logcache_envelope.gauge.metrics['disk'].value, + }), + tags: logcache_envelope.tags.map { |k, v| TrafficController::Models::Envelope::TagsEntry.new(key: k, value: v) }, ) end diff --git a/lib/traffic_controller/client.rb b/lib/traffic_controller/client.rb index c4c2481d9bd..63c96953a12 100644 --- a/lib/traffic_controller/client.rb +++ b/lib/traffic_controller/client.rb @@ -10,7 +10,7 @@ def initialize(url:) @url = url end - def container_metrics(auth_token:, source_guid:) + def container_metrics(auth_token:, source_guid:, logcache_filter: nil) response = with_request_error_handling do client.get("/apps/#{source_guid}/containermetrics", nil, { 'Authorization' => auth_token }) end diff --git a/spec/logcache/traffic_controller_decorator_spec.rb b/spec/logcache/traffic_controller_decorator_spec.rb index b48c6829ea6..d5054b9c54c 100644 --- a/spec/logcache/traffic_controller_decorator_spec.rb +++ b/spec/logcache/traffic_controller_decorator_spec.rb @@ -3,7 +3,7 @@ require 'utils/time_utils' RSpec.describe Logcache::TrafficControllerDecorator do - subject { described_class.new(wrapped_logcache_client).container_metrics(source_guid: process_guid) } + subject { described_class.new(wrapped_logcache_client).container_metrics(source_guid: process_guid, logcache_filter: filter) } let(:wrapped_logcache_client) { instance_double(Logcache::Client, container_metrics: logcache_response) } let(:num_instances) { 11 } @@ -23,7 +23,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 100 * i + 2), 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 100 * i + 3), }), - instance_id: (offset + i).to_s + instance_id: (offset + i).to_s, + tags: { + 'source_id' => process.app.guid, + 'process_id' => process.guid, + }, ), Loggregator::V2::Envelope.new( timestamp: last_timestamp, @@ -33,7 +37,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim 'absolute_entitlement' => Loggregator::V2::GaugeValue.new(unit: 'nanoseconds', value: 100 * i + 2), 'container_age' => Loggregator::V2::GaugeValue.new(unit: 'nanoseconds', value: 100 * i + 3), }), - instance_id: (offset + i).to_s + instance_id: (offset + i).to_s, + tags: { + 'source_id' => process.app.guid, + 'process_id' => process.guid, + }, ) ] end @@ -41,6 +49,8 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim end describe 'converting from Logcache to TrafficController' do + let(:filter) { ->(_) { true } } + before do allow(wrapped_logcache_client).to receive(:container_metrics).and_return(logcache_response) end @@ -53,6 +63,54 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim ) end + context 'filters' do + let(:envelopes) { + Loggregator::V2::EnvelopeBatch.new( + batch: [ + Loggregator::V2::Envelope.new( + source_id: process_guid, + gauge: Loggregator::V2::Gauge.new(metrics: { + 'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11), + 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12), + }), + instance_id: '1' + ), + Loggregator::V2::Envelope.new( + source_id: process_guid, + gauge: Loggregator::V2::Gauge.new(metrics: { + 'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 13), + 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + }), + instance_id: '2' + ), + Loggregator::V2::Envelope.new( + source_id: process_guid, + gauge: Loggregator::V2::Gauge.new(metrics: { + 'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 9), + 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 8), + }), + instance_id: '1' + ), + ] + ) + } + let(:filter) { ->(e) { e.gauge.metrics['cpu'].value == 10 } } + + it 'filters envelopes' do + subject + + expect(subject).to have(1).items + expect(subject.first.containerMetric.applicationId).to eq(process_guid) + expect(subject.first.containerMetric.instanceIndex).to eq(1) + expect(subject.first.containerMetric.cpuPercentage).to eq(10) + expect(subject.first.containerMetric.memoryBytes).to eq(11) + expect(subject.first.containerMetric.diskBytes).to eq(12) + end + end + context 'when given an empty envelope batch' do let(:envelopes) { Loggregator::V2::EnvelopeBatch.new } @@ -170,6 +228,29 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim end end + context 'when the envelope does not have tags' do + let(:envelopes) { + Loggregator::V2::EnvelopeBatch.new( + batch: [ + Loggregator::V2::Envelope.new( + source_id: process_guid, + gauge: Loggregator::V2::Gauge.new(metrics: { + 'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11), + 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12), + }), + instance_id: '1', + tags: {}, + ), + ] + ) + } + + it 'returns an envelope with an empty array of tags' do + expect(subject.first.tags).to be_empty + end + end + context 'when given multiple envelopes back' do let(:envelopes) { Loggregator::V2::EnvelopeBatch.new( @@ -181,7 +262,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11), 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12), }), - instance_id: '1' + instance_id: '1', + tags: { + 'source_id' => process.app.guid, + 'process_id' => process.guid, + }, ), Loggregator::V2::Envelope.new( source_id: process_guid, @@ -190,7 +275,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 21), 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 22), }), - instance_id: '2' + instance_id: '2', + tags: { + 'source_id' => process.app.guid, + 'process_id' => process.guid, + }, ), Loggregator::V2::Envelope.new( source_id: process_guid, @@ -199,7 +288,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 31), 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 32), }), - instance_id: '3' + instance_id: '3', + tags: { + 'source_id' => process.app.guid, + 'process_id' => process.guid, + }, ) ] ) @@ -213,6 +306,11 @@ def generate_batch(size, offset: 0, last_timestamp: TimeUtils.to_nanoseconds(Tim expect(subject.first.containerMetric.memoryBytes).to eq(11) expect(subject.first.containerMetric.diskBytes).to eq(12) + expect(subject.first.tags[0].key).to eq('source_id') + expect(subject.first.tags[0].value).to eq(process.app.guid) + expect(subject.first.tags[1].key).to eq('process_id') + expect(subject.first.tags[1].value).to eq(process.guid) + expect(subject.second.containerMetric.applicationId).to eq(process_guid) expect(subject.second.containerMetric.instanceIndex).to eq(2) expect(subject.second.containerMetric.cpuPercentage).to eq(20) diff --git a/spec/unit/lib/cloud_controller/diego/app_recipe_builder_spec.rb b/spec/unit/lib/cloud_controller/diego/app_recipe_builder_spec.rb index 6b4e28a4fc0..2f67f76f79b 100644 --- a/spec/unit/lib/cloud_controller/diego/app_recipe_builder_spec.rb +++ b/spec/unit/lib/cloud_controller/diego/app_recipe_builder_spec.rb @@ -286,7 +286,16 @@ module Diego expect(lrp.log_source).to eq(LRP_LOG_SOURCE) expect(lrp.max_pids).to eq(100) expect(lrp.memory_mb).to eq(128) - expect(lrp.metrics_guid).to eq(process.guid) + expect(lrp.metrics_guid).to eq(process.app.guid) + expect(lrp.metric_tags.size).to eq(4) + expect(lrp.metric_tags[0].key).to eq('source_id') + expect(lrp.metric_tags[0].value.static).to eq(process.app.guid) + expect(lrp.metric_tags[1].key).to eq('process_id') + expect(lrp.metric_tags[1].value.static).to eq(process.guid) + expect(lrp.metric_tags[2].key).to eq('process_instance_id') + expect(lrp.metric_tags[2].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INSTANCE_GUID) + expect(lrp.metric_tags[3].key).to eq('instance_id') + expect(lrp.metric_tags[3].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INDEX) expect(lrp.monitor).to eq(expected_monitor_action) expect(lrp.network).to eq(expected_network) expect(lrp.ports).to eq([4444, 5555]) @@ -941,7 +950,16 @@ module Diego expect(lrp.log_guid).to eq(process.app.guid) expect(lrp.max_pids).to eq(100) expect(lrp.memory_mb).to eq(128) - expect(lrp.metrics_guid).to eq(process.guid) + expect(lrp.metrics_guid).to eq(process.app.guid) + expect(lrp.metric_tags.size).to eq(4) + expect(lrp.metric_tags[0].key).to eq('source_id') + expect(lrp.metric_tags[0].value.static).to eq(process.app.guid) + expect(lrp.metric_tags[1].key).to eq('process_id') + expect(lrp.metric_tags[1].value.static).to eq(process.guid) + expect(lrp.metric_tags[2].key).to eq('process_instance_id') + expect(lrp.metric_tags[2].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INSTANCE_GUID) + expect(lrp.metric_tags[3].key).to eq('instance_id') + expect(lrp.metric_tags[3].value.dynamic).to eq(::Diego::Bbs::Models::MetricTagValue::DynamicValue::INDEX) expect(lrp.monitor).to eq(expected_monitor_action) expect(lrp.network).to eq(expected_network) expect(lrp.ports).to eq([4444, 5555]) diff --git a/spec/unit/lib/cloud_controller/diego/reporters/instances_stats_reporter_spec.rb b/spec/unit/lib/cloud_controller/diego/reporters/instances_stats_reporter_spec.rb index 682c2ba9675..f7f6710aae7 100644 --- a/spec/unit/lib/cloud_controller/diego/reporters/instances_stats_reporter_spec.rb +++ b/spec/unit/lib/cloud_controller/diego/reporters/instances_stats_reporter_spec.rb @@ -5,8 +5,8 @@ module VCAP::CloudController module Diego RSpec.describe InstancesStatsReporter do subject(:instances_reporter) { InstancesStatsReporter.new(bbs_instances_client, traffic_controller_client) } - - let(:process) { ProcessModelFactory.make(instances: desired_instances) } + let(:app) { AppModel.make } + let(:process) { ProcessModel.make(instances: desired_instances, app: app) } let(:desired_instances) { 1 } let(:bbs_instances_client) { instance_double(BbsInstancesClient) } let(:traffic_controller_client) { instance_double(TrafficController::Client) } @@ -30,7 +30,25 @@ def make_actual_lrp(instance_guid:, index:, state:, error:, since:) describe '#stats_for_app' do let(:desired_instances) { bbs_actual_lrps_response.length } let(:bbs_actual_lrps_response) { [actual_lrp_1] } - let(:bbs_desired_lrp_response) { ::Diego::Bbs::Models::DesiredLRP.new PlacementTags: ['isolation-segment-name'] } + let(:bbs_desired_lrp_response) do + ::Diego::Bbs::Models::DesiredLRP.new( + PlacementTags: placement_tags, + metric_tags: metrics_tags, + ) + end + let(:placement_tags) { ['isolation-segment-name'] } + let(:metrics_tags) { + [ + ::Diego::Bbs::Models::DesiredLRP::MetricTagsEntry.new( + key: 'source_id', + value: ::Diego::Bbs::Models::MetricTagValue.new(static: process.app.guid), + ), + ::Diego::Bbs::Models::DesiredLRP::MetricTagsEntry.new( + key: 'process_id', + value: ::Diego::Bbs::Models::MetricTagValue.new(static: process.guid), + ), + ] + } let(:formatted_current_time) { Time.now.to_datetime.rfc3339 } let(:lrp_1_net_info) do @@ -60,6 +78,7 @@ def make_actual_lrp(instance_guid:, index:, state:, error:, since:) memoryBytes: 564, diskBytes: 5000, ), + tags: [::TrafficController::Models::Envelope::TagsEntry.new(key: 'process_id', value: process.guid)], ), ] end @@ -94,7 +113,9 @@ def make_actual_lrp(instance_guid:, index:, state:, error:, since:) before do allow(bbs_instances_client).to receive(:lrp_instances).and_return(bbs_actual_lrps_response) allow(bbs_instances_client).to receive(:desired_lrp_instance).and_return(bbs_desired_lrp_response) - allow(traffic_controller_client).to receive(:container_metrics).with(auth_token: 'my-token', source_guid: process.guid).and_return(traffic_controller_response) + allow(traffic_controller_client).to receive(:container_metrics). + with(auth_token: 'my-token', source_guid: process.app.guid, logcache_filter: anything). + and_return(traffic_controller_response) allow(VCAP::CloudController::SecurityContext).to receive(:auth_token).and_return('my-token') end @@ -102,8 +123,79 @@ def make_actual_lrp(instance_guid:, index:, state:, error:, since:) expect(instances_reporter.stats_for_app(process)).to eq(expected_stats_response) end + it 'passes a process_id filter' do + filter = nil + + allow(traffic_controller_client).to receive(:container_metrics) { |args| + filter = args[:logcache_filter] + }.and_return(traffic_controller_response) + + expected_envelope = Loggregator::V2::Envelope.new( + source_id: process.app.guid, + gauge: Loggregator::V2::Gauge.new(metrics: { + 'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 11), + 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 12), + }), + instance_id: '1', + tags: { + 'process_id' => process.guid, + } + ) + other_envelope = Loggregator::V2::Envelope.new( + source_id: process.app.guid, + gauge: Loggregator::V2::Gauge.new(metrics: { + 'cpu' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 13), + 'memory' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + 'disk' => Loggregator::V2::GaugeValue.new(unit: 'bytes', value: 10), + }), + instance_id: '1', + tags: { + 'process_id' => 'different-guid', + } + ) + + instances_reporter.stats_for_app(process) + + expect([expected_envelope, other_envelope].select { |e| filter.call(e) }).to eq([expected_envelope]) + end + + context 'when the desired lrp does NOT have a process_id metric tag' do + let(:metrics_tags) { + [ + ::Diego::Bbs::Models::DesiredLRP::MetricTagsEntry.new( + key: 'source_id', + value: ::Diego::Bbs::Models::MetricTagValue.new(static: process.guid), + ), + ] + } + let(:traffic_controller_response) do + [ + ::TrafficController::Models::Envelope.new( + origin: 'does-anyone-even-know?', + eventType: ::TrafficController::Models::Envelope::EventType::ContainerMetric, + containerMetric: ::TrafficController::Models::ContainerMetric.new( + instanceIndex: 0, + cpuPercentage: 3.92, + memoryBytes: 564, + diskBytes: 5000, + ), + ), + ] + end + + it 'gets metrics for the process and does not filter on the source_id' do + expect(traffic_controller_client). + to receive(:container_metrics). + with(auth_token: 'my-token', source_guid: process.guid, logcache_filter: anything). + and_return(traffic_controller_response) + + expect(instances_reporter.stats_for_app(process)).to eq(expected_stats_response) + end + end + context 'when there is no isolation segment for the app' do - let(:bbs_desired_lrp_response) { ::Diego::Bbs::Models::DesiredLRP.new PlacementTags: [] } + let(:placement_tags) { [] } it 'returns nil for the isolation_segment' do expect(instances_reporter.stats_for_app(process)[0][:isolation_segment]).to eq(nil) @@ -241,7 +333,9 @@ def make_actual_lrp(instance_guid:, index:, state:, error:, since:) let(:error) { StandardError.new('tomato') } let(:mock_logger) { double(:logger, error: nil, debug: nil) } before do - allow(traffic_controller_client).to receive(:container_metrics).with(auth_token: 'my-token', source_guid: process.guid).and_raise(error) + allow(traffic_controller_client).to receive(:container_metrics). + with(auth_token: 'my-token', source_guid: process.app.guid, logcache_filter: anything). + and_raise(error) allow(instances_reporter).to receive(:logger).and_return(mock_logger) end