Skip to content

Commit

Permalink
decouple DBM query metrics interval from check run interval (#9658)
Browse files Browse the repository at this point in the history
* decouple DBM query metrics interval from check run interval

* decouple the DBM metrics collection interval from the check run interval
* set default DBM metrics collection interval to 10s
* change `statement_samples.collections_per_second` to `statement_samples.collection_interval` so it matches the new `statement_metrics.collection_interval` key

Depends on #9656

Motivation: being able to configure the DBM metrics collection interval separately from the check run interval enables us to use a 10 second interval (by default) for the query metrics. There are various difficulties when querying metrics that have a 15 second interval (e.g. ensuring a correct rollup window for varying time ranges) that don't exist with a 10 second interval.

* update config spec & model
  • Loading branch information
djova authored Jul 9, 2021
1 parent 5d4a2f1 commit 8a8aa22
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 159 deletions.
21 changes: 19 additions & 2 deletions mysql/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,23 @@ files:
type: boolean
example: false
display_default: false
- name: statement_metrics
description: Configure collection of statement metrics
options:
- name: enabled
description: |
Enable collection of statement metrics. Requires `dbm: true`.
value:
type: boolean
example: true
- name: collection_interval
description: |
Set the statement metric collection interval (in seconds). Each collection involves a single query to
`performance_schema.events_statements_summary_by_digest`. If a non-default value is chosen then that exact
same value must be used for *every* check instance. Running different instances with different collection
intervals is not supported.
value:
type: number
example: 10
- name: statement_samples
description: Configure collection of statement samples
options:
Expand All @@ -254,9 +271,9 @@ files:
value:
type: boolean
example: true
- name: collections_per_second
- name: collection_interval
description: |
Sets the maximum statement sample collection rate. Each collection involves a single query to one
Sets the statement sample collection interval (in seconds). Each collection involves a single query to one
of the `performance_schema.events_statements_*` tables, followed by at most one `EXPLAIN` query per
unique statement seen.
value:
Expand Down
1 change: 1 addition & 0 deletions mysql/datadog_checks/mysql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, instance):
'full_statement_text_samples_per_hour_per_query', 1
)
self.statement_samples_config = instance.get('statement_samples', {}) or {}
self.statement_metrics_config = instance.get('statement_metrics', {}) or {}
self.min_collection_interval = instance.get('min_collection_interval', 15)
self.configuration_checks()

Expand Down
4 changes: 4 additions & 0 deletions mysql/datadog_checks/mysql/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def instance_ssl(field, value):
return get_default_field_value(field, value)


def instance_statement_metrics(field, value):
    """Resolve the default for the `statement_metrics` instance option via the shared resolver."""
    resolved = get_default_field_value(field, value)
    return resolved


def instance_statement_samples(field, value):
    """Resolve the default for the `statement_samples` instance option via the shared resolver."""
    resolved = get_default_field_value(field, value)
    return resolved

Expand Down
11 changes: 10 additions & 1 deletion mysql/datadog_checks/mysql/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,21 @@ class Config:
key: Optional[str]


class StatementMetrics(BaseModel):
    """Validation model for the `statement_metrics` section of an instance config."""

    class Config:
        # Pydantic setting: instances are immutable once constructed.
        allow_mutation = False

    # Interval in seconds between statement-metric collections (spec example default: 10).
    collection_interval: Optional[float]
    # Whether statement-metrics collection is enabled; requires `dbm: true` on the instance.
    enabled: Optional[bool]


class StatementSamples(BaseModel):
class Config:
allow_mutation = False

