Skip to content

Commit

Permalink
Add DB connection metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
svkrieger committed Sep 17, 2024
1 parent 628bed7 commit 87b3ede
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 8 deletions.
10 changes: 10 additions & 0 deletions lib/cloud_controller/db.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def self.connect(opts, logger)
add_connection_expiration_extension(db, opts)
add_connection_validator_extension(db, opts)
db.extension(:requires_unique_column_names_in_subquery)
add_connection_metrics_extension(db)
db
end

Expand Down Expand Up @@ -69,6 +70,15 @@ def self.add_connection_expiration_extension(db, opts)
db.disconnect
end

def self.add_connection_metrics_extension(db)
# only add the metrics for api processes. Otherwise e.g. rake db:migrate would also initialize metric updaters, which need additional config
return if Object.const_defined?(:RakeConfig)

db.extension(:connection_metrics)
# so that we gather connection metrics from the beginning
db.disconnect
end

def self.load_models(db_config, logger)
db = connect(db_config, logger)
DBMigrator.new(db).wait_for_migrations!
Expand Down
23 changes: 16 additions & 7 deletions lib/cloud_controller/metrics/prometheus_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def self.allow_pid_label
end

DURATION_BUCKETS = [5, 10, 30, 60, 300, 600, 890].freeze
CONNECTION_DURATION_BUCKETS = [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10].freeze

METRICS = [
{ type: :gauge, name: :cc_job_queues_length_total, docstring: 'Job queues length of worker processes', labels: [:queue], aggregation: :most_recent },
Expand All @@ -35,7 +36,15 @@ def self.allow_pid_label
{ type: :gauge, name: :cc_running_tasks_total, docstring: 'Total running tasks', aggregation: :most_recent },
{ type: :gauge, name: :cc_running_tasks_memory_bytes, docstring: 'Total memory consumed by running tasks', aggregation: :most_recent },
{ type: :gauge, name: :cc_users_total, docstring: 'Number of users', aggregation: :most_recent },
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent }
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent },
{ type: :gauge, name: :cc_acquired_db_connections_total, labels: %i[process_type], docstring: 'Number of acquired DB connections' },
{ type: :histogram, name: :cc_db_connection_hold_duration_seconds, docstring: 'The time threads were holding DB connections', buckets: CONNECTION_DURATION_BUCKETS },
# cc_connection_pool_timeouts_total must be a gauge metric, because otherwise we cannot match them with processes
{ type: :gauge, name: :cc_db_connection_pool_timeouts_total, labels: %i[process_type],
docstring: 'Number of threads which failed to acquire a free DB connection from the pool within the timeout' },
{ type: :gauge, name: :cc_open_db_connections_total, labels: %i[process_type], docstring: 'Number of open DB connections (acquired + available)' },
{ type: :histogram, name: :cc_db_connection_wait_duration_seconds, docstring: 'The time threads were waiting for an available DB connection',
buckets: CONNECTION_DURATION_BUCKETS }
].freeze

THIN_METRICS = [
Expand All @@ -61,20 +70,20 @@ def initialize(registry=Prometheus::Client.registry)

# Register all metrics, to initialize them for discoverability
METRICS.each { |metric| register(metric) }
THIN_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config.get(:webserver) == 'thin'
PUMA_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config.get(:webserver) == 'puma'
THIN_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'thin'
PUMA_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'puma'
end

def update_gauge_metric(metric, value, labels: {})
@registry.get(metric).set(value, labels:)
end

def increment_gauge_metric(metric)
@registry.get(metric).increment
def increment_gauge_metric(metric, labels: {})
@registry.get(metric).increment(labels:)
end

def decrement_gauge_metric(metric)
@registry.get(metric).decrement
def decrement_gauge_metric(metric, labels: {})
@registry.get(metric).decrement(labels:)
end

def increment_counter_metric(metric)
Expand Down
5 changes: 5 additions & 0 deletions lib/cloud_controller/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def initialize(argv)
secrets_hash = parse_secrets
parse_config(secrets_hash)

# DB connection metrics have a label to determine whether the process accessing the connection is the
# main or a worker process. We need to set this env variable before `setup_db` otherwise the main process
# will show up twice in the metrics as main and worker. Thin metrics will be labeled with main as well.
ENV['PROCESS_TYPE'] = 'main'

setup_cloud_controller

request_logs = VCAP::CloudController::Logs::RequestLogs.new(Steno.logger('cc.api'))
Expand Down
11 changes: 11 additions & 0 deletions lib/cloud_controller/runners/puma_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def initialize(config, app, logger, periodic_updater, request_logs)
conf.before_fork do
Sequel::Model.db.disconnect
end
conf.on_worker_boot do
ENV['PROCESS_TYPE'] = 'puma_worker'
prometheus_updater.update_gauge_metric(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'puma_worker' })
end
conf.on_worker_shutdown do
request_logs.log_incomplete_requests if request_logs
end
Expand All @@ -48,6 +52,7 @@ def initialize(config, app, logger, periodic_updater, request_logs)

events = Puma::Events.new
events.on_booted do
prometheus_updater.update_gauge_metric(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'main' })
Thread.new do
EM.run { periodic_updater.setup_updates }
end
Expand All @@ -65,5 +70,11 @@ def start!
@logger.error "Encountered error: #{e}\n#{e.backtrace&.join("\n")}"
raise e
end

private

def prometheus_updater
CloudController::DependencyLocator.instance.prometheus_updater
end
end
end
95 changes: 95 additions & 0 deletions lib/sequel/extensions/connection_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# frozen_string_literal: true

