Skip to content

Commit

Permalink
Merge pull request #2250 from cloudfoundry/handle-log-cache-unavailab…
Browse files: browse the repository at this point in the history
…ility

Support partial Process Stats responses when the stats service (log-cache or metric-proxy) is unavailable
  • Loading branch information
sethboyles authored Apr 30, 2021
2 parents ab3fac2 + 932e8ec commit 99d4874
Show file tree
Hide file tree
Showing 18 changed files with 307 additions and 148 deletions.
7 changes: 6 additions & 1 deletion app/controllers/runtime/stats_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ def stats(guid, opts={})
end

begin
stats = instances_reporters.stats_for_app(process)
stats, warnings = instances_reporters.stats_for_app(process)

warnings.each do |warning_message|
add_warning(warning_message)
end

stats.each_value do |stats_hash|
if stats_hash[:stats]
stats_hash[:stats].delete_if { |key, _| key == :net_info }
Expand Down
3 changes: 2 additions & 1 deletion app/controllers/v3/processes_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def scale
end

def stats
process_stats = instances_reporters.stats_for_app(@process)
process_stats, warnings = instances_reporters.stats_for_app(@process)
add_warning_headers(warnings)

render status: :ok, json: Presenters::V3::ProcessStatsPresenter.new(@process.type, process_stats)
end
Expand Down
26 changes: 18 additions & 8 deletions app/presenters/v3/process_stats_presenter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(type, process_stats)

def to_hash
{
resources: present_stats_hash
resources: present_stats_hash,
}
end

Expand All @@ -35,20 +35,17 @@ def found_instance_stats_hash(index, stats)
type: @type,
index: index,
state: stats[:state],
usage: {
time: stats[:stats][:usage][:time],
cpu: stats[:stats][:usage][:cpu],
mem: stats[:stats][:usage][:mem],
disk: stats[:stats][:usage][:disk],
},
host: stats[:stats][:host],
uptime: stats[:stats][:uptime],
mem_quota: stats[:stats][:mem_quota],
disk_quota: stats[:stats][:disk_quota],
fds_quota: stats[:stats][:fds_quota],
isolation_segment: stats[:isolation_segment],
details: stats[:details]
}.tap { |presented_stats| add_port_info(presented_stats, stats) }
}.tap do |presented_stats|
add_port_info(presented_stats, stats)
add_usage_info(presented_stats, stats)
end
end

def down_instance_stats_hash(index, stats)
Expand All @@ -70,6 +67,19 @@ def add_port_info(presented_stats, stats)
end
end

# Populates presented_stats[:usage] from the instance's reported usage.
#
# When stats[:stats][:usage] is present, the :time, :cpu, :mem and :disk
# entries are copied over (a field missing from the source is copied as nil).
# When it is absent or empty, :usage is set to an empty hash.
#
# presented_stats - Hash being built for the response; mutated in place.
# stats           - Hash for one instance; must contain a :stats hash.
#
# Returns the hash that was stored under presented_stats[:usage].
def add_usage_info(presented_stats, stats)
  reported_usage = stats[:stats][:usage]
  usage_fields = {}

  if reported_usage.present?
    %i[time cpu mem disk].each do |field|
      usage_fields[field] = reported_usage[field]
    end
  end

  presented_stats[:usage] = usage_fields
end

def net_info_to_instance_ports(net_info_ports)
return [] if net_info_ports.nil?

Expand Down
1 change: 0 additions & 1 deletion config/cloud_controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ loggregator:
logcache:
host: 'http://doppler.service.cf.internal'
port: 8080
temporary_ignore_server_unavailable_errors: false