collection_interval: Optional[float]
collection_strategy_cache_maxsize: Optional[int]
collection_strategy_cache_ttl: Optional[int]
collections_per_second: Optional[float]
enabled: Optional[bool]
events_statements_enable_procedure: Optional[str]
events_statements_row_limit: Optional[int]
Expand Down Expand Up @@ -86,6 +94,7 @@ class Config:
service: Optional[str]
sock: Optional[str]
ssl: Optional[Ssl]
statement_metrics: Optional[StatementMetrics]
statement_samples: Optional[StatementSamples]
tags: Optional[Sequence[str]]
use_global_custom_queries: Optional[str]
Expand Down
22 changes: 19 additions & 3 deletions mysql/datadog_checks/mysql/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ instances:
#
# dbm: false

## Configure collection of statement metrics
#
statement_metrics:

## @param enabled - boolean - optional - default: true
## Enable collection of statement metrics. Requires `dbm: true`.
#
# enabled: true

## @param collection_interval - number - optional - default: 10
## Set the statement metric collection interval (in seconds). Each collection involves a single query to
## `performance_schema.events_statements_summary_by_digest`. If a non-default value is chosen then that
## exact same value must be used for *every* check instance. Running different instances with different
## collection intervals is not supported.
#
# collection_interval: 10

## Configure collection of statement samples
#
statement_samples:
Expand All @@ -240,12 +256,12 @@ instances:
#
# enabled: true

## @param collections_per_second - number - optional - default: 1
## Sets the maximum statement sample collection rate. Each collection involves a single query to one
## @param collection_interval - number - optional - default: 1
## Sets the statement sample collection interval (in seconds). Each collection involves a single query to one
## of the `performance_schema.events_statements_*` tables, followed by at most one `EXPLAIN` query per
## unique statement seen.
#
# collections_per_second: 1
# collection_interval: 1

## @param explained_statements_per_hour_per_query - integer - optional - default: 60
## Sets the rate limit for how many execution plans will be collected per hour per normalized statement.
Expand Down
7 changes: 4 additions & 3 deletions mysql/datadog_checks/mysql/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(self, name, init_config, instances):
self.check_initializations.append(self._query_manager.compile_queries)
self.innodb_stats = InnoDBMetrics()
self.check_initializations.append(self._config.configuration_checks)
self._statement_metrics = MySQLStatementMetrics(self, self._config)
self._statement_metrics = MySQLStatementMetrics(self, self._config, self._get_connection_args())
self._statement_samples = MySQLStatementSamples(self, self._config, self._get_connection_args())

def execute_query_raw(self, query):
Expand Down Expand Up @@ -128,8 +128,8 @@ def check(self, _):
self._collect_system_metrics(self._config.host, db, tags)
if self._config.dbm_enabled:
dbm_tags = list(set(self.service_check_tags) | set(tags))
self._statement_metrics.collect_per_statement_metrics(db, dbm_tags)
self._statement_samples.run_sampler(dbm_tags)
self._statement_metrics.run_job_loop(dbm_tags)
self._statement_samples.run_job_loop(dbm_tags)

# keeping track of these:
self._put_qcache_stats()
Expand All @@ -145,6 +145,7 @@ def check(self, _):

def cancel(self):
self._statement_samples.cancel()
self._statement_metrics.cancel()

def _set_qcache_stats(self):
host_key = self._get_host_key()
Expand Down
142 changes: 40 additions & 102 deletions mysql/datadog_checks/mysql/statement_samples.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import logging
import os
import re
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import closing
from enum import Enum

Expand All @@ -16,14 +12,8 @@
from ..stubs import datadog_agent

from datadog_checks.base import is_affirmative
from datadog_checks.base.log import get_check_logger
from datadog_checks.base.utils.db.sql import compute_exec_plan_signature, compute_sql_signature
from datadog_checks.base.utils.db.utils import (
ConstantRateLimiter,
RateLimitingTTLCache,
default_json_event_encoding,
resolve_db_host,
)
from datadog_checks.base.utils.db.utils import DBMAsyncJob, RateLimitingTTLCache, default_json_event_encoding
from datadog_checks.base.utils.serialization import json

