Replace expected and actual query with single query (#1489)
* WIP: Replace expected and actual query with single query

Instead of using two separate queries, whose results the validator
then has to compare to calculate the error metrics, use a single
PromQL query that calculates the error metric on the Prometheus
side and returns it to the validator.

Signed-off-by: Kaiyi <[email protected]>

* Resolve Proportion Error with PromQL query

Resolved division in the PromQL query by introducing a new dummy
label and the on() function, which allow metrics with differing
labels to still be operands of Prometheus binary operators such as
+, -, and /.

Signed-off-by: Kaiyi <[email protected]>
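
For context, a brief sketch of the vector-matching problem the dummy label and on() address (the metric names are from this repository; the label values and the 20s window are only illustrative): Prometheus binary operators pair samples whose label sets match exactly, so operands scraped under different jobs match nothing and the expression returns an empty vector; on() with an empty label list makes both sides match on the empty label set instead.

```python
# Illustration only: the operands carry different label sets (job='metal' vs.
# job='vm'), so a plain "/" finds no matching sample pairs. "on()" restricts
# matching to the (empty) listed labels, pairing the two one-element vectors.
broken = ("rate(kepler_process_package_joules_total{job='metal'}[20s])"
          " / rate(kepler_node_platform_joules_total{job='vm'}[20s])")
working = ("rate(kepler_process_package_joules_total{job='metal'}[20s])"
           " / on() rate(kepler_node_platform_joules_total{job='vm'}[20s])")
```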

* Convert job=vm into a vm_query parameter

Added a vm_query parameter so the hard-coded job='vm' selector can
be replaced with any query over VM-related labels.

Signed-off-by: Kaiyi <[email protected]>

* Remove Comments and Unnecessary Functions

Removed commented-out code and unneeded functions.

Signed-off-by: Kaiyi <[email protected]>

* Remove labelchange function for PromQL queries

Instead of using on() together with labelchange, removed the
labelchange function completely and used only on().

Signed-off-by: Kaiyi <[email protected]>

---------

Signed-off-by: Kaiyi <[email protected]>
KaiyiLiu1234 authored Jun 6, 2024
1 parent 89ebf4e commit 07636b1
Showing 4 changed files with 30 additions and 162 deletions.
21 changes: 5 additions & 16 deletions e2e/tools/validator/query.json
@@ -1,18 +1,7 @@
 [
-  {
-    "expected_query": "rate(kepler_{level}_package_joules_total{{{query}, mode='dynamic'}}[{interval}])",
-    "actual_query": "rate(kepler_node_platform_joules_total[{interval}])"
-  },
-  {
-    "expected_query": "rate(kepler_{level}_platform_joules_total{{{query}, mode='dynamic'}}[{interval}])",
-    "actual_query": "rate(kepler_node_platform_joules_total[{interval}])"
-  },
-  {
-    "expected_query": "rate(kepler_{level}_bpf_cpu_time_ms_total{{{query}}}[{interval}])",
-    "actual_query": "sum by(__name__, job) (rate(kepler_process_bpf_cpu_time_ms_total[{interval}]))"
-  },
-  {
-    "expected_query": "rate(kepler_{level}_bpf_page_cache_hit_total{{{query}}}[{interval}])",
-    "actual_query": "sum by(__name__, job) (rate(kepler_process_bpf_page_cache_hit_total[{interval}]))"
-  }
+  "abs((rate(kepler_{level}_package_joules_total{{{query}, job='metal', mode='dynamic'}}[{interval}]) - on() rate(kepler_node_platform_joules_total{{{vm_query}}}[{interval}])) / on() rate(kepler_{level}_package_joules_total{{{query}, job='metal', mode='dynamic'}}[{interval}]))",
+  "abs((rate(kepler_{level}_platform_joules_total{{{query}, job='metal', mode='dynamic'}}[{interval}]) - on() rate(kepler_node_platform_joules_total{{{vm_query}}}[{interval}])) / on() rate(kepler_{level}_platform_joules_total{{{query}, job='metal', mode='dynamic'}}[{interval}]))"
+
 ]


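In effect, each remaining entry asks Prometheus itself to return the per-sample absolute proportional error

    E_t = |(e_t - a_t) / e_t|

where e_t is the rate of the metal-side ("expected") energy metric and a_t the rate of the VM-side ("actual") metric at timestamp t, so the validator receives a ready-made error series instead of two series it must align itself.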
8 changes: 3 additions & 5 deletions e2e/tools/validator/src/validator/cases/__init__.py
@@ -25,8 +25,7 @@ def read_json_file(file_path):
 # Raw Prometheus Queries, read all the query from the config file

 class CaseResult(NamedTuple):
-    expected_query: str
-    actual_query: str
+    refined_query: str


 class CasesResult(NamedTuple):
@@ -40,7 +39,7 @@ def __init__(self, vm: config.VM, prom: config.Prometheus, query_path: str) -> None:
         self.vm_name = vm.name
         self.interval = prom.interval
         self.raw_prom_queries = read_json_file(query_path)
-
+        self.vm_query = "job='vm'"
         if self.vm_pid != 0:
             self.query = f"pid='{{vm_pid}}'".format(vm_pid=self.vm_pid)
             self.level = "process"
@@ -52,8 +51,7 @@ def load_test_cases(self) -> CasesResult:
         test_cases = []
         for raw_prom_query in self.raw_prom_queries:
             test_cases.append(CaseResult(
-                expected_query=raw_prom_query["expected_query"].format(level=self.level, query=self.query, interval=self.interval),
-                actual_query=raw_prom_query["actual_query"].format(interval=self.interval)
+                refined_query=raw_prom_query.format(level=self.level, query=self.query, interval=self.interval, vm_query=self.vm_query)
             ))
         return CasesResult(
             test_cases=test_cases
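As a sanity check on the new flow, here is a standalone sketch of what load_test_cases now produces for one query.json entry; the level, pid, interval, and vm_query values are illustrative (the pid is taken from the commented-out example removed elsewhere in this commit):

```python
# Hypothetical substitution values: level/query are what Cases derives when
# vm_pid != 0; vm_query mirrors the default set in __init__; interval assumed.
raw_prom_query = (
    "abs((rate(kepler_{level}_package_joules_total{{{query}, job='metal', "
    "mode='dynamic'}}[{interval}]) - on() "
    "rate(kepler_node_platform_joules_total{{{vm_query}}}[{interval}])) "
    "/ on() rate(kepler_{level}_package_joules_total{{{query}, job='metal', "
    "mode='dynamic'}}[{interval}]))"
)

refined_query = raw_prom_query.format(
    level="process", query="pid='2093543'", interval="20s", vm_query="job='vm'"
)
print(refined_query)
# abs((rate(kepler_process_package_joules_total{pid='2093543', job='metal',
# mode='dynamic'}[20s]) - on() rate(kepler_node_platform_joules_total{job='vm'}[20s]))
# / on() rate(kepler_process_package_joules_total{pid='2093543', job='metal',
# mode='dynamic'}[20s]))
```

The doubled braces in the template survive .format() as literal braces, which is how the JSON entries mix PromQL label selectors with Python placeholders.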
60 changes: 9 additions & 51 deletions e2e/tools/validator/src/validator/cli/__init__.py
@@ -38,69 +38,27 @@ def validator(ctx: click.Context, config_file: str):
 )
 @pass_config
 def stress(cfg: Validator, script_path: str):
-    # PROM_QUERIES = {
-    #     "vm_process_joules_total": {"name": "rate(kepler_process_package_joules_total)", "base_labels": {"job": "metal", "pid": "2093543"}},
-    #     "platform_joules_vm": {"name": "kepler_node_platform_joules_total", "base_labels": {"job": "vm"}},
-    #     # "platform_joules_vm_bm" : "kepler_vm_platform_joules_total{job='metal'}"
-    # }
-
     remote = Remote(cfg.remote)
     result = remote.run_script(script_path=script_path)

-    # from prometheus_api_client.utils import parse_datetime
-    # start_time=parse_datetime("2024-04-12 16:27:20.254648")
-    # end_time = parse_datetime("2024-04-12 16:28:00.466223")
     click.echo(f"start_time: {result.start_time}, end_time: {result.end_time}")

-    # TODO: clean up
-    # expected_query_config = PROM_QUERIES["vm_process_joules_total"]
-    # expected_query_modified_labels = expected_query_config["base_labels"].copy()
-    # expected_query_modified_labels["pid"] = str(cfg.metal.vm.pid)
-    #expected_query = "kepler_process_package_joules_total{pid='2093543', job='metal'}"
-    #actual_query_config = PROM_QUERIES["platform_joules_vm"]
-
-    # expected_data, actual_data = compare_metrics(
-    #     endpoint=cfg.prometheus.url,
-    #     disable_ssl=True,
-    #     start_time=result.start_time,
-    #     end_time=result.end_time,
-    #     expected_query=expected_query_config["name"],
-    #     expected_query_labels=expected_query_modified_labels,
-    #     actual_query=actual_query_config["name"],
-    #     actual_query_labels=actual_query_config["base_labels"]
-    # )
-    # # NOTE: calc
-    # percentage_error = absolute_percentage_error(expected_data, actual_data)
-    # error = absolute_error(expected_data, actual_data)
-    # mae = mean_absolute_error(expected_data, actual_data)
-    # mape = mean_absolute_percentage_error(expected_data, actual_data)
-
     test_cases = Cases(vm = cfg.metal.vm, prom = cfg.prometheus, query_path = cfg.query_path)
     metrics_validator = MetricsValidator(cfg.prometheus)
     test_case_result = test_cases.load_test_cases()
     click.secho("Validation results during stress test:")
     for test_case in test_case_result.test_cases:
-        expected_query = test_case.expected_query
-        actual_query = test_case.actual_query
-        print(f"expected_query: {expected_query}")
-        print(f"actual_query: {actual_query}")
-
+        query = test_case.refined_query
+
         print(f"start_time: {result.start_time}, end_time: {result.end_time}")
         metrics_res = metrics_validator.compare_metrics(result.start_time,
                                                         result.end_time,
-                                                        expected_query,
-                                                        actual_query)
+                                                        query)

-        click.secho(f"Expected Query Name: {expected_query}", fg='bright_yellow')
-        click.secho(f"Actual Query Name: {actual_query}", fg='bright_yellow')
-        click.secho(f"Absolute Errors during stress test: {metrics_res.ae}", fg='green')
-        click.secho(f"Absolute Percentage Errors during stress test: {metrics_res.ape}", fg='green')
-        click.secho(f"Mean Absolute Error (MAE) during stress test: {metrics_res.mae}", fg="red")
-        click.secho(f"Mean Absolute Percentage Error (MAPE) during stress test: {metrics_res.mape}", fg="red")
-        click.secho(f"Mean Squared Error (MSE) during stress test: {metrics_res.rmse}", fg="blue")
+        click.secho(f"Query Name: {query}", fg='bright_white')
+        click.secho(f"Error List: {metrics_res.el}", fg='bright_red')
+        click.secho(f"Average Error: {metrics_res.me}", fg='bright_yellow')

         click.secho("---------------------------------------------------", fg="cyan")



103 changes: 13 additions & 90 deletions e2e/tools/validator/src/validator/prometheus/__init__.py
@@ -4,26 +4,17 @@
 import numpy as np
 from datetime import datetime
 from validator import config
-
+from statistics import fmean

 class MetricsValidatorResult(NamedTuple):
-    # mean absolute error
-    mae: float
-    # mean absolute percentage error
-    mape: float
-    # mean squared error
-    mse: float
-    # root mean squared error
-    rmse: float
-    # absolute error list
-    ae: List[float]
-    # absolute percentage error list
-    ape: List[float]
+    # error list
+    el: List[float]
+    # mean error
+    me: float


 #TODO: Include Environment Variables if desired
 class MetricsValidator:
-    # test with float
     def __init__(self, prom: config.Prometheus):
         self.prom_client = PrometheusConnect(prom.url, headers=None, disable_ssl=True)
         self.step = prom.step
@@ -40,92 +31,24 @@ def custom_metric_query(self, start_time: datetime, end_time: datetime, query: str)

     def compare_metrics(self, start_time: datetime,
                         end_time: datetime,
-                        expected_query: str,
-                        actual_query: str,
+                        query: str
                         ) -> MetricsValidatorResult:
-
-        expected_metrics = self.custom_metric_query(start_time, end_time, expected_query)
-        actual_metrics = self.custom_metric_query(start_time, end_time, actual_query)
-
-        print(expected_metrics)
-        print(actual_metrics)
-
-        cleaned_expected_metrics = retrieve_timestamp_value_metrics(expected_metrics[0])
-        cleaned_actual_metrics = retrieve_timestamp_value_metrics(actual_metrics[0])
-
-        # remove timestamps that do not match
-        expected_data, actual_data = acquire_datapoints_with_common_timestamps(cleaned_expected_metrics,
-                                                                               cleaned_actual_metrics)
+        query_metrics = self.custom_metric_query(start_time, end_time, query)
+        cleaned_expected_metrics = retrieve_timestamp_value_metrics(query_metrics[0])
+
         return MetricsValidatorResult(
-            mae=mean_absolute_error(expected_data, actual_data),
-            mape=mean_absolute_percentage_error(expected_data, actual_data),
-            mse=mean_squared_error(expected_data, actual_data),
-            rmse=root_mean_squared_error(expected_data, actual_data),
-            ae=absolute_error(expected_data, actual_data),
-            ape=absolute_percentage_error(expected_data, actual_data)
+            el=cleaned_expected_metrics,
+            me=round(fmean(cleaned_expected_metrics), 3)
         )


-def retrieve_timestamp_value_metrics(prom_query_response) -> List[List[Tuple[int, float]]]:
+def retrieve_timestamp_value_metrics(prom_query_response) -> List[float]:
     acquired_data = []
     for element in prom_query_response['values']:
-        acquired_data.append([int(element[0]), float(element[1])])
+        acquired_data.append(float(element[1]))
     return acquired_data


-def acquire_datapoints_with_common_timestamps(prom_data_list_one, prom_data_list_two) -> Tuple[list, list]:
-    common_timestamps = [datapoint[0] for datapoint in prom_data_list_one
-                         if datapoint[0] in [datapoint[0] for datapoint in prom_data_list_two]]
-    # necessary to sort timestamps?
-    common_timestamps.sort()
-    list_one_metrics = []
-    list_two_metrics = []
-    for timestamp in common_timestamps:
-        for list_one_datapoint in prom_data_list_one:
-            if list_one_datapoint[0] == timestamp:
-                list_one_metrics.append(list_one_datapoint[1])
-        for list_two_datapoint in prom_data_list_two:
-            if list_two_datapoint[0] == timestamp:
-                list_two_metrics.append(list_two_datapoint[1])
-    return list_one_metrics, list_two_metrics
-
-
-def absolute_percentage_error(expected_data, actual_data) -> List[float]:
-    expected_data = np.array(expected_data)
-    actual_data = np.array(actual_data)
-
-    absolute_percentage_error = np.abs((expected_data - actual_data) / expected_data) * 100
-    return absolute_percentage_error.tolist()
-
-
-def absolute_error(expected_data, actual_data) -> List[float]:
-    expected_data = np.array(expected_data)
-    actual_data = np.array(actual_data)
-
-    absolute_error = np.abs(expected_data - actual_data)
-    return absolute_error.tolist()
-
-
-def mean_absolute_error(expected_data, actual_data) -> float:
-    abs_error_ndarray = np.array(absolute_error(expected_data, actual_data))
-    return np.mean(abs_error_ndarray).tolist()
-
-
-def mean_absolute_percentage_error(expected_data, actual_data) -> float:
-    abs_percentage_error_ndarray = np.array(absolute_percentage_error(expected_data, actual_data))
-    return np.mean(abs_percentage_error_ndarray).tolist()
-
-
-def mean_squared_error(expected_data, actual_data) -> float:
-    abs_error_ndarray = np.array(absolute_error(expected_data, actual_data))
-    return np.mean(np.square(abs_error_ndarray)).tolist()
-
-
-def root_mean_squared_error(expected_data, actual_data) -> float:
-    mean_squared_error_ndarray = np.array(mean_squared_error(expected_data, actual_data))
-    return np.sqrt(mean_squared_error_ndarray).tolist()
-
-
 # if __name__ == "__main__":
 #     prom_metrics_validator = PromMetricsValidator("http://localhost:9091")
 #     start_datetime = datetime.strptime("2024-04-10 19:17:53.882176", '%Y-%m-%d %H:%M:%S.%f')

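Taken together, the four changed files reduce validation to one range query plus a mean. Below is a self-contained sketch of that flow, assuming only the prometheus_api_client package and a reachable Prometheus; the URL, step, window, and the placeholder metrics a and b are illustrative, not taken from this commit:

```python
from datetime import datetime, timedelta
from statistics import fmean

from prometheus_api_client import PrometheusConnect

# Assumption: a local Prometheus endpoint; the real code takes these from config.
prom = PrometheusConnect("http://localhost:9090", headers=None, disable_ssl=True)

# Placeholder for a refined query: the error series is computed server-side.
error_query = "abs((rate(a[20s]) - on() rate(b[20s])) / on() rate(a[20s]))"

end_time = datetime.now()
start_time = end_time - timedelta(minutes=5)

# One query replaces the old expected/actual pair, so no client-side
# timestamp alignment of two result sets is needed.
response = prom.custom_query_range(
    query=error_query, start_time=start_time, end_time=end_time, step="3s"
)

errors = [float(value) for _, value in response[0]["values"]]
print(f"Error List: {errors}")
print(f"Average Error: {round(fmean(errors), 3)}")
```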