logcache_tls:
key_file: 'spec/fixtures/certs/log_cache.key'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ Name | Type | Description
**type** | _string_ | Process type; a unique identifier for processes belonging to an app
**index** | _integer_ | The zero-based index of running instances
**state** | _string_ | The state of the instance; valid values are `RUNNING`, `CRASHED`, `STARTING`, `DOWN`
**usage** | _object_ | Object containing actual usage data for the instance; the value is `{}` when usage data is unavailable
**usage.time** | _[timestamp](#timestamps)_ | The time when the usage was requested
**usage.cpu** | _number_ | The current CPU usage of the instance
**usage.mem** | _integer_ | The current memory usage of the instance
**usage.disk** | _integer_ | The current disk usage of the instance
**host** | _string_ | The host the instance is running on
**instance_ports** | _object_ | JSON array of port mappings between the network-exposed port used to communicate with the app (`external`) and the port opened to the running process that it can listen on (`internal`)
**uptime** | _integer_ | The uptime in seconds for the instance
**mem_quota** | _integer_ | The current maximum memory allocated for the instance
**disk_quota** | _integer_ | The current maximum disk allocated for the instance
**mem_quota** | _integer_ | The current maximum memory allocated for the instance; the value is `null` when memory quota data is unavailable
**disk_quota** | _integer_ | The current maximum disk allocated for the instance; the value is `null` when disk quota data is unavailable
**fds_quota** | _integer_ | The maximum file descriptors the instance is allowed to use
**isolation_segment** | _string_ | The current isolation segment that the instance is running on; the value is `null` when the instance is not placed on a particular isolation segment
**details** | _string_ | Information about errors placing the instance; the value is `null` if there are no placement errors
1 change: 0 additions & 1 deletion lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ class ApiSchema < VCAP::Config
logcache: {
host: String,
port: Integer,
temporary_ignore_server_unavailable_errors: bool,
},

optional(:logcache_tls) => {
Expand Down
1 change: 0 additions & 1 deletion lib/cloud_controller/dependency_locator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ def build_logcache_client
client_cert_path: config.get(:logcache_tls, :cert_file),
client_key_path: config.get(:logcache_tls, :key_file),
tls_subject_name: config.get(:logcache_tls, :subject_name),
temporary_ignore_server_unavailable_errors: config.get(:logcache, :temporary_ignore_server_unavailable_errors)
)
end

Expand Down
97 changes: 61 additions & 36 deletions lib/cloud_controller/diego/reporters/instances_stats_reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,9 @@ def stats_for_app(process)
logger.debug('stats_for_app.fetching_container_metrics', process_guid: process.guid)
desired_lrp = bbs_instances_client.desired_lrp_instance(process)

log_cache_data = envelopes(desired_lrp, process)
stats = log_cache_data.
map { |e|
[
e.containerMetric.instanceIndex,
converted_container_metrics(e.containerMetric, formatted_current_time),
]
}.to_h

quota_stats = log_cache_data.
map { |e|
[
e.containerMetric.instanceIndex,
e.containerMetric,
]
}.to_h
log_cache_data, log_cache_errors = envelopes(desired_lrp, process)
stats = formatted_process_stats(log_cache_data, formatted_current_time)
quota_stats = formatted_quota_stats(log_cache_data)

bbs_instances_client.lrp_instances(process).each do |actual_lrp|
index = actual_lrp.actual_lrp_key.index
Expand All @@ -51,25 +38,20 @@ def stats_for_app(process)
port: get_default_port(actual_lrp.actual_lrp_net_info),
net_info: actual_lrp.actual_lrp_net_info.to_h,
uptime: nanoseconds_to_seconds(current_time * 1e9 - actual_lrp.since),
mem_quota: quota_stats[index]&.memoryBytesQuota,
disk_quota: quota_stats[index]&.diskBytesQuota,
fds_quota: process.file_descriptors,
usage: stats[index] || {
time: formatted_current_time,
cpu: 0,
mem: 0,
disk: 0,
},
}
}.merge(metrics_data_for_instance(stats, quota_stats, log_cache_errors, formatted_current_time, index))
}
info[:details] = actual_lrp.placement_error if actual_lrp.placement_error.present?
result[actual_lrp.actual_lrp_key.index] = info
end

fill_unreported_instances_with_down_instances(result, process)

warnings = [log_cache_errors].compact
return result, warnings
rescue CloudController::Errors::NoRunningInstances => e
logger.info('stats_for_app.error', error: e.to_s)
fill_unreported_instances_with_down_instances({}, process)
return fill_unreported_instances_with_down_instances({}, process), []
rescue StandardError => e
logger.error('stats_for_app.error', error: e.to_s)
if e.is_a?(CloudController::Errors::ApiError) && e.name == 'ServiceUnavailable'
Expand All @@ -83,6 +65,53 @@ def stats_for_app(process)

private

attr_reader :bbs_instances_client