SUPPORTED_EXPLAIN_STATEMENTS = frozenset({'select', 'table', 'delete', 'insert', 'replace', 'update', 'with'})
Expand All @@ -41,11 +31,11 @@
]

# default sampling settings for events_statements_* tables
# rate limit is in samples/second
# {table -> rate-limit}
DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND = {
'events_statements_history_long': 0.1,
'events_statements_history': 0.1,
# collection interval is in seconds
# {table -> interval}
DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL = {
'events_statements_history_long': 10,
'events_statements_history': 10,
'events_statements_current': 1,
}

Expand Down Expand Up @@ -237,32 +227,35 @@ class DBExplainError(Enum):
statement_truncated = 'statement_truncated'


class MySQLStatementSamples(object):
class MySQLStatementSamples(DBMAsyncJob):
"""
Collects statement samples and execution plans.
"""

executor = ThreadPoolExecutor()

def __init__(self, check, config, connection_args):
self._check = check
collection_interval = float(config.statement_metrics_config.get('collection_interval', 1))
if collection_interval <= 0:
collection_interval = 1
super(MySQLStatementSamples, self).__init__(
check,
rate_limit=1 / collection_interval,
run_sync=is_affirmative(config.statement_samples_config.get('run_sync', False)),
enabled=is_affirmative(config.statement_samples_config.get('enabled', True)),
min_collection_interval=config.min_collection_interval,
config_host=config.host,
dbms="mysql",
expected_db_exceptions=(pymysql.err.DatabaseError,),
job_name="statement-samples",
shutdown_callback=self._close_db_conn,
)
self._config = config
self._version_processed = False
self._connection_args = connection_args
# checkpoint at zero so we pull the whole history table on the first run
self._checkpoint = 0
self._log = get_check_logger()
self._last_check_run = 0
self._db = None
self._tags = None
self._tags_str = None
self._collection_loop_future = None
self._cancel_event = threading.Event()
self._rate_limiter = ConstantRateLimiter(1)
self._config = config
self._db_hostname = resolve_db_host(self._config.host)
self._enabled = is_affirmative(self._config.statement_samples_config.get('enabled', True))
self._run_sync = is_affirmative(self._config.statement_samples_config.get('run_sync', False))
self._collections_per_second = self._config.statement_samples_config.get('collections_per_second', -1)
self._configured_collection_interval = self._config.statement_samples_config.get('collection_interval', -1)
self._events_statements_row_limit = self._config.statement_samples_config.get(
'events_statements_row_limit', 5000
)
Expand All @@ -280,14 +273,14 @@ def __init__(self, check, config, connection_args):
self._has_window_functions = False
events_statements_table = self._config.statement_samples_config.get('events_statements_table', None)
if events_statements_table:
if events_statements_table in DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND:
if events_statements_table in DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL:
self._log.debug("Configured preferred events_statements_table: %s", events_statements_table)
self._preferred_events_statements_tables = [events_statements_table]
else:
self._log.warning(
"Invalid events_statements_table: %s. Must be one of %s. Falling back to trying all tables.",
events_statements_table,
', '.join(DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND.keys()),
', '.join(DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL.keys()),
)
self._explain_strategies = {
'PROCEDURE': self._run_explain_procedure,
Expand Down Expand Up @@ -317,39 +310,14 @@ def _init_caches(self):
ttl=60 * 60 / self._config.statement_samples_config.get('samples_per_hour_per_query', 15),
)

def run_sampler(self, tags):
"""
start the sampler thread if not already running & update tag metadata
:param tags:
:return:
"""
if not self._enabled:
self._log.debug("Statement sampler not enabled")
return
self._tags = tags
self._tags_str = ','.join(tags)
def _read_version_info(self):
if not self._version_processed and self._check.version:
self._has_window_functions = self._check.version.version_compatible((8, 0, 0))
if self._check.version.flavor == "MariaDB" or not self._check.version.version_compatible((5, 7, 0)):
self._global_status_table = "information_schema.global_status"
else:
self._global_status_table = "performance_schema.global_status"
self._version_processed = True
self._last_check_run = time.time()
if self._run_sync or is_affirmative(os.environ.get('DBM_STATEMENT_SAMPLER_RUN_SYNC', "false")):
self._log.debug("Running statement sampler synchronously")
self._collect_statement_samples()
elif self._collection_loop_future is None or not self._collection_loop_future.running():
self._collection_loop_future = MySQLStatementSamples.executor.submit(self.collection_loop)
else:
self._log.debug("Statement sampler collection loop already running")

def cancel(self):
"""
Cancels the collection loop thread if it's running.
Returns immediately, leaving the thread to stop & clean up on its own time.
"""
self._cancel_event.set()

def _get_db_connection(self):
"""
Expand All @@ -370,39 +338,6 @@ def _close_db_conn(self):
finally:
self._db = None

def collection_loop(self):
try:
self._log.info("Starting statement sampler collection loop")
while True:
if self._cancel_event.isSet():
self._log.info("Collection loop cancelled")
self._check.count("dd.mysql.statement_samples.collection_loop_cancel", 1, tags=self._tags)
break
if time.time() - self._last_check_run > self._config.min_collection_interval * 2:
self._log.info("Stopping statement sampler collection loop due to check inactivity")
self._check.count("dd.mysql.statement_samples.collection_loop_inactive_stop", 1, tags=self._tags)
break
self._collect_statement_samples()
except pymysql.err.DatabaseError as e:
self._log.warning(
"Statement sampler database error: %s", e, exc_info=self._log.getEffectiveLevel() == logging.DEBUG
)
self._check.count(
"dd.mysql.statement_samples.error",
1,
tags=self._tags + ["error:collection-loop-database-error-{}".format(type(e))],
)
except Exception as e:
self._log.exception("Statement sampler collection loop crash")
self._check.count(
"dd.mysql.statement_samples.error",
1,
tags=self._tags + ["error:collection-loop-crash-{}".format(type(e))],
)
finally:
self._log.info("Shutting down statement sampler collection loop")
self._close_db_conn()

def _cursor_run(self, cursor, query, params=None, obfuscated_params=None):
"""
Run and log the query. If provided, obfuscated params are logged in place of the regular params.
Expand Down Expand Up @@ -658,30 +593,33 @@ def _get_sample_collection_strategy(self):
)
return None, None

