Send database monitoring query metrics to new intake (#9222)
* postgres: send metrics to new intake as events

Send DBM query metrics via the new aggregator API added in #9165.

* lazily resolve hostname
djova authored May 7, 2021
1 parent d4f422c commit 26d90d6
Showing 4 changed files with 109 additions and 256 deletions.
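
At a high level, the statement metrics path changes from emitting one check metric per pg_stat_statements counter column to shipping the raw derivative rows as a single JSON event through the new database_monitoring_query_metrics aggregator API added in #9165. The sketch below condenses the payload assembled in collect_per_statement_metrics() in statements.py further down; the helper name and the example values are illustrative and not part of the integration.

import json
import time


def build_query_metrics_payload(host, min_collection_interval, tags, postgres_rows):
    # Mirrors the payload shape assembled in collect_per_statement_metrics() in the diff below.
    return {
        'host': host,                                       # resolved lazily from the configured DB host
        'timestamp': time.time() * 1000,                    # milliseconds since epoch
        'min_collection_interval': min_collection_interval,
        'tags': tags,
        'postgres_rows': postgres_rows,                     # derivative pg_stat_statements rows
    }


# Illustrative call; the check serializes this and hands it to the aggregator via
# self.database_monitoring_query_metrics(json.dumps(payload, default=default_json_event_encoding)).
payload = build_query_metrics_payload(
    host='my-postgres-host',
    min_collection_interval=15,
    tags=['env:dev', 'db:postgres'],
    postgres_rows=[{'query_signature': 'abc123', 'calls': 10, 'total_time': 42.0, 'rows': 10}],
)
print(json.dumps(payload))
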
9 changes: 2 additions & 7 deletions postgres/datadog_checks/postgres/postgres.py
@@ -53,7 +53,7 @@ def __init__(self, name, init_config, instances):
)
self._config = PostgresConfig(self.instance)
self.metrics_cache = PostgresMetricsCache(self._config)
self.statement_metrics = PostgresStatementMetrics(self._config)
self.statement_metrics = PostgresStatementMetrics(self, self._config)
self.statement_samples = PostgresStatementSamples(self, self._config)
self._clean_state()

@@ -430,11 +430,6 @@ def _collect_custom_queries(self, tags):
metric, value, method = info
getattr(self, method)(metric, value, tags=set(query_tags))

def _collect_per_statement_metrics(self, tags):
metrics = self.statement_metrics.collect_per_statement_metrics(self.db)
for metric_name, metric_value, metrics_tags in metrics:
self.count(metric_name, metric_value, tags=list(set(metrics_tags + tags)))

def check(self, _):
tags = copy.copy(self._config.tags)
# Collect metrics
@@ -447,7 +442,7 @@ def check(self, _):
self._collect_stats(tags)
self._collect_custom_queries(tags)
if self._config.deep_database_monitoring:
self._collect_per_statement_metrics(tags)
self.statement_metrics.collect_per_statement_metrics(self.db, tags)
self.statement_samples.run_sampler(tags)

except Exception as e:
217 changes: 79 additions & 138 deletions postgres/datadog_checks/postgres/statements.py
@@ -4,22 +4,22 @@
from __future__ import unicode_literals

import copy
import time

import psycopg2
import psycopg2.extras

from datadog_checks.base.log import get_check_logger
from datadog_checks.base.utils.db.sql import compute_sql_signature, normalize_query_tag
from datadog_checks.base.utils.db.statement_metrics import StatementMetrics, apply_row_limits

from .util import milliseconds_to_nanoseconds
from datadog_checks.base.utils.db.sql import compute_sql_signature
from datadog_checks.base.utils.db.statement_metrics import StatementMetrics
from datadog_checks.base.utils.db.utils import default_json_event_encoding, resolve_db_host
from datadog_checks.base.utils.serialization import json

try:
import datadog_agent
except ImportError:
from ..stubs import datadog_agent


STATEMENTS_QUERY = """
SELECT {cols}
FROM {pg_stat_statements_view} as pg_stat_statements
@@ -37,88 +37,55 @@
# Required columns for the check to run
PG_STAT_STATEMENTS_REQUIRED_COLUMNS = frozenset({'calls', 'query', 'total_time', 'rows'})

PG_STAT_STATEMENTS_OPTIONAL_COLUMNS = frozenset({'queryid'})
PG_STAT_STATEMENTS_METRICS_COLUMNS = frozenset(
{
'calls',
'total_time',
'rows',
'shared_blks_hit',
'shared_blks_read',
'shared_blks_dirtied',
'shared_blks_written',
'local_blks_hit',
'local_blks_read',
'local_blks_dirtied',
'local_blks_written',
'temp_blks_read',
'temp_blks_written',
}
)

PG_STAT_STATEMENTS_TAG_COLUMNS = frozenset(
{
'datname',
'rolname',
'query',
}
)

# Monotonically increasing count columns to be converted to metrics
PG_STAT_STATEMENTS_METRIC_COLUMNS = {
'calls': 'postgresql.queries.count',
'total_time': 'postgresql.queries.time',
'rows': 'postgresql.queries.rows',
'shared_blks_hit': 'postgresql.queries.shared_blks_hit',
'shared_blks_read': 'postgresql.queries.shared_blks_read',
'shared_blks_dirtied': 'postgresql.queries.shared_blks_dirtied',
'shared_blks_written': 'postgresql.queries.shared_blks_written',
'local_blks_hit': 'postgresql.queries.local_blks_hit',
'local_blks_read': 'postgresql.queries.local_blks_read',
'local_blks_dirtied': 'postgresql.queries.local_blks_dirtied',
'local_blks_written': 'postgresql.queries.local_blks_written',
'temp_blks_read': 'postgresql.queries.temp_blks_read',
'temp_blks_written': 'postgresql.queries.temp_blks_written',
}

# Columns to apply as tags
PG_STAT_STATEMENTS_TAG_COLUMNS = {
'datname': 'db',
'rolname': 'user',
'query': 'query',
}

# These limits define the top K and bottom K unique query rows for each metric. For each check run, the
# maximum number of metrics sent will be the sum of all the numbers below (in practice, much less due to overlap in rows).
DEFAULT_STATEMENT_METRICS_LIMITS = {
'calls': (400, 0),
'total_time': (400, 0),
'rows': (400, 0),
'shared_blks_hit': (50, 0),
'shared_blks_read': (50, 0),
'shared_blks_dirtied': (50, 0),
'shared_blks_written': (50, 0),
'local_blks_hit': (50, 0),
'local_blks_read': (50, 0),
'local_blks_dirtied': (50, 0),
'local_blks_written': (50, 0),
'temp_blks_read': (50, 0),
'temp_blks_written': (50, 0),
# Synthetic column limits
'avg_time': (400, 0),
'shared_blks_ratio': (0, 50),
}
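
The limits above were consumed by apply_row_limits, which for each metric kept the K highest and K lowest rows (ties broken by 'calls'). Here is a self-contained illustration of that top-K/bottom-K idea; it is a simplified stand-in, not the datadog_checks.base implementation being removed in this commit.

def keep_top_and_bottom_rows(rows, limits, key):
    """Keep, for each metric, the top-K and bottom-K rows by value; drop everything else."""
    kept = {}
    for metric, (top_k, bottom_k) in limits.items():
        ordered = sorted(rows, key=lambda row: row.get(metric, 0), reverse=True)
        # ordered[-0:] would return the whole list, hence the explicit bottom_k guard.
        for row in ordered[:top_k] + (ordered[-bottom_k:] if bottom_k else []):
            kept[key(row)] = row
    return list(kept.values())


sample_rows = [
    {'query': 'SELECT 1', 'calls': 500, 'total_time': 3.0},
    {'query': 'SELECT 2', 'calls': 10, 'total_time': 900.0},
    {'query': 'SELECT 3', 'calls': 2, 'total_time': 1.0},
]
# Keep the single most-called row and the single slowest row (illustrative limits).
print(keep_top_and_bottom_rows(sample_rows, {'calls': (1, 0), 'total_time': (1, 0)}, key=lambda r: r['query']))
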


def generate_synthetic_rows(rows):
"""
Given a list of rows, generate a new list of rows with "synthetic" column values derived from
the existing row values.
"""
synthetic_rows = []
for row in rows:
new = copy.copy(row)
new['avg_time'] = float(new['total_time']) / new['calls'] if new['calls'] > 0 else 0
new['shared_blks_ratio'] = (
float(new['shared_blks_hit']) / (new['shared_blks_hit'] + new['shared_blks_read'])
if new['shared_blks_hit'] + new['shared_blks_read'] > 0
else 0
)

synthetic_rows.append(new)
PG_STAT_STATEMENTS_OPTIONAL_COLUMNS = frozenset({'queryid'})

return synthetic_rows
PG_STAT_ALL_DESIRED_COLUMNS = (
PG_STAT_STATEMENTS_METRICS_COLUMNS | PG_STAT_STATEMENTS_TAG_COLUMNS | PG_STAT_STATEMENTS_OPTIONAL_COLUMNS
)


class PostgresStatementMetrics(object):
"""Collects telemetry for SQL statements"""

def __init__(self, config):
self.config = config
self.log = get_check_logger()
def __init__(self, check, config):
self._check = check
self._config = config
self._db_hostname = None
self._log = get_check_logger()
self._state = StatementMetrics()

def _execute_query(self, cursor, query, params=()):
try:
cursor.execute(query, params)
return cursor.fetchall()
except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
self.log.warning('Statement-level metrics are unavailable: %s', e)
self._log.warning('Statement-level metrics are unavailable: %s', e)
return []

def _get_pg_stat_statements_columns(self, db):
@@ -130,98 +97,72 @@ def _get_pg_stat_statements_columns(self, db):
# Querying over '*' with limit 0 allows fetching only the column names from the cursor without data
query = STATEMENTS_QUERY.format(
cols='*',
pg_stat_statements_view=self.config.pg_stat_statements_view,
pg_stat_statements_view=self._config.pg_stat_statements_view,
limit=0,
)
cursor = db.cursor()
self._execute_query(cursor, query, params=(self.config.dbname,))
self._execute_query(cursor, query, params=(self._config.dbname,))
colnames = [desc[0] for desc in cursor.description]
return colnames
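
The LIMIT 0 trick above fetches no rows but still populates cursor.description, which psycopg2 fills with column metadata; the first element of each entry is the column name. A standalone sketch of the same introspection against pg_stat_statements, assuming the extension is installed and using placeholder connection parameters:

import psycopg2

# Placeholder connection parameters; adjust for a real database.
conn = psycopg2.connect(host='localhost', dbname='datadog_test', user='datadog', password='datadog')
with conn.cursor() as cursor:
    # LIMIT 0 returns no rows, but the cursor still learns the column names.
    cursor.execute("SELECT * FROM pg_stat_statements LIMIT 0")
    column_names = [desc[0] for desc in cursor.description]
print(column_names)  # e.g. ['userid', 'dbid', 'queryid', 'query', 'calls', 'total_time', ...]
conn.close()
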

def collect_per_statement_metrics(self, db):
def _db_hostname_cached(self):
if self._db_hostname:
return self._db_hostname
self._db_hostname = resolve_db_host(self._config.host)
return self._db_hostname
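
The second commit bullet ("lazily resolve hostname") defers hostname resolution from __init__ to the first payload and then caches it, as _db_hostname_cached above shows. Below is a minimal standalone sketch of that memoization pattern, with a stand-in for resolve_db_host (the real helper lives in datadog_checks.base.utils.db.utils).

import socket


def resolve_db_host_stub(configured_host):
    # Stand-in for datadog_checks.base.utils.db.utils.resolve_db_host; here we simply
    # attempt a DNS lookup and fall back to the configured value.
    try:
        return socket.getfqdn(configured_host)
    except OSError:
        return configured_host


class HostnameCache(object):
    def __init__(self, configured_host):
        self._configured_host = configured_host
        self._db_hostname = None

    def get(self):
        # Resolve only on first use, then reuse the cached value on every check run.
        if self._db_hostname is None:
            self._db_hostname = resolve_db_host_stub(self._configured_host)
        return self._db_hostname


cache = HostnameCache('localhost')
print(cache.get())
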

def collect_per_statement_metrics(self, db, tags):
try:
return self._collect_per_statement_metrics(db)
rows = self._collect_metrics_rows(db)
if not rows:
return
payload = {
'host': self._db_hostname_cached(),
'timestamp': time.time() * 1000,
'min_collection_interval': self._config.min_collection_interval,
'tags': tags,
'postgres_rows': rows,
}
self._check.database_monitoring_query_metrics(json.dumps(payload, default=default_json_event_encoding))
except Exception:
db.rollback()
self.log.exception('Unable to collect statement metrics due to an error')
self._log.exception('Unable to collect statement metrics due to an error')
return []

def _collect_per_statement_metrics(self, db):
metrics = []

available_columns = self._get_pg_stat_statements_columns(db)
missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - set(available_columns)
def _load_pg_stat_statements(self, db):
available_columns = set(self._get_pg_stat_statements_columns(db))
missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns
if len(missing_columns) > 0:
self.log.warning(
self._log.warning(
'Unable to collect statement metrics because required fields are unavailable: %s',
', '.join(list(missing_columns)),
)
return metrics
return []

desired_columns = (
list(PG_STAT_STATEMENTS_METRIC_COLUMNS.keys())
+ list(PG_STAT_STATEMENTS_OPTIONAL_COLUMNS)
+ list(PG_STAT_STATEMENTS_TAG_COLUMNS.keys())
)
query_columns = list(set(desired_columns) & set(available_columns) | set(PG_STAT_STATEMENTS_TAG_COLUMNS.keys()))
rows = self._execute_query(
# sort to ensure consistent column order
query_columns = sorted(list(PG_STAT_ALL_DESIRED_COLUMNS & available_columns))

return self._execute_query(
db.cursor(cursor_factory=psycopg2.extras.DictCursor),
STATEMENTS_QUERY.format(
cols=', '.join(query_columns),
pg_stat_statements_view=self.config.pg_stat_statements_view,
pg_stat_statements_view=self._config.pg_stat_statements_view,
limit=DEFAULT_STATEMENTS_LIMIT,
),
params=(self.config.dbname,),
params=(self._config.dbname,),
)
if not rows:
return metrics

def _collect_metrics_rows(self, db):
rows = self._load_pg_stat_statements(db)

def row_keyfunc(row):
return (row['query_signature'], row['datname'], row['rolname'])

rows = self._normalize_queries(rows)
rows = self._state.compute_derivative_rows(rows, PG_STAT_STATEMENTS_METRIC_COLUMNS.keys(), key=row_keyfunc)
metrics.append(('dd.postgres.queries.query_rows_raw', len(rows), []))

rows = generate_synthetic_rows(rows)
rows = apply_row_limits(
rows,
self.config.statement_metrics_limits or DEFAULT_STATEMENT_METRICS_LIMITS,
tiebreaker_metric='calls',
tiebreaker_reverse=True,
key=row_keyfunc,
)
metrics.append(('dd.postgres.queries.query_rows_limited', len(rows), []))
rows = self._state.compute_derivative_rows(rows, PG_STAT_STATEMENTS_METRICS_COLUMNS, key=row_keyfunc)
self._check.gauge('dd.postgres.queries.query_rows_raw', len(rows))

for row in rows:
# All "Deep Database Monitoring" statement-level metrics are tagged with a `query_signature`
# which uniquely identifies the normalized query family. Where possible, this hash should
# match the hash of APM "resources" (https://docs.datadoghq.com/tracing/visualization/resource/)
# when the resource is a SQL query. Postgres' query normalization in the `pg_stat_statements` table
# preserves most of the original query, so we tag the `resource_hash` with the same value as the
# `query_signature`. The `resource_hash` tag should match the *actual* APM resource hash most of
# the time, but not always. So this is a best-effort approach to link these metrics to APM metrics.
tags = ['query_signature:' + row['query_signature'], 'resource_hash:' + row['query_signature']]

for column, tag_name in PG_STAT_STATEMENTS_TAG_COLUMNS.items():
if column not in row:
continue
value = row[column]
if column == 'query':
value = normalize_query_tag(row['query'])
tags.append('{tag_name}:{value}'.format(tag_name=tag_name, value=value))

for column, metric_name in PG_STAT_STATEMENTS_METRIC_COLUMNS.items():
if column not in row:
continue
value = row[column]
if column == 'total_time':
# All "Deep Database Monitoring" timing metrics are in nanoseconds
# Postgres tracks pg_stat* timing stats in milliseconds
value = milliseconds_to_nanoseconds(value)
metrics.append((metric_name, value, tags))

return metrics
return rows
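
compute_derivative_rows above turns the monotonically increasing pg_stat_statements counters into per-interval deltas by diffing the current rows against those from the previous check run, keyed by (query_signature, datname, rolname). The sketch below is a self-contained illustration of that idea only; it is not the StatementMetrics implementation from datadog_checks.base, and it ignores counter resets, which the real code also has to handle.

def compute_deltas(previous_rows, current_rows, metric_columns, key):
    """Return current - previous for each metric column, per statement key."""
    previous_by_key = {key(row): row for row in previous_rows}
    deltas = []
    for row in current_rows:
        prev = previous_by_key.get(key(row))
        if prev is None:
            # First time this statement is seen: no baseline yet, skip this interval.
            continue
        delta_row = dict(row)
        for column in metric_columns:
            delta_row[column] = row[column] - prev[column]
        deltas.append(delta_row)
    return deltas


def row_key(row):
    return (row['query_signature'], row['datname'], row['rolname'])


previous = [{'query_signature': 'abc', 'datname': 'dogs', 'rolname': 'datadog', 'calls': 100, 'total_time': 50.0}]
current = [{'query_signature': 'abc', 'datname': 'dogs', 'rolname': 'datadog', 'calls': 130, 'total_time': 65.0}]
print(compute_deltas(previous, current, ['calls', 'total_time'], key=row_key))
# -> [{'query_signature': 'abc', ..., 'calls': 30, 'total_time': 15.0}]
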

def _normalize_queries(self, rows):
normalized_rows = []
@@ -230,7 +171,7 @@ def _normalize_queries(self, rows):
try:
obfuscated_statement = datadog_agent.obfuscate_sql(row['query'])
except Exception as e:
self.log.warning("Failed to obfuscate query '%s': %s", row['query'], e)
self._log.debug("Failed to obfuscate query '%s': %s", row['query'], e)
continue

normalized_row['query'] = obfuscated_statement