#
# The connection_metrics extension enhances a database's
# connection pool to store metrics about the use of connections.
# Whenever a connection is acquired or released the number of
# currently acquired connections is emitted. Same for making new
# connections and disconnecting them from the DB. Example of use:
#
# DB.extension(:connection_metrics)
#
# Note that this extension has been only tested with a
# connection pool type :threaded. To use it with other
# connection pool types it might need adjustments.
#
# Related module: Sequel::ConnectionMetrics

module Sequel
module ConnectionMetrics
# Initialize the data structures used by this extension.
def self.extended(pool)
raise Error.new('cannot load connection_metrics extension if using a connection pool type different than :threaded') unless pool.pool_type == :threaded

pool.instance_exec do
sync do
@prometheus_updater = CloudController::DependencyLocator.instance.prometheus_updater
@connection_info = {}
end
end
end

private

def acquire(thread)
begin
if (conn = super)
acquired_at = Time.now.utc
@prometheus_updater.increment_gauge_metric(:cc_acquired_db_connections_total, labels: { process_type: })
@connection_info[thread] ||= {}
@connection_info[thread][:acquired_at] = acquired_at
end
rescue Sequel::PoolTimeout
@prometheus_updater.increment_gauge_metric(:cc_db_connection_pool_timeouts_total, labels: { process_type: })
raise
ensure
# acquire calls assign_connection, where the thread possibly has to wait for a free connection.
# For both cases, when the thread acquires a connection in time, or when it runs into a PoolTimeout,
# we emmit the time the thread waited for a connection.
if @connection_info[thread] && @connection_info[thread].key?(:waiting_since)
@prometheus_updater.update_histogram_metric :cc_db_connection_wait_duration_seconds, (Time.now.utc - @connection_info[thread][:waiting_since]).seconds
@connection_info[thread].delete(:waiting_since)
end
end
conn
end

def assign_connection(thread)
# if assign_connection does not return a connection, the pool is exhausted and the thread has to wait
unless (conn = super)
waiting_since = Time.now.utc
@connection_info[thread] = { waiting_since: } unless @connection_info[thread] && @connection_info[thread].key?(:waiting_since)
end
conn
end

def make_new(server)
conn = super
@prometheus_updater.update_gauge_metric(:cc_open_db_connections_total, size, labels: { process_type: })
conn
end

def disconnect_connection(conn)
super
@prometheus_updater.update_gauge_metric(:cc_open_db_connections_total, size, labels: { process_type: })
end

def release(thread)
super

# acquired_at should be always set, but as a safeguard we check that it is present before accessing
if @connection_info[thread] && @connection_info[thread].key?(:acquired_at)
@prometheus_updater.update_histogram_metric :cc_db_connection_hold_duration_seconds, (Time.now.utc - @connection_info[thread][:acquired_at]).seconds
end

@prometheus_updater.decrement_gauge_metric(:cc_acquired_db_connections_total, labels: { process_type: })
@connection_info.delete(thread)
end

def process_type
ENV.fetch('PROCESS_TYPE', nil)
end
end

Database.register_extension(:connection_metrics) { |db| db.pool.extend(ConnectionMetrics) }
end
6 changes: 6 additions & 0 deletions spec/unit/lib/cloud_controller/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ module VCAP::CloudController
end
end

it 'sets environment variable `PROCESS_TYPE` to `main`' do
subject

expect(ENV.fetch('PROCESS_TYPE')).to eq('main')
end

describe 'setting max db connections per process for puma' do
context 'when max db connections per process value is specified' do
before do
Expand Down
21 changes: 20 additions & 1 deletion spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ module VCAP::CloudController
let(:periodic_updater) { double(:periodic_updater) }
let(:request_logs) { double(:request_logs) }
let(:puma_launcher) { subject.instance_variable_get(:@puma_launcher) }
let(:dependency_locator) { instance_spy(CloudController::DependencyLocator) }
let(:prometheus_updater) { spy(VCAP::CloudController::Metrics::PrometheusUpdater) }

subject do
TestConfig.override(
Expand All @@ -30,6 +32,8 @@ module VCAP::CloudController

before do
allow(logger).to receive(:info)
allow(CloudController::DependencyLocator).to receive(:instance).and_return(dependency_locator)
allow(dependency_locator).to receive(:prometheus_updater).and_return(prometheus_updater)
end

describe 'initialize' do
Expand Down Expand Up @@ -109,6 +113,20 @@ module VCAP::CloudController
expect(request_logs).to receive(:log_incomplete_requests)
puma_launcher.config.final_options[:before_worker_shutdown].first.call
end

it 'initializes the cc_db_connection_pool_timeouts_total for the worker on worker boot' do
subject

expect(prometheus_updater).to receive(:update_gauge_metric).with(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'puma_worker' })
puma_launcher.config.final_options[:before_worker_boot].first.call
end

it 'sets environment variable `PROCESS_TYPE` to `puma_worker`' do
subject

puma_launcher.config.final_options[:before_worker_boot].first.call
expect(ENV.fetch('PROCESS_TYPE')).to eq('puma_worker')
end
end

describe 'start!' do
Expand All @@ -128,10 +146,11 @@ module VCAP::CloudController

describe 'Events' do
describe 'on_booted' do
it 'sets up periodic metrics updater with EM' do
it 'sets up periodic metrics updater with EM and initializes cc_db_connection_pool_timeouts_total for the main process' do
expect(Thread).to receive(:new).and_yield
expect(EM).to receive(:run).and_yield
expect(periodic_updater).to receive(:setup_updates)
expect(prometheus_updater).to receive(:update_gauge_metric).with(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'main' })

puma_launcher.events.fire(:on_booted)
end
Expand Down
Loading

0 comments on commit 87b3ede

Please sign in to comment.