rate_limit = self._collections_per_second
if rate_limit < 0:
rate_limit = DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND[events_statements_table]
collection_interval = self._configured_collection_interval
if collection_interval < 0:
collection_interval = DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL[events_statements_table]

# cache only successful strategies
# should be short enough that we'll reflect updates relatively quickly
# i.e., an aurora replica becomes a master (or vice versa).
strategy = (events_statements_table, rate_limit)
strategy = (events_statements_table, collection_interval)
self._log.debug(
"Chose plan collection strategy: events_statements_table=%s, collections_per_second=%s",
"Chose plan collection strategy: events_statements_table=%s, collection_interval=%s",
events_statements_table,
rate_limit,
collection_interval,
)
self._collection_strategy_cache["plan_collection_strategy"] = strategy
return strategy

def run_job(self):
self._collect_statement_samples()

def _collect_statement_samples(self):
self._read_version_info()
self._log.debug("collecting statement samples")
self._rate_limiter.sleep()
events_statements_table, rate_limit = self._get_sample_collection_strategy()
events_statements_table, collection_interval = self._get_sample_collection_strategy()
if not events_statements_table:
return
if self._rate_limiter.rate_limit_s != rate_limit:
self._rate_limiter = ConstantRateLimiter(rate_limit)
self._set_rate_limit(1.0 / collection_interval)

start_time = time.time()

tags = self._tags + ["events_statements_table:{}".format(events_statements_table)]
Expand Down
Loading

0 comments on commit 8a8aa22

Please sign in to comment.