# Builds the quota and usage portion of one instance's stats.
#
# stats                  - Hash of instance index => usage hash.
# quota_stats            - Hash of instance index => object responding to
#                          memoryBytesQuota and diskBytesQuota.
# log_cache_errors       - Error message from the stats fetch, or blank when
#                          the fetch succeeded.
# formatted_current_time - Timestamp used for the zeroed fallback usage.
# index                  - Instance index to look up.
#
# Returns a Hash with :mem_quota, :disk_quota and :usage. When
# log_cache_errors is not blank, both quotas are nil and usage is {}.
# Otherwise the quotas come from quota_stats[index] (nil when there is no
# entry) and usage is stats[index] or the zeroed placeholder.
def metrics_data_for_instance(stats, quota_stats, log_cache_errors, formatted_current_time, index)
  return { mem_quota: nil, disk_quota: nil, usage: {} } unless log_cache_errors.blank?

  instance_quota = quota_stats[index]
  instance_usage = stats[index] || missing_process_stats(formatted_current_time)

  {
    mem_quota: instance_quota&.memoryBytesQuota,
    disk_quota: instance_quota&.diskBytesQuota,
    usage: instance_usage,
  }
end

# Placeholder usage for an instance with no reported metrics: zero CPU,
# memory and disk, stamped with the given time.
#
# formatted_current_time - Value stored under :time.
#
# Returns a Hash with :time, :cpu, :mem and :disk.
def missing_process_stats(formatted_current_time)
  zeroed_usage = { cpu: 0, mem: 0, disk: 0 }
  { time: formatted_current_time }.merge(zeroed_usage)
end

# Indexes converted usage metrics by instance index.
#
# log_cache_data         - Collection of envelopes, each responding to
#                          containerMetric.
# formatted_current_time - Timestamp passed through to
#                          converted_container_metrics.
#
# Returns a Hash of instanceIndex => converted usage hash. When several
# envelopes share an index, the last one wins.
def formatted_process_stats(log_cache_data, formatted_current_time)
  usage_by_index = log_cache_data.map do |envelope|
    metric = envelope.containerMetric
    [metric.instanceIndex, converted_container_metrics(metric, formatted_current_time)]
  end

  usage_by_index.to_h
end

# Indexes the raw container metrics by instance index so quota fields can be
# looked up per instance.
#
# log_cache_data - Collection of envelopes, each responding to containerMetric.
#
# Returns a Hash of instanceIndex => containerMetric. When several envelopes
# share an index, the last one wins.
def formatted_quota_stats(log_cache_data)
  container_metrics = log_cache_data.map { |envelope| envelope.containerMetric }
  container_metrics.map { |metric| [metric.instanceIndex, metric] }.to_h
end

def envelopes(desired_lrp, process)
if desired_lrp.metric_tags['process_id']
filter = ->(envelope) { envelope.tags.any? { |key, value| key == 'process_id' && value == process.guid } }
Expand All @@ -92,15 +121,16 @@ def envelopes(desired_lrp, process)
source_guid = process.guid
end

@logstats_client.container_metrics(
return @logstats_client.container_metrics(
source_guid: source_guid,
auth_token: VCAP::CloudController::SecurityContext.auth_token,
logcache_filter: filter
)
), nil
rescue GRPC::BadStatus, CloudController::Errors::ApiError => e
logger.error('stats_for_app.error', error: e.message, backtrace: e.backtrace.join("\n"))
return [], 'Stats server temporarily unavailable.'
end

attr_reader :bbs_instances_client

def logger
@logger ||= Steno.logger('cc.diego.instances_reporter')
end
Expand All @@ -111,12 +141,7 @@ def converted_container_metrics(container_metrics, formatted_current_time)
disk = container_metrics.diskBytes

if cpu.nil? || mem.nil? || disk.nil?
{
time: formatted_current_time,
cpu: 0,
mem: 0,
disk: 0
}
missing_process_stats(formatted_current_time)
else
{
time: formatted_current_time,
Expand Down
10 changes: 1 addition & 9 deletions lib/logcache/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Client
MAX_LIMIT = 1000
DEFAULT_LIMIT = 100

def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path:, tls_subject_name:, temporary_ignore_server_unavailable_errors:)
def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path:, tls_subject_name:)
if client_ca_path
client_ca = IO.read(client_ca_path)
client_key = IO.read(client_key_path)
Expand All @@ -25,8 +25,6 @@ def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path
timeout: 250
)
end

@temporary_ignore_server_unavailable_errors = temporary_ignore_server_unavailable_errors
end

def container_metrics(source_guid:, envelope_limit: DEFAULT_LIMIT, start_time:, end_time:)
Expand Down Expand Up @@ -59,12 +57,6 @@ def with_request_error_handling(source_guid, &blk)
retry
end

if @temporary_ignore_server_unavailable_errors && e.is_a?(GRPC::BadStatus) && e.to_status.code == 14
logger.warn("rescuing GRPC Unavailable error: #{e.to_status}")

return EmptyEnvelope.new(source_guid)
end

raise e
end

Expand Down
2 changes: 1 addition & 1 deletion spec/api/documentation/apps_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def after_standard_model_delete(guid)

instances_reporters = double(:instances_reporters)
allow(CloudController::DependencyLocator.instance).to receive(:instances_reporters).and_return(instances_reporters)
allow(instances_reporters).to receive(:stats_for_app).and_return(stats)
allow(instances_reporters).to receive(:stats_for_app).and_return([stats, []])

client.get "/v2/apps/#{process.guid}/stats", {}, headers
expect(status).to eq(200)
Expand Down
56 changes: 18 additions & 38 deletions spec/logcache/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ module Logcache
let(:channel_arg_hash) { { GRPC::Core::Channel::SSL_TARGET => tls_subject_name } }
let(:client) do
Logcache::Client.new(host: host, port: port, client_ca_path: client_ca_path,
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name,
temporary_ignore_server_unavailable_errors: false)
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name)
end
let(:client_ca) { File.open(client_ca_path).read }
let(:client_key) { File.open(client_key_path).read }
Expand Down Expand Up @@ -76,41 +75,17 @@ module Logcache
allow(logcache_service).to receive(:read).and_raise(bad_status)
end

context 'and operator has enabled temporary_ignore_server_unavailable_errors' do
let(:client) do
Logcache::Client.new(host: host, port: port, client_ca_path: client_ca_path,
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name,
temporary_ignore_server_unavailable_errors: true)
end

it 'returns an empty envelope' do
expect(
client.container_metrics(source_guid: process.guid, envelope_limit: 1000, start_time: 100, end_time: 101)
).to be_a(Logcache::EmptyEnvelope)
end

# TODO: fix calling the function under test separately
it 'retries the request three times' do
client.container_metrics(source_guid: process.guid, envelope_limit: 1000, start_time: 100, end_time: 101)

expect(logcache_service).to have_received(:read).with(logcache_request).exactly(3).times
end
let(:client) do
Logcache::Client.new(host: host, port: port, client_ca_path: client_ca_path,
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name)
end

context 'and operator has disabled temporary_ignore_server_unavailable_errors' do
let(:client) do
Logcache::Client.new(host: host, port: port, client_ca_path: client_ca_path,
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name,
temporary_ignore_server_unavailable_errors: false)
end

it 'retries the request three times and raises an exception' do
expect {
client.container_metrics(source_guid: process.guid, envelope_limit: 1000, start_time: 100, end_time: 101)
}.to raise_error(bad_status)
it 'retries the request three times and raises an exception' do
expect {
client.container_metrics(source_guid: process.guid, envelope_limit: 1000, start_time: 100, end_time: 101)
}.to raise_error(bad_status)

expect(logcache_service).to have_received(:read).with(logcache_request).exactly(3).times
end
expect(logcache_service).to have_received(:read).with(logcache_request).exactly(3).times
end
end

Expand All @@ -133,8 +108,7 @@ module Logcache

let(:client) do
Logcache::Client.new(host: host, port: port, client_ca_path: client_ca_path,
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name,
temporary_ignore_server_unavailable_errors: false)
client_cert_path: client_cert_path, client_key_path: client_key_path, tls_subject_name: tls_subject_name)
end

it 'raises an exception' do
Expand Down Expand Up @@ -173,8 +147,14 @@ module Logcache

describe 'without TLS' do
let(:client) do
Logcache::Client.new(host: host, port: port, temporary_ignore_server_unavailable_errors: false,
client_ca_path: nil, client_cert_path: nil, client_key_path: nil, tls_subject_name: nil)
Logcache::Client.new(
host: host,
port: port,
client_ca_path: nil,
client_cert_path: nil,
client_key_path: nil,
tls_subject_name: nil
)
end

describe '#container_metrics' do
Expand Down
Loading

0 comments on commit 99d4874

Please sign in to comment.