[SVLS-4999] Add Lambda tags to metrics sent via the API #501

Merged · 5 commits · Jul 10, 2024
23 changes: 21 additions & 2 deletions datadog_lambda/metric.py
@@ -7,6 +7,7 @@
 import time
 import logging
 import ujson as json
+from datetime import datetime, timedelta
 
 from datadog_lambda.extension import should_use_extension
 from datadog_lambda.tags import get_enhanced_metrics_tags, dd_lambda_layer_tag
@@ -61,6 +62,16 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
     if should_use_extension and timestamp is not None:
         # The extension does not support timestamps for distributions so we create a
         # a thread stats writer to submit metrics with timestamps to the API
+        timestamp_ceiling = int(
+            (datetime.now() - timedelta(hours=4)).timestamp()
+        )  # 4 hours ago
+        if timestamp_ceiling > timestamp:
+            logger.warning(
+                "Timestamp %s is older than 4 hours, not submitting metric %s",
+                timestamp,
+                metric_name,
+            )
+            return
         global extension_thread_stats
         if extension_thread_stats is None:
             from datadog_lambda.thread_stats_writer import ThreadStatsWriter
@@ -108,11 +119,19 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
     )
 
 
-def flush_stats():
+def flush_stats(lambda_context=None):
     lambda_stats.flush()
 
     if extension_thread_stats is not None:
-        extension_thread_stats.flush()
+        if lambda_context is not None:
+            tags = get_enhanced_metrics_tags(lambda_context)
+            split_arn = lambda_context.invoked_function_arn.split(":")
+            if len(split_arn) > 7:
+                # Get rid of the alias
+                split_arn.pop()
+            arn = ":".join(split_arn)
+            tags.append("function_arn:" + arn)
+        extension_thread_stats.flush(tags)
 
 
 def submit_enhanced_metric(metric_name, lambda_context):
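As a side note on the two behaviors added in this file, the short standalone sketch below (not part of the PR; the ARN and values are made up) shows what the alias-stripping and the 4-hour cutoff do in practice:

from datetime import datetime, timedelta

# Hypothetical alias-qualified ARN, used only to illustrate the split/pop logic above.
qualified_arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:live"
split_arn = qualified_arn.split(":")  # 8 parts, so the trailing alias is dropped
if len(split_arn) > 7:
    split_arn.pop()
print("function_arn:" + ":".join(split_arn))
# -> function_arn:arn:aws:lambda:us-east-1:123456789012:function:my-function

# The timestamp guard: anything older than the 4-hour cutoff is logged and skipped.
oldest_accepted = int((datetime.now() - timedelta(hours=4)).timestamp())
stale = int((datetime.now() - timedelta(hours=5)).timestamp())
print(oldest_accepted > stale)  # True -> lambda_metric would warn and return early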
4 changes: 3 additions & 1 deletion datadog_lambda/thread_stats_writer.py
@@ -22,11 +22,13 @@ def distribution(self, metric_name, value, tags=[], timestamp=None):
             metric_name, value, tags=tags, timestamp=timestamp
         )
 
-    def flush(self):
+    def flush(self, tags=None):
         """ "Flush distributions from ThreadStats to Datadog.
         Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
         to gain better control over exception handling.
         """
+        if tags:
+            self.thread_stats.constant_tags = self.thread_stats.constant_tags + tags
         _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
         count_dists = len(dists)
         if not count_dists:
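A minimal usage sketch of the new flush(tags=...) parameter (metric and tag names are illustrative, and a real flush additionally needs the Datadog API key configured): the tags are concatenated onto the underlying ThreadStats constant_tags, so every distribution point in the flushed batch carries them, and they remain on the writer for subsequent flushes.

from datadog_lambda.thread_stats_writer import ThreadStatsWriter

# ThreadStatsWriter(True) mirrors how the tests construct the writer.
writer = ThreadStatsWriter(True)
writer.distribution("example.metric", 1, tags=["source:sketch"])

# These tags are appended to thread_stats.constant_tags before the batch is flushed.
writer.flush(tags=["functionname:my-function", "region:us-east-1"])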
2 changes: 1 addition & 1 deletion datadog_lambda/wrapper.py
@@ -366,7 +366,7 @@ def _after(self, event, context):
                     logger.debug("Failed to create cold start spans. %s", e)
 
             if not self.flush_to_log or should_use_extension:
-                flush_stats()
+                flush_stats(context)
             if should_use_extension and self.local_testing_mode:
                 # when testing locally, the extension does not know when an
                 # invocation completes because it does not have access to the
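To tie the pieces together, here is a hedged end-to-end sketch (handler and metric names are illustrative, not from the PR) of how a wrapped handler benefits from this change when the extension is in use:

import time

from datadog_lambda.metric import lambda_metric
from datadog_lambda.wrapper import datadog_lambda_wrapper


@datadog_lambda_wrapper
def handler(event, context):
    # A timestamp within the last 4 hours is accepted and routed to the API
    # through the thread stats writer when the extension is running.
    lambda_metric("example.requests", 1, timestamp=int(time.time()) - 60)
    return {"statusCode": 200}


# After the handler returns, the wrapper's _after() calls flush_stats(context),
# which attaches the enhanced Lambda tags plus the function_arn tag to the batch.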
32 changes: 29 additions & 3 deletions tests/test_metric.py
@@ -5,7 +5,7 @@
 
 from botocore.exceptions import ClientError as BotocoreClientError
 from datadog.api.exceptions import ClientError
-
+from datetime import datetime, timedelta
 
 from datadog_lambda.metric import lambda_metric
 from datadog_lambda.api import decrypt_kms_api_key, KMS_ENCRYPTION_CONTEXT_KEY
@@ -49,12 +49,28 @@ def test_lambda_metric_timestamp_with_extension(self):
         self.mock_metric_extension_thread_stats = patcher.start()
         self.addCleanup(patcher.stop)
 
-        lambda_metric("test_timestamp", 1, 123)
+        delta = timedelta(minutes=1)
+        timestamp = int((datetime.now() - delta).timestamp())
+
+        lambda_metric("test_timestamp", 1, timestamp)
         self.mock_metric_lambda_stats.distribution.assert_not_called()
         self.mock_metric_extension_thread_stats.distribution.assert_called_with(
-            "test_timestamp", 1, timestamp=123, tags=[dd_lambda_layer_tag]
+            "test_timestamp", 1, timestamp=timestamp, tags=[dd_lambda_layer_tag]
         )
 
+    @patch("datadog_lambda.metric.should_use_extension", True)
+    def test_lambda_metric_invalid_timestamp_with_extension(self):
+        patcher = patch("datadog_lambda.metric.extension_thread_stats")
+        self.mock_metric_extension_thread_stats = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        delta = timedelta(hours=5)
+        timestamp = int((datetime.now() - delta).timestamp())
+
+        lambda_metric("test_timestamp", 1, timestamp)
+        self.mock_metric_lambda_stats.distribution.assert_not_called()
+        self.mock_metric_extension_thread_stats.distribution.assert_not_called()
+
     def test_lambda_metric_flush_to_log(self):
         os.environ["DD_FLUSH_TO_LOG"] = "True"
 
@@ -84,6 +100,16 @@ def test_retry_on_remote_disconnected(self):
         lambda_stats.flush()
         self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 2)
 
+    def test_flush_stats_with_tags(self):
+        lambda_stats = ThreadStatsWriter(True)
+        tags = ["tag1:value1", "tag2:value2"]
+        lambda_stats.flush(tags)
+        self.mock_threadstats_flush_distributions.assert_called_once_with(
+            lambda_stats.thread_stats._get_aggregate_metrics_and_dists(float("inf"))[1]
+        )
+        for tag in tags:
+            self.assertTrue(tag in lambda_stats.thread_stats.constant_tags)
+
 
 MOCK_FUNCTION_NAME = "myFunction"
 