PG check updates (connection, sizes, bgwriter) #1105

Merged: 9 commits, Sep 1, 2014
245 changes: 138 additions & 107 deletions checks.d/postgres.py
@@ -1,9 +1,14 @@
"""PostgreSQL check

Collects database-wide metrics and optionally per-relation metrics.
"""
# project
from checks import AgentCheck, CheckException

# 3rd party
import pg8000 as pg
from pg8000 import InterfaceError
from pg8000 import InterfaceError, ProgrammingError
import socket

class ShouldRestartException(Exception): pass

@@ -13,6 +18,7 @@ class PostgreSql(AgentCheck):
SOURCE_TYPE_NAME = 'postgresql'
RATE = AgentCheck.rate
GAUGE = AgentCheck.gauge
MONOTONIC = AgentCheck.monotonic_count

# turning columns into tags
DB_METRICS = {
@@ -33,16 +39,16 @@ class PostgreSql(AgentCheck):
BGW_METRICS = {
'descriptors': [],
'metrics': {
'checkpoints_timed' : ('postgresql.bgwriter.checkpoints_timed', RATE),
'checkpoints_req' : ('postgresql.bgwriter.checkpoints_requested', RATE),
'checkpoint_write_time': ('postgresql.bgwriter.write_time', RATE),
'checkpoint_sync_time' : ('postgresql.bgwriter.sync_time', RATE),
'buffers_checkpoint' : ('postgresql.bgwriter.buffers_checkpoint', RATE),
'buffers_clean' : ('postgresql.bgwriter.buffers_clean', RATE),
'maxwritten_clean' : ('postgresql.bgwriter.maxwritten_clean', RATE),
'buffers_backend' : ('postgresql.bgwriter.buffers_backend', RATE),
'buffers_backend_fsync': ('postgresql.bgwriter.buffers_backend_fsync', RATE),
'buffers_alloc' : ('postgresql.bgwriter.buffers_alloc', RATE),
'checkpoints_timed' : ('postgresql.bgwriter.checkpoints_timed', MONOTONIC),
'checkpoints_req' : ('postgresql.bgwriter.checkpoints_requested', MONOTONIC),
'checkpoint_write_time': ('postgresql.bgwriter.write_time', MONOTONIC),
'checkpoint_sync_time' : ('postgresql.bgwriter.sync_time', MONOTONIC),
'buffers_checkpoint' : ('postgresql.bgwriter.buffers_checkpoint', MONOTONIC),
'buffers_clean' : ('postgresql.bgwriter.buffers_clean', MONOTONIC),
'maxwritten_clean' : ('postgresql.bgwriter.maxwritten_clean', MONOTONIC),
'buffers_backend' : ('postgresql.bgwriter.buffers_backend', MONOTONIC),
'buffers_backend_fsync': ('postgresql.bgwriter.buffers_backend_fsync', MONOTONIC),
'buffers_alloc' : ('postgresql.bgwriter.buffers_alloc', MONOTONIC),
},
'query': "select %s FROM pg_stat_bgwriter",
'relation': False,
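
All of these pg_stat_bgwriter columns are cumulative counters, which is why the PR switches them from RATE to MONOTONIC (the AgentCheck.monotonic_count alias added above). A minimal sketch, not agent code, of how the two submission types treat such a counter across two collection rounds:

def as_rate(prev, curr, interval_s):
    # RATE submits the per-second change between two rounds
    return (curr - prev) / float(interval_s)

def as_monotonic_count(prev, curr):
    # MONOTONIC submits the raw delta; a counter reset (curr < prev)
    # is clamped to zero instead of producing a negative value
    return max(curr - prev, 0)

# checkpoints_timed going 100 -> 103 over a 15 s interval:
# as_rate(100, 103, 15)        -> 0.2 checkpoints/second
# as_monotonic_count(100, 103) -> 3 checkpoints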
@@ -130,11 +136,54 @@ class PostgreSql(AgentCheck):
'relation': True,
}

# Individual metrics, each a tuple of (query, metric_name, metric_type)
MAX_CONNECTIONS_METRIC = ('SHOW max_connections;','postgresql.max_connections', GAUGE)
SIZE_METRICS = {
'descriptors': [
('relname', 'table'),
],
'metrics': {
'pg_table_size(C.oid)' : ('postgresql.table_size', GAUGE),
'pg_indexes_size(C.oid)' : ('postgresql.index_size', GAUGE),
'pg_total_relation_size(C.oid)': ('postgresql.total_size', GAUGE),
},
'relation': True,
'query': """
SELECT
relname,
%s
FROM pg_class C
LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE nspname NOT IN ('pg_catalog', 'information_schema') AND
nspname !~ '^pg_toast' AND
relkind IN ('r') AND
relname = ANY(%s)"""
}
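
The SIZE_METRICS query template carries two %s placeholders: the first is filled in Python with the selected metric columns, while the trailing one is kept intact for the driver to bind the relation list (the `query % (", ".join(cols), "%s")` pattern in _collect_stats below). A sketch of the rendered call, assuming a single tracked table named persons (the name the tests use):

cols = ['pg_table_size(C.oid)', 'pg_indexes_size(C.oid)', 'pg_total_relation_size(C.oid)']
query = SIZE_METRICS['query'] % (", ".join(cols), "%s")  # keep the last %s intact
# relname = ANY(%s) is bound by pg8000 to the configured relation list
cursor.execute(query, (['persons'],))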

HOT_STANDBY_METRIC = ('select now() - pg_last_xact_replay_timestamp() AS replication_delay;', 'postgresql.replication_delay', GAUGE)
REPLICATION_METRICS = {
'descriptors': [],
'metrics': {
'GREATEST(0, EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp())) AS replication_delay': ('postgresql.replication_delay', GAUGE),
},
'relation': False,
'query': """
SELECT %s
FROM pg_settings
WHERE name = 'hot_standby'
AND setting = 'on'"""
}
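
Joining against pg_settings means this query simply returns no rows on a primary (hot_standby = off), so no replication metric is emitted there; that replaces the old SHOW hot_standby branch and its timedelta arithmetic further down. Rendered with its metric column substituted, the query reads:

rendered = """
SELECT GREATEST(0, EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp())) AS replication_delay
FROM pg_settings
WHERE name = 'hot_standby'
AND setting = 'on'
"""
# GREATEST(0, ...) clamps small negative deltas (clock skew between now()
# and the last replayed transaction timestamp) to zero.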

CONNECTION_METRICS = {
'descriptors': [],
'metrics': {
'MAX(setting) AS max_connections': ('postgresql.max_connections', GAUGE),
'SUM(numbackends)/MAX(setting) AS pct_connections': ('postgresql.percent_usage_connections', GAUGE),
},
'relation': False,
'query': """
WITH max_con AS (SELECT setting::float FROM pg_settings WHERE name = 'max_connections')
SELECT %s
FROM pg_stat_database, max_con
"""
}
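
The max_con CTE folds the two previously separate round trips (SHOW max_connections plus SELECT sum(numbackends) FROM pg_stat_database) into a single query. A worked example with assumed values, max_connections = 100 and 23 backends summed across pg_stat_database:

max_connections = 100.0                                # MAX(setting) from the max_con CTE
numbackends_total = 23                                 # SUM(numbackends)
pct_connections = numbackends_total / max_connections  # -> 0.23
# submitted as postgresql.max_connections = 100.0 and
# postgresql.percent_usage_connections = 0.23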

def __init__(self, name, init_config, agentConfig):
AgentCheck.__init__(self, name, init_config, agentConfig)
@@ -187,99 +236,82 @@ def _collect_stats(self, key, db, instance_tags, relations):

# Do we need relation-specific metrics?
if not relations:
metric_scope = (self.DB_METRICS, self.BGW_METRICS, self.LOCK_METRICS)
metric_scope = (self.DB_METRICS, self.CONNECTION_METRICS, self.BGW_METRICS,
self.LOCK_METRICS, self.REPLICATION_METRICS)
else:
metric_scope = (self.DB_METRICS, self.BGW_METRICS, self.LOCK_METRICS, self.REL_METRICS, self.IDX_METRICS)
metric_scope = (self.DB_METRICS, self.CONNECTION_METRICS, self.BGW_METRICS,
self.LOCK_METRICS, self.REPLICATION_METRICS,
self.REL_METRICS, self.IDX_METRICS, self.SIZE_METRICS)

try:
cursor = db.cursor()

for scope in metric_scope:
# build query
cols = scope['metrics'].keys() # list of metrics to query, in some order
# we must remember that order to parse results

try:
# if this is a relation-specific query, we need to list all relations last
if scope['relation'] and len(relations) > 0:
query = scope['query'] % (", ".join(cols), "%s") # Keep the last %s intact
self.log.debug("Running query: %s with relations: %s" % (query, relations))
cursor.execute(query, (relations, ))
else:
query = scope['query'] % (", ".join(cols))
self.log.debug("Running query: %s" % query)
cursor.execute(query.replace(r'%', r'%%'))

results = cursor.fetchall()
except ProgrammingError, e:
self.log.warning("Not all metrics may be available: %s" % str(e))
continue

# parse & submit results
# A row should look like this
# (descriptor, descriptor, ..., value, value, value, value, ...)
# with descriptor a PG relation or index name, which we use to create the tags
for row in results:
# turn descriptors into tags
desc = scope['descriptors']
# Check that all columns will be processed
assert len(row) == len(cols) + len(desc)

# Build tags
# descriptors are: (pg_name, dd_tag_name): value
# Special-case the "db" tag, which overrides the one that is passed as instance_tag
# The reason is that pg_stat_database returns all databases regardless of the
# connection.
if not scope['relation']:
tags = [t for t in instance_tags if not t.startswith("db:")]
else:
tags = [t for t in instance_tags]

tags += ["%s:%s" % (d[0][1], d[1]) for d in zip(desc, row[:len(desc)])]

# [(metric-map, value), (metric-map, value), ...]
# metric-map is: (dd_name, "rate"|"gauge")
# shift the results since the first columns will be the "descriptors"
values = zip([scope['metrics'][c] for c in cols], row[len(desc):])

# To submit simply call the function for each value v
# v[0] == (metric_name, submit_function)
# v[1] == the actual value
# tags are the instance tags plus the descriptor tags built above
[v[0][1](self, v[0][0], v[1], tags=tags) for v in values]

if not results:
self.warning('No results were found for query: "%s"' % query)

cursor.close()
except InterfaceError, e:
self.log.error("Connection seems broken: %s" % str(e))
self.log.error("Connection error: %s" % str(e))
raise ShouldRestartException
except socket.error, e:
self.log.error("Connection error: %s" % str(e))
raise ShouldRestartException
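
The parsing loop above relies purely on column order: descriptor columns come first in each row, metric values after. A worked example of the zip logic, using the relname descriptor and two relation counters (the metric names are taken from the REL_METRICS scope, which is elided in this diff):

desc = [('relname', 'table')]
cols = ['seq_scan', 'idx_scan']   # order comes from scope['metrics'].keys()
row = ('persons', 5, 12)          # (descriptor..., value...)

tags = ['%s:%s' % (d[0][1], d[1]) for d in zip(desc, row[:len(desc)])]
# -> ['table:persons']
values = zip([('postgresql.seq_scans', 'rate'), ('postgresql.index_scans', 'rate')],
             row[len(desc):])
# -> [(('postgresql.seq_scans', 'rate'), 5), (('postgresql.index_scans', 'rate'), 12)]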

for scope in metric_scope:
# build query
cols = scope['metrics'].keys() # list of metrics to query, in some order
# we must remember that order to parse results

# if this is a relation-specific query, we need to list all relations last
if scope['relation'] and len(relations) > 0:
query = scope['query'] % (", ".join(cols), "%s") # Keep the last %s intact
self.log.debug("Running query: %s with relations: %s" % (query, relations))
cursor.execute(query, (relations, ))
else:
query = scope['query'] % (", ".join(cols))
self.log.debug("Running query: %s" % query)
cursor.execute(query.replace(r'%', r'%%'))

results = cursor.fetchall()


# parse & submit results
# A row should look like this
# (descriptor, descriptor, ..., value, value, value, value, ...)
# with descriptor a PG relation or index name, which we use to create the tags
for row in results:
# turn descriptors into tags
desc = scope['descriptors']
# Check that all columns will be processed
assert len(row) == len(cols) + len(desc)

# Build tags
# descriptors are: (pg_name, dd_tag_name): value
# Special-case the "db" tag, which overrides the one that is passed as instance_tag
# The reason is that pg_stat_database returns all databases regardless of the
# connection.
if not scope['relation']:
tags = [t for t in instance_tags if not t.startswith("db:")]
else:
tags = [t for t in instance_tags]

tags += ["%s:%s" % (d[0][1], d[1]) for d in zip(desc, row[:len(desc)])]

# [(metric-map, value), (metric-map, value), ...]
# metric-map is: (dd_name, "rate"|"gauge")
# shift the results since the first columns will be the "descriptors"
values = zip([scope['metrics'][c] for c in cols], row[len(desc):])

# To submit simply call the function for each value v
# v[0] == (metric_name, submit_function)
# v[1] == the actual value
# tags are the instance tags plus the descriptor tags built above
[v[0][1](self, v[0][0], v[1], tags=tags) for v in values]

if not results:
self.warning('No results were found for query: "%s"' % query)

# Query for miscellaneous metrics
query = self.MAX_CONNECTIONS_METRIC[0]
cursor.execute(query)
result = cursor.fetchone()
self.MAX_CONNECTIONS_METRIC[2](self, self.MAX_CONNECTIONS_METRIC[1], result[0], tags=instance_tags)

# Query for percent usage of max_connections
cursor.execute('show max_connections')
max_conn = cursor.fetchone()[0]
cursor.execute('SELECT sum(numbackends) FROM pg_stat_database')
current_conn = cursor.fetchone()[0]
percent_usage = float(current_conn) / float(max_conn)
self.gauge('postgresql.percent_usage_connections', percent_usage, tags=instance_tags)

# check if hot_standby is on before running hot standby metrics (replication delay)
cursor.execute('show hot_standby')
is_standby = cursor.fetchone()[0]=='on'
if is_standby:
query = self.HOT_STANDBY_METRIC[0]
cursor.execute(query)
# The driver hands back the replication delay as a datetime.timedelta,
# so the value has to be converted to seconds before it is submitted.
result = cursor.fetchone()[0]
if result is not None:
if result.days < 0:
self.HOT_STANDBY_METRIC[2](self, self.HOT_STANDBY_METRIC[1], 0, tags=instance_tags)
else:
self.HOT_STANDBY_METRIC[2](self, self.HOT_STANDBY_METRIC[1], result.microseconds / 1000000.0, tags=instance_tags)
cursor.close()

def get_connection(self, key, host, port, user, password, dbname, use_cached=True):
"Get and memoize connections to instances"
@@ -308,7 +340,7 @@ def get_connection(self, key, host, port, user, password, dbname, use_cached=Tru
self.service_check('postgres.can_connect', status, tags=service_check_tags)
self.log.info('pg status: %s' % status)

except Exception, e:
except Exception:
status = AgentCheck.CRITICAL
self.service_check('postgres.can_connect', status, tags=service_check_tags)
self.log.info('pg status: %s' % status)
@@ -341,7 +373,6 @@ def check(self, instance):
dbname = 'postgres'

key = '%s:%s:%s' % (host, port, dbname)
db = self.get_connection(key, host, port, user, password, dbname)

# Clean up tags in case there was a None entry in the instance
# e.g. if the yaml contains tags: but no actual tags
@@ -353,12 +384,12 @@ def check(self, instance):
# preset tags to the database name
tags.extend(["db:%s" % dbname])

# Check version
version = self._get_version(key, db)
self.log.debug("Running check against version %s" % version)

# Collect metrics
try:
# Check version
db = self.get_connection(key, host, port, user, password, dbname)
version = self._get_version(key, db)
self.log.debug("Running check against version %s" % version)
self._collect_stats(key, db, tags, relations)
except ShouldRestartException:
self.log.info("Resetting the connection")
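
The diff is truncated right after the ShouldRestartException handler, so the retry body is not shown. A minimal sketch of the pattern the handler implies; the use_cached=False flag comes from get_connection's signature above, but the exact retry code is an assumption:

try:
    db = self.get_connection(key, host, port, user, password, dbname)
    version = self._get_version(key, db)
    self._collect_stats(key, db, tags, relations)
except ShouldRestartException:
    self.log.info("Resetting the connection")
    # drop the cached (broken) connection and collect once more
    db = self.get_connection(key, host, port, user, password, dbname,
                             use_cached=False)
    self._collect_stats(key, db, tags, relations)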
17 changes: 11 additions & 6 deletions tests/test_postgresql.py
@@ -28,15 +28,20 @@ def testChecks(self):

self.check.run()
metrics = self.check.get_metrics()
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.dead_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.live_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.dead_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.live_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.table_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.index_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.total_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.max_connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.percent_usage_connections']) >= 1, pprint(metrics))
# Don't test for locks
# self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.locks']) >= 1, pprint(metrics))
# Brittle tests
self.assertTrue(4 <= len(metrics) <= 6, metrics)
self.assertTrue(4 <= len([m for m in metrics if 'db:datadog_test' in str(m[3]['tags']) ]) <= 5, pprint(metrics))
self.assertTrue(len([m for m in metrics if 'table:persons' in str(m[3]['tags'])]) == 2, pprint(metrics))
# self.assertTrue(4 <= len(metrics) <= 6, metrics)
# self.assertTrue(4 <= len([m for m in metrics if 'db:datadog_test' in str(m[3]['tags']) ]) <= 5, pprint(metrics))
# self.assertTrue(len([m for m in metrics if 'table:persons' in str(m[3]['tags'])]) == 2, pprint(metrics))

# Rate metrics, need 2 collection rounds
time.sleep(1)
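
RATE and MONOTONIC metrics only produce a value once two samples exist, hence the second collection round after the sleep. The test body is truncated here; a sketch of what presumably follows, with the assertion chosen as an assumption based on the bgwriter metrics added above:

self.check.run()
metrics = self.check.get_metrics()
# bgwriter counters are MONOTONIC now, so they should appear on round two
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.checkpoints_timed']) >= 1, pprint(metrics))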