From 46d7860f0ebd08f619336a7c20223f60f9027a2b Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 30 Dec 2024 01:42:02 +0200 Subject: [PATCH 1/8] [Core] Add metrics to monitor integration performance 1. Add classes to encapsulate metrics data. 2. Add class to encapsulate metric modification. 3. Add Metric event into `event.py`. 4. Add metric helper decorator to provide context with phase and measure time. 5. Add metric wrapper for async generators to measure time. 6. Add increment metric calls to measure stats. 7. Add tests to validate how many entities were extraced and processed. 8. Add test to compare performance of two branches. --- port_ocean/clients/port/mixins/entities.py | 58 +++++- port_ocean/context/event.py | 26 ++- .../core/handlers/entity_processor/base.py | 38 +++- .../core/integrations/mixins/sync_raw.py | 8 +- port_ocean/helpers/metric.py | 172 ++++++++++++++++++ port_ocean/helpers/retry.py | 42 +++++ .../core/handlers/mixins/test_sync_raw.py | 16 +- port_ocean/tests/test_metric.py | 29 +++ scripts/run-compare-diff-pef.sh | 108 +++++++++++ scripts/run-local-perf-test.sh | 6 +- scripts/run-metric-test.sh | 17 ++ 11 files changed, 503 insertions(+), 17 deletions(-) create mode 100644 port_ocean/helpers/metric.py create mode 100644 port_ocean/tests/test_metric.py create mode 100755 scripts/run-compare-diff-pef.sh create mode 100755 scripts/run-metric-test.sh diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index f33ab737da..e39adc6009 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -11,7 +11,10 @@ handle_status_code, PORT_HTTP_MAX_CONNECTIONS_LIMIT, ) -from port_ocean.core.models import Entity, PortAPIErrorMessage +from port_ocean.context import event +from port_ocean.core.models import Entity +from port_ocean.helpers.metric import MetricFieldType, MetricType, metric +from port_ocean.core.models import PortAPIErrorMessage from starlette import status @@ -72,6 +75,13 @@ async def upsert_entity( extensions={"retryable": True}, ) if response.is_error: + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.ERROR_COUNT + ) + if event.event._metric_aggregator + else None + ) logger.error( f"Error {'Validating' if validation_only else 'Upserting'} " f"entity: {entity.identifier} of " @@ -96,6 +106,14 @@ async def upsert_entity( # Happens when upsert fails and search identifier is defined. # We return None to ignore the entity later in the delete process if result_entity.is_using_search_identifier: + if not response.is_error: + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.ERROR_COUNT + ) + if event.event._metric_aggregator + else None + ) return None # In order to save memory we'll keep only the identifier, blueprint and relations of the @@ -112,8 +130,24 @@ async def upsert_entity( for key, relation in result_entity.relations.items() } + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.OBJECT_COUNT + ) + if event.event._metric_aggregator + else None + ) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.UPSERTED + ) + if event.event._metric_aggregator + else None + ) + return reduced_entity + @metric(MetricType.LOAD) async def batch_upsert_entities( self, entities: list[Entity], @@ -145,6 +179,7 @@ async def batch_upsert_entities( return entity_results + @metric(MetricType.LOAD) async def delete_entity( self, entity: Entity, @@ -167,6 +202,13 @@ async def delete_entity( ) if response.is_error: + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.ERROR_COUNT + ) + if event.event._metric_aggregator + else None + ) if response.status_code == 404: logger.info( f"Failed to delete entity: {entity.identifier} of blueprint: {entity.blueprint}," @@ -180,6 +222,20 @@ async def delete_entity( ) handle_status_code(response, should_raise) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.OBJECT_COUNT + ) + if event.event._metric_aggregator + else None + ) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.DELETED + ) + if event.event._metric_aggregator + else None + ) async def batch_delete_entities( self, diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 5f8b2b37d5..be983c3dea 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,6 +14,7 @@ from uuid import uuid4 from loguru import logger +from port_ocean.helpers.metric import MetricAggregator from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy @@ -40,6 +41,7 @@ class EventType: START = "start" RESYNC = "resync" HTTP_REQUEST = "http_request" + METRIC = "metric" @dataclass @@ -52,6 +54,7 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) + _metric_aggregator: Optional["MetricAggregator"] = None entity_topological_sorter: EntityTopologicalSorter = field( default_factory=EntityTopologicalSorter ) @@ -141,6 +144,13 @@ async def event_context( ) attributes = {**parent_attributes, **(attributes or {})} + + aggregator = ( + parent._metric_aggregator + if parent and parent._metric_aggregator + else MetricAggregator() + ) + new_event = EventContext( event_type, trigger_type=trigger_type, @@ -148,6 +158,7 @@ async def event_context( _parent_event=parent, # inherit port app config from parent event, so it can be used in nested events _port_app_config=parent.port_app_config if parent else None, + _metric_aggregator=aggregator, entity_topological_sorter=entity_topological_sorter, ) _event_context_stack.push(new_event) @@ -163,6 +174,7 @@ def _handle_event(triggering_event_id: int) -> None: dispatcher.connect(_handle_event, event_type) dispatcher.send(event_type, triggering_event_id=event.id) + is_silent = EventType.METRIC == event_type start_time = get_time(seconds_precision=False) with logger.contextualize( event_trigger_type=event.trigger_type, @@ -173,7 +185,7 @@ def _handle_event(triggering_event_id: int) -> None: event.resource_config.kind if event.resource_config else None ), ): - logger.info("Event started") + logger.info("Event started") if not is_silent else None try: yield event except: @@ -184,10 +196,14 @@ def _handle_event(triggering_event_id: int) -> None: finally: end_time = get_time(seconds_precision=False) time_elapsed = round(end_time - start_time, 5) - logger.bind( - success=success, - time_elapsed=time_elapsed, - ).info("Event finished") + ( + logger.bind( + success=success, + time_elapsed=time_elapsed, + ).info("Event finished") + if not is_silent + else None + ) dispatcher.disconnect(_handle_event, event_type) diff --git a/port_ocean/core/handlers/entity_processor/base.py b/port_ocean/core/handlers/entity_processor/base.py index 784d29da03..ef5f138f7c 100644 --- a/port_ocean/core/handlers/entity_processor/base.py +++ b/port_ocean/core/handlers/entity_processor/base.py @@ -2,6 +2,7 @@ from loguru import logger +from port_ocean.context import event from port_ocean.core.handlers.base import BaseHandler from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.ocean_types import ( @@ -9,6 +10,7 @@ CalculationResult, EntitySelectorDiff, ) +from port_ocean.helpers.metric import MetricFieldType, MetricType, metric class BaseEntityProcessor(BaseHandler): @@ -30,6 +32,7 @@ async def _parse_items( ) -> CalculationResult: pass + @metric(MetricType.TRANSFORM) async def parse_items( self, mapping: ResourceConfig, @@ -51,7 +54,38 @@ async def parse_items( with logger.contextualize(kind=mapping.kind): if not raw_data: return CalculationResult(EntitySelectorDiff([], []), []) - - return await self._parse_items( + result = await self._parse_items( mapping, raw_data, parse_all, send_raw_data_examples_amount ) + + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.INPUT_COUNT, len(raw_data) + ) + if event.event._metric_aggregator + else None + ) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.OBJECT_COUNT, + len(result.entity_selector_diff.passed), + ) + if event.event._metric_aggregator + else None + ) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.FAILED, len(result.entity_selector_diff.failed) + ) + if event.event._metric_aggregator + else None + ) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.ERROR_COUNT, len(result.errors) + ) + if event.event._metric_aggregator + else None + ) + + return result diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index f46187a56b..ee4b65cb8f 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -30,6 +30,7 @@ ) from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results from port_ocean.exceptions.core import OceanAbortException +from port_ocean.helpers.metric import MetricFieldType, MetricType, metric, timed_generator SEND_RAW_DATA_EXAMPLES_AMOUNT = 5 @@ -51,6 +52,7 @@ def __init__(self) -> None: async def _on_resync(self, kind: str) -> RAW_RESULT: raise NotImplementedError("on_resync must be implemented") + @metric(MetricType.EXTRACT) async def _get_resource_raw_results( self, resource_config: ResourceConfig ) -> tuple[RESYNC_RESULT, list[Exception]]: @@ -84,6 +86,7 @@ def _collect_resync_functions( return fns + @metric(MetricType.EXTRACT) async def _execute_resync_tasks( self, fns: list[Callable[[str], Awaitable[RAW_RESULT]]], @@ -129,7 +132,6 @@ async def _calculate_raw( for mapping, results in raw_diff ) ) - async def _register_resource_raw( self, resource: ResourceConfig, @@ -196,7 +198,7 @@ async def _register_in_batches( for generator in async_generators: try: - async for items in generator: + async for items in timed_generator(generator): if send_raw_data_examples_amount > 0: send_raw_data_examples_amount = max( 0, send_raw_data_examples_amount - len(passed_entities) @@ -472,6 +474,7 @@ async def sync_raw_all( creation_results.append(await task) await self.sort_and_upsert_failed_entities(user_agent_type) + except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state") raise @@ -500,6 +503,7 @@ async def sync_raw_all( logger.error(message, exc_info=error_group) else: + await event._metric_aggregator.flush() if event._metric_aggregator else print("WHAT`") logger.info( f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}" ) diff --git a/port_ocean/helpers/metric.py b/port_ocean/helpers/metric.py new file mode 100644 index 0000000000..ae9bc57126 --- /dev/null +++ b/port_ocean/helpers/metric.py @@ -0,0 +1,172 @@ +import asyncio +import json +import time +from functools import wraps +from dataclasses import asdict, dataclass, field +from typing import Any, Callable + +from loguru import logger +from port_ocean.context import resource +import port_ocean.context.event +from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE + + +@dataclass +class BaseStats: + error_count: int = 0 + duration: float = 0.0 + object_count: int = 0 + + +@dataclass +class ApiStats(BaseStats): + rate_limit_wait: float = 0.0 + requests: dict[str, int] = field(default_factory=dict) + + +@dataclass +class ExtractStats(ApiStats): + pass + + +@dataclass +class TransformStats(BaseStats): + failed_count: int = 0 + input_count: int = 0 + + +@dataclass +class LoadStats(ApiStats): + upserted: int = 0 + deleted: int = 0 + + +@dataclass +class MetricsData: + extract: ExtractStats = field(default_factory=ExtractStats) + transform: TransformStats = field(default_factory=TransformStats) + load: LoadStats = field(default_factory=LoadStats) + + +@dataclass +class KindsMetricsData: + metrics: dict[str, MetricsData] = field(default_factory=dict) + + +class MetricFieldType: + UPSERTED = "upserted" + DELETED = "deleted" + FAILED = "failed_count" + REQUEST = "requests" + RATE_LIMIT = "rate_limit_wait" + OBJECT_COUNT = "object_count" + DURATION = "duration" + ERROR_COUNT = "error_count" + INPUT_COUNT = "input_count" + + +class MetricType: + EXTRACT = "extract" + TRANSFORM = "transform" + LOAD = "load" + + +class MetricAggregator: + def __init__(self) -> None: + self._lock = asyncio.Lock() + self.metrics: KindsMetricsData = KindsMetricsData() + + async def get_metrics(self) -> dict[str, MetricsData]: + return self.metrics.metrics + + async def get_metric(self) -> TransformStats | LoadStats | ExtractStats | None: + phase = port_ocean.context.event.event.attributes.get("phase", None) + if not phase: + return None + + metric = self.metrics.metrics.get(resource.resource.kind) + if not metric: + async with self._lock: + self.metrics.metrics[resource.resource.kind] = MetricsData() + + return getattr(self.metrics.metrics.get(resource.resource.kind), phase) + + async def increment_field(self, field: str, amount: int | float = 1) -> None: + metric = await self.get_metric() + async with self._lock: + val = getattr(metric, field) + metric.__setattr__(field, val + amount) + + async def increment_status(self, status_code: str) -> None: + metric = await self.get_metric() + if not isinstance(metric, ApiStats): + return + async with self._lock: + status = metric.requests.get(status_code) + if not status: + metric.requests[status_code] = 0 + metric.requests[status_code] = metric.requests.get(status_code, 0) + 1 + + async def flush(self) -> None: + async with self._lock: + metric_dict = asdict(self.metrics) + logger.info(f"integration metrics {json.dumps(metric_dict)}") + await self.reset() + + async def reset(self) -> None: + async with self._lock: + self.metrics = KindsMetricsData() + + +async def timed_generator( + generator: ASYNC_GENERATOR_RESYNC_TYPE, +) -> ASYNC_GENERATOR_RESYNC_TYPE: + async with port_ocean.context.event.event_context( + port_ocean.context.event.EventType.METRIC, attributes={"phase": "extract"} + ): + while True: + try: + start = time.monotonic() + items = await anext(generator) + end = time.monotonic() + duration = end - start + ( + await port_ocean.context.event.event._metric_aggregator.increment_field( + MetricFieldType.DURATION, duration + ) + if port_ocean.context.event.event._metric_aggregator + else None + ) + yield items + except Exception: + break + + +def metric(phase: str | None = None, should_capture_time: bool = True) -> Any: + def decorator(func: Callable[..., Any]) -> Any: + @wraps(func) + async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any: + if not phase: + _phase = port_ocean.context.event.event.attributes.get("phase") + async with port_ocean.context.event.event_context( + port_ocean.context.event.EventType.METRIC, + attributes={"phase": phase or _phase}, + ): + res = None + start = time.monotonic() + res = await func(*args, **kwargs) + if should_capture_time: + end = time.monotonic() + duration = end - start + ( + await port_ocean.context.event.event._metric_aggregator.increment_field( + MetricFieldType.DURATION, duration + ) + if port_ocean.context.event.event._metric_aggregator + else None + ) + return res + + return wrapper + + return decorator diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index 5b2058c86c..2cb20c871a 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -8,6 +8,9 @@ import httpx from dateutil.parser import isoparse +from port_ocean.context import event +from port_ocean.exceptions.context import EventContextNotFoundError +from port_ocean.helpers.metric import MetricFieldType # Adapted from https://github.com/encode/httpx/issues/108#issuecomment-1434439481 @@ -162,6 +165,16 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: response = await self._retry_operation_async(request, send_method) else: response = await transport.handle_async_request(request) + try: + ( + await event.event._metric_aggregator.increment_status( + str(response.status_code) + ) + if event.event._metric_aggregator + else None + ) + except Exception: + pass return response except Exception as e: # Retyable methods are logged via _log_error @@ -280,16 +293,45 @@ async def _retry_operation_async( attempts_made = 0 response: httpx.Response | None = None error: Exception | None = None + + metric = None + try: + metric = ( + await event.event._metric_aggregator.get_metric() + if event.event._metric_aggregator + else None + ) + except EventContextNotFoundError: + pass + while True: if attempts_made > 0: sleep_time = self._calculate_sleep(attempts_made, {}) self._log_before_retry(request, sleep_time, response, error) await asyncio.sleep(sleep_time) + ( + await event.event._metric_aggregator.increment_field( + MetricFieldType.RATE_LIMIT + ) + if metric and event.event._metric_aggregator + else None + ) error = None response = None try: response = await send_method(request) + try: + ( + await event.event._metric_aggregator.increment_status( + str(response.status_code) + ) + if metric and event.event._metric_aggregator + else None + ) + except Exception: + pass + response.request = request if remaining_attempts < 1 or not ( await self._should_retry_async(response) diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 7101e18a2d..38b39431f8 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -158,8 +158,8 @@ def mock_sync_raw_mixin( sync_raw_mixin._entity_processor = mock_entity_processor sync_raw_mixin._entities_state_applier = mock_entities_state_applier sync_raw_mixin._port_app_config_handler = mock_port_app_config_handler - sync_raw_mixin._get_resource_raw_results = AsyncMock(return_value=([{}], [])) # type: ignore - sync_raw_mixin._entity_processor.parse_items = AsyncMock(return_value=MagicMock()) # type: ignore + sync_raw_mixin._get_resource_raw_results = AsyncMock(return_value=([{}], [])) + sync_raw_mixin._entity_processor.parse_items = AsyncMock(return_value=MagicMock()) return sync_raw_mixin @@ -194,7 +194,9 @@ async def test_sync_raw_mixin_self_dependency( calc_result_mock.entity_selector_diff.passed = entities calc_result_mock.errors = [] - mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock( + return_value=calc_result_mock + ) mock_order_by_entities_dependencies = MagicMock( side_effect=EntityTopologicalSorter.order_by_entities_dependencies @@ -249,7 +251,9 @@ async def test_sync_raw_mixin_circular_dependency( calc_result_mock.entity_selector_diff.passed = entities calc_result_mock.errors = [] - mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock( + return_value=calc_result_mock + ) mock_order_by_entities_dependencies = MagicMock( side_effect=EntityTopologicalSorter.order_by_entities_dependencies @@ -330,7 +334,9 @@ async def test_sync_raw_mixin_dependency( calc_result_mock.entity_selector_diff.passed = entities calc_result_mock.errors = [] - mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock( + return_value=calc_result_mock + ) mock_order_by_entities_dependencies = MagicMock( side_effect=EntityTopologicalSorter.order_by_entities_dependencies diff --git a/port_ocean/tests/test_metric.py b/port_ocean/tests/test_metric.py new file mode 100644 index 0000000000..c449ff2003 --- /dev/null +++ b/port_ocean/tests/test_metric.py @@ -0,0 +1,29 @@ +import json +import sys +import pytest + + +@pytest.mark.metric +def test_metrics() -> None: + print(sys.argv) + path = "/tmp/ocean/metric.log" + dealy = 2 + batch_size = 400 + total_objects = 2000 + with open(path, "r") as file: + content = file.read() + magic_string = "integration metrics" + idx = content.find(magic_string) + content = content[idx + len(magic_string) :] + obj = json.loads(content) + metrics = obj.get("metrics") + dep = metrics.get("fake-person") + load = dep.get("load") + extract = dep.get("extract") + transform = dep.get("transform") + assert round(extract.get("duration")) == round( + ((total_objects / batch_size) * dealy) + 1 + ) + assert extract.get("object_count") == total_objects + assert load.get("object_count") == total_objects + assert transform.get("object_count") == total_objects diff --git a/scripts/run-compare-diff-pef.sh b/scripts/run-compare-diff-pef.sh new file mode 100755 index 0000000000..c81ac03bff --- /dev/null +++ b/scripts/run-compare-diff-pef.sh @@ -0,0 +1,108 @@ +#!/usr/bin/env bash + +export MOCK_PORT_API='1' +export THIRD_PARTY_LATENCY_MS=10 +export ENTITY_AMOUNT=35000 +export THIRD_PARTY_BATCH_SIZE=500 + +branch=$1 +iterations=10 + +SCRIPT_BASE="$(cd -P "$(dirname "$0")" && pwd)" +ROOT_DIR="$(cd -P "${SCRIPT_BASE}/../" && pwd)" + + + +# Validate input +if [[ -z $branch ]]; then + echo "Error: Please supply a branch or multiple branches." + exit 1 +fi + +branches=('main' "$branch") +declare -A branches_map +declare -A aggregation_results + +max_branch_length=0 + +for _branch in "${branches[@]}"; do + branch_md5=$(echo -n "$_branch" | md5sum | cut -d' ' -f1) + branches_map["$_branch"]=$branch_md5 + branches_map["$branch_md5"]=$_branch + + # Update max_branch_length for table formatting + branch_length=${#_branch} + if (( branch_length > max_branch_length )); then + max_branch_length=$branch_length + fi + + git checkout "$_branch" || echo "Error: Failed to checkout branch $_branch." + + + echo "Building: $_branch" + make -f $ROOT_DIR/Makefile build + + echo "Branch: $_branch - MD5: $branch_md5" + for i in $(seq $iterations); do + echo "Running $_branch iteration $i" + export SMOKE_TEST_SUFFIX="$branch_md5-iteration-$i" + export OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS=0 + + "$SCRIPT_BASE/run-local-perf-test.sh" + sleep 10 + done +done + +for f in "$ROOT_DIR"/perf-test-results*; do + if [[ ! -f $f ]]; then + echo "Warning: No performance result files found." + continue + fi + + branch_md5=$(echo "$f" | cut -d'-' -f4) + duration=$(grep -i 'Duration' "$f" | cut -d'|' -f3 | cut -d' ' -f3) + branch_name=${branches_map[$branch_md5]} + + if [[ -z $branch_name ]]; then + echo "Warning: Branch MD5 $branch_md5 not found in branches map. Skipping file $f." + continue + fi + + # Ensure duration is numeric + if ! [[ $duration =~ ^[0-9]+$ ]]; then + echo "Warning: Invalid duration in file $f. Skipping." + continue + fi + + previous_min=${aggregation_results["$branch_name.min"]:-9999} + previous_max=${aggregation_results["$branch_name.max"]:-0} + previous_duration=${aggregation_results["$branch_name.duration"]:-0} + + aggregation_results["$branch_name.duration"]=$((previous_duration + duration)) + + if (( duration > previous_max )); then + aggregation_results["$branch_name.max"]=$duration + fi + if (( duration < previous_min )); then + aggregation_results["$branch_name.min"]=$duration + fi +done + +# Adjust table size dynamically +header_length=$((max_branch_length + 2)) +table_format="%-${header_length}s | %-10s | %-10s | %-10s\n" + +# Generate pretty summary table +printf "\n${table_format}" "Branch Name" "Total Time" "Max" "Min" +printf "${table_format}" "$(printf '%.0s-' $(seq $header_length))" "----------" "----------" "----------" + +for branch_name in "${branches[@]}"; do + total_duration=${aggregation_results["$branch_name.duration"]:-0} + max_duration=${aggregation_results["$branch_name.max"]:-0} + min_duration=${aggregation_results["$branch_name.min"]:-0} + printf "${table_format}" "$branch_name" "$(( $total_duration/$iterations ))" "$max_duration" "$min_duration" + printf "${table_format}" "$(printf '%.0s-' $(seq $header_length))" "----------" "----------" "----------" + +done +rm -f "$ROOT_DIR"/perf-test-results* +echo "Performance tests completed." diff --git a/scripts/run-local-perf-test.sh b/scripts/run-local-perf-test.sh index e5a0436f3d..eb8233a274 100755 --- a/scripts/run-local-perf-test.sh +++ b/scripts/run-local-perf-test.sh @@ -57,11 +57,12 @@ echo "# Performance Test Summary | Timestamp | Event | |:-------------:|-------------|" >"${LOG_FILE_MD}" -START_NS=$(date +%s%N) +bash --version +START_NS=$(gdate +%s%N) _log "Starting Sync" RUN_LOG_FILE="./perf-sync.log" "${SCRIPT_BASE}/run-local-smoke-test.sh" | tee "${RUN_LOG_FILE}" -END_NS=$(date +%s%N) +END_NS=$(gdate +%s%N) ELAPSED_MS=$(((END_NS - START_NS) / 1000000)) _log "Duration $((ELAPSED_MS / 1000)) seconds" @@ -85,6 +86,7 @@ fi if [[ "${MOCK_PORT_API:-0}" = "1" ]]; then + sleep 1 make smoke/stop-mock-api fi diff --git a/scripts/run-metric-test.sh b/scripts/run-metric-test.sh new file mode 100755 index 0000000000..f524b62b1f --- /dev/null +++ b/scripts/run-metric-test.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +SCRIPT_BASE="$(cd -P "$(dirname "$0")" && pwd)" +ROOT_DIR="$(cd -P "${SCRIPT_BASE}/../" && pwd)" +TEMP_DIR='/tmp/ocean' +mkdir -p $TEMP_DIR +latency_ms=2000 + +export PORT_BASE_URL='http://localhost:5555' +export OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS=$latency_ms + +make -f "$ROOT_DIR/Makefile" smoke/start-mock-api + +$SCRIPT_BASE/run-smoke-test.sh | grep 'integration metrics' > $TEMP_DIR/metric.log +cat $TEMP_DIR/metric.log +python -m pytest -m metric + +make -f "$ROOT_DIR/Makefile" smoke/stop-mock-api From a1df9b8df6ea4f70a8578991216a6a291cd010eb Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 30 Dec 2024 10:49:18 +0200 Subject: [PATCH 2/8] [Core] Add metrics to monitor integration performance Refactor increment methods. --- Makefile | 2 +- perf-test-results-perf-24022.log.md | 19 +++++ perf-test-results-perf-25123.log.md | 19 +++++ port_ocean/clients/port/mixins/entities.py | 58 +++------------ port_ocean/context/event.py | 14 ++++ .../core/handlers/entity_processor/base.py | 34 +++------ .../core/integrations/mixins/sync_raw.py | 1 - port_ocean/helpers/metric.py | 28 ++++---- port_ocean/helpers/retry.py | 33 +++------ .../core/handlers/mixins/test_sync_raw.py | 2 +- port_ocean/tests/test_metric.py | 70 +++++++++++++------ scripts/run-metric-test.sh | 2 + 12 files changed, 145 insertions(+), 137 deletions(-) create mode 100644 perf-test-results-perf-24022.log.md create mode 100644 perf-test-results-perf-25123.log.md diff --git a/Makefile b/Makefile index 7e89beca8b..b1ffa80df9 100644 --- a/Makefile +++ b/Makefile @@ -159,7 +159,7 @@ bump/single-integration: # run a mock port api server for perf / smoke tests smoke/start-mock-api: - $(ACTIVATE) && SMOKE_TEST_SUFFIX=$${SMOKE_TEST_SUFFIX:-default_value} python ./port_ocean/tests/helpers/fake_port_api.py & + $(ACTIVATE) && SMOKE_TEST_SUFFIX=$${SMOKE_TEST_SUFFIX:-default_value} python ./port_ocean/tests/helpers/fake_port_api.py > /dev/null & smoke/stop-mock-api: ps aux | grep fake_port_api | egrep -v grep | awk '{print $$2};' | xargs kill -9 diff --git a/perf-test-results-perf-24022.log.md b/perf-test-results-perf-24022.log.md new file mode 100644 index 0000000000..bcc5723fa2 --- /dev/null +++ b/perf-test-results-perf-24022.log.md @@ -0,0 +1,19 @@ +# Performance Test Summary + +### Parameters: + +| Param | Value | +|:-----:|:-----:| +| Entities Amount | -1 | +| Entity Size (KB) | -1 | +| Third Party Latency | -1 ms | +| Third Party Batch Size | -1 | + +### Run summary + +| Timestamp | Event | +|:-------------:|-------------| +| 08:23:55 | Starting Sync | +| 08:23:59 | Duration 4 seconds | +| 08:23:59 | Upserted: 401 entities | +| 08:24:00 | Perf test complete | diff --git a/perf-test-results-perf-25123.log.md b/perf-test-results-perf-25123.log.md new file mode 100644 index 0000000000..faf4f5a8d9 --- /dev/null +++ b/perf-test-results-perf-25123.log.md @@ -0,0 +1,19 @@ +# Performance Test Summary + +### Parameters: + +| Param | Value | +|:-----:|:-----:| +| Entities Amount | -1 | +| Entity Size (KB) | -1 | +| Third Party Latency | -1 ms | +| Third Party Batch Size | -1 | + +### Run summary + +| Timestamp | Event | +|:-------------:|-------------| +| 08:20:38 | Starting Sync | +| 08:20:41 | Duration 3 seconds | +| 08:20:41 | Upserted: 0 entities | +| 08:20:42 | Perf test complete | diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index e39adc6009..a8fff169c8 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -75,13 +75,8 @@ async def upsert_entity( extensions={"retryable": True}, ) if response.is_error: - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.ERROR_COUNT - ) - if event.event._metric_aggregator - else None - ) + await event.event.increment_metric(MetricFieldType.ERROR_COUNT) + logger.error( f"Error {'Validating' if validation_only else 'Upserting'} " f"entity: {entity.identifier} of " @@ -107,13 +102,7 @@ async def upsert_entity( # We return None to ignore the entity later in the delete process if result_entity.is_using_search_identifier: if not response.is_error: - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.ERROR_COUNT - ) - if event.event._metric_aggregator - else None - ) + await event.event.increment_metric(MetricFieldType.ERROR_COUNT) return None # In order to save memory we'll keep only the identifier, blueprint and relations of the @@ -129,21 +118,8 @@ async def upsert_entity( key: None if isinstance(relation, dict) else relation for key, relation in result_entity.relations.items() } - - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.OBJECT_COUNT - ) - if event.event._metric_aggregator - else None - ) - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.UPSERTED - ) - if event.event._metric_aggregator - else None - ) + await event.event.increment_metric(MetricFieldType.OBJECT_COUNT) + await event.event.increment_metric(MetricFieldType.UPSERTED) return reduced_entity @@ -202,13 +178,7 @@ async def delete_entity( ) if response.is_error: - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.ERROR_COUNT - ) - if event.event._metric_aggregator - else None - ) + await event.event.increment_metric(MetricFieldType.ERROR_COUNT) if response.status_code == 404: logger.info( f"Failed to delete entity: {entity.identifier} of blueprint: {entity.blueprint}," @@ -222,20 +192,8 @@ async def delete_entity( ) handle_status_code(response, should_raise) - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.OBJECT_COUNT - ) - if event.event._metric_aggregator - else None - ) - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.DELETED - ) - if event.event._metric_aggregator - else None - ) + await event.event.increment_metric(MetricFieldType.OBJECT_COUNT) + await event.event.increment_metric(MetricFieldType.DELETED) async def batch_delete_entities( self, diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index be983c3dea..23dcd62583 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -75,6 +75,20 @@ def abort(self) -> None: ) self._aborted = True + async def increment_status(self, status_code: str) -> None: + try: + if self._metric_aggregator: + await self._metric_aggregator.increment_status(status_code) + except Exception: + pass + + async def increment_metric(self, metric: str, amount: int | float = 1) -> None: + try: + if self._metric_aggregator: + await self._metric_aggregator.increment_field(metric, amount) + except Exception: + pass + @property def aborted(self) -> bool: return self._aborted diff --git a/port_ocean/core/handlers/entity_processor/base.py b/port_ocean/core/handlers/entity_processor/base.py index ef5f138f7c..e902a8fe6e 100644 --- a/port_ocean/core/handlers/entity_processor/base.py +++ b/port_ocean/core/handlers/entity_processor/base.py @@ -57,35 +57,17 @@ async def parse_items( result = await self._parse_items( mapping, raw_data, parse_all, send_raw_data_examples_amount ) - - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.INPUT_COUNT, len(raw_data) - ) - if event.event._metric_aggregator - else None + await event.event.increment_metric( + MetricFieldType.INPUT_COUNT, len(raw_data) ) - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.OBJECT_COUNT, - len(result.entity_selector_diff.passed), - ) - if event.event._metric_aggregator - else None + await event.event.increment_metric( + MetricFieldType.OBJECT_COUNT, len(result.entity_selector_diff.passed) ) - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.FAILED, len(result.entity_selector_diff.failed) - ) - if event.event._metric_aggregator - else None + await event.event.increment_metric( + MetricFieldType.FAILED, len(result.entity_selector_diff.failed) ) - ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.ERROR_COUNT, len(result.errors) - ) - if event.event._metric_aggregator - else None + await event.event.increment_metric( + MetricFieldType.ERROR_COUNT, len(result.errors) ) return result diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index ee4b65cb8f..b8d487a58c 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -52,7 +52,6 @@ def __init__(self) -> None: async def _on_resync(self, kind: str) -> RAW_RESULT: raise NotImplementedError("on_resync must be implemented") - @metric(MetricType.EXTRACT) async def _get_resource_raw_results( self, resource_config: ResourceConfig ) -> tuple[RESYNC_RESULT, list[Exception]]: diff --git a/port_ocean/helpers/metric.py b/port_ocean/helpers/metric.py index ae9bc57126..c4a16e59c4 100644 --- a/port_ocean/helpers/metric.py +++ b/port_ocean/helpers/metric.py @@ -86,8 +86,7 @@ async def get_metric(self) -> TransformStats | LoadStats | ExtractStats | None: metric = self.metrics.metrics.get(resource.resource.kind) if not metric: - async with self._lock: - self.metrics.metrics[resource.resource.kind] = MetricsData() + self.metrics.metrics[resource.resource.kind] = MetricsData() return getattr(self.metrics.metrics.get(resource.resource.kind), phase) @@ -99,9 +98,10 @@ async def increment_field(self, field: str, amount: int | float = 1) -> None: async def increment_status(self, status_code: str) -> None: metric = await self.get_metric() - if not isinstance(metric, ApiStats): - return + if metric is None or not isinstance(metric, ApiStats): + return None async with self._lock: + status = metric.requests.get(status_code) if not status: metric.requests[status_code] = 0 @@ -130,13 +130,13 @@ async def timed_generator( items = await anext(generator) end = time.monotonic() duration = end - start - ( - await port_ocean.context.event.event._metric_aggregator.increment_field( - MetricFieldType.DURATION, duration - ) - if port_ocean.context.event.event._metric_aggregator - else None + await port_ocean.context.event.event.increment_metric( + MetricFieldType.DURATION, duration ) + await port_ocean.context.event.event.increment_metric( + MetricFieldType.OBJECT_COUNT, len(items) + ) + yield items except Exception: break @@ -158,12 +158,8 @@ async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any: if should_capture_time: end = time.monotonic() duration = end - start - ( - await port_ocean.context.event.event._metric_aggregator.increment_field( - MetricFieldType.DURATION, duration - ) - if port_ocean.context.event.event._metric_aggregator - else None + await port_ocean.context.event.event.increment_metric( + MetricFieldType.DURATION, duration ) return res diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index 2cb20c871a..1120946374 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -166,14 +166,8 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: else: response = await transport.handle_async_request(request) try: - ( - await event.event._metric_aggregator.increment_status( - str(response.status_code) - ) - if event.event._metric_aggregator - else None - ) - except Exception: + await event.event.increment_status(str(response.status_code)) + except EventContextNotFoundError: pass return response except Exception as e: @@ -310,10 +304,8 @@ async def _retry_operation_async( self._log_before_retry(request, sleep_time, response, error) await asyncio.sleep(sleep_time) ( - await event.event._metric_aggregator.increment_field( - MetricFieldType.RATE_LIMIT - ) - if metric and event.event._metric_aggregator + await event.event.increment_metric(MetricFieldType.RATE_LIMIT) + if metric else None ) @@ -321,16 +313,12 @@ async def _retry_operation_async( response = None try: response = await send_method(request) - try: - ( - await event.event._metric_aggregator.increment_status( - str(response.status_code) - ) - if metric and event.event._metric_aggregator - else None - ) - except Exception: - pass + + ( + await event.event.increment_status(str(response.status_code)) + if metric + else None + ) response.request = request if remaining_attempts < 1 or not ( @@ -370,6 +358,7 @@ def _retry_operation( attempts_made = 0 response: httpx.Response | None = None error: Exception | None = None + while True: if attempts_made > 0: sleep_time = self._calculate_sleep(attempts_made, {}) diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 38b39431f8..fe430e3cca 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -158,7 +158,7 @@ def mock_sync_raw_mixin( sync_raw_mixin._entity_processor = mock_entity_processor sync_raw_mixin._entities_state_applier = mock_entities_state_applier sync_raw_mixin._port_app_config_handler = mock_port_app_config_handler - sync_raw_mixin._get_resource_raw_results = AsyncMock(return_value=([{}], [])) + sync_raw_mixin._get_resource_raw_results = AsyncMock(return_value=([{}], [])) # type: ignore sync_raw_mixin._entity_processor.parse_items = AsyncMock(return_value=MagicMock()) return sync_raw_mixin diff --git a/port_ocean/tests/test_metric.py b/port_ocean/tests/test_metric.py index c449ff2003..6665dfa1ba 100644 --- a/port_ocean/tests/test_metric.py +++ b/port_ocean/tests/test_metric.py @@ -1,29 +1,59 @@ import json -import sys import pytest @pytest.mark.metric def test_metrics() -> None: - print(sys.argv) - path = "/tmp/ocean/metric.log" - dealy = 2 + """ + Test that the metrics logged in /tmp/ocean/metric.log match expected values. + """ + + log_path = "/tmp/ocean/metric.log" + delay = 2 batch_size = 400 total_objects = 2000 - with open(path, "r") as file: + magic_string = "integration metrics" + + with open(log_path, "r") as file: content = file.read() - magic_string = "integration metrics" - idx = content.find(magic_string) - content = content[idx + len(magic_string) :] - obj = json.loads(content) - metrics = obj.get("metrics") - dep = metrics.get("fake-person") - load = dep.get("load") - extract = dep.get("extract") - transform = dep.get("transform") - assert round(extract.get("duration")) == round( - ((total_objects / batch_size) * dealy) + 1 - ) - assert extract.get("object_count") == total_objects - assert load.get("object_count") == total_objects - assert transform.get("object_count") == total_objects + + assert magic_string in content, f"'{magic_string}' not found in {log_path}" + + start_idx = content.rfind(magic_string) + content_after_magic = content[start_idx + len(magic_string) :] + + obj = json.loads(content_after_magic) + metrics = obj.get("metrics") + assert metrics, "No 'metrics' key found in the parsed JSON." + + assert "fake-person" in metrics, "'fake-person' key missing in metrics data." + fake_person = metrics["fake-person"] + + extract = fake_person.get("extract") + load = fake_person.get("load") + transform = fake_person.get("transform") + + num_batches = total_objects / batch_size + expected_min_extract_duration = num_batches * delay + assert round(extract["duration"]) > round(expected_min_extract_duration), ( + f"Extract duration {extract['duration']} not greater than " + f"{expected_min_extract_duration}" + ) + assert extract["object_count"] == total_objects + assert ( + extract.get("requests", {}).get("200") == num_batches + ), f"Expected 'requests.200' == {num_batches}, got {extract.get('requests', {}).get('200')}" + + assert load["object_count"] == total_objects + assert ( + load.get("requests", {}).get("200") == total_objects + ), f"Expected 'requests.200' == {total_objects}, got {load.get('requests', {}).get('200')}" + + assert transform["object_count"] == total_objects + assert transform["input_count"] == total_objects + assert transform["failed_count"] == 0 + assert transform["duration"] > 0 + + assert extract["error_count"] == 0 + assert load["error_count"] == 0 + assert transform["error_count"] == 0 diff --git a/scripts/run-metric-test.sh b/scripts/run-metric-test.sh index f524b62b1f..b0bb2bc08f 100755 --- a/scripts/run-metric-test.sh +++ b/scripts/run-metric-test.sh @@ -8,6 +8,8 @@ latency_ms=2000 export PORT_BASE_URL='http://localhost:5555' export OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS=$latency_ms +make -f "$ROOT_DIR/Makefile" build + make -f "$ROOT_DIR/Makefile" smoke/start-mock-api $SCRIPT_BASE/run-smoke-test.sh | grep 'integration metrics' > $TEMP_DIR/metric.log From daf9cf01076aba641c847776d731a644df91d036 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 30 Dec 2024 14:58:12 +0200 Subject: [PATCH 3/8] [Core] Add metrics to monitor integration performance Set metrics as feature flag. --- port_ocean/config/settings.py | 7 +++++++ port_ocean/context/event.py | 13 +++++++++++++ port_ocean/core/integrations/mixins/sync_raw.py | 2 +- port_ocean/helpers/metric.py | 5 +++++ port_ocean/helpers/retry.py | 16 ++++++---------- port_ocean/ocean.py | 1 - 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index 5c948af17d..234dfbfd75 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -79,6 +79,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): ) runtime: Runtime = Runtime.OnPrem resources_path: str = Field(default=".port/resources") + metrics: bool = Field(default=False) @root_validator() def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]: @@ -101,6 +102,12 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel: return values + @validator("metrics") + def validate_metrics(cls, value: str) -> bool: + if value == "1": + return True + return False + @validator("runtime") def validate_runtime(cls, runtime: Runtime) -> Runtime: if runtime.is_saas_runtime: diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 23dcd62583..4c148ab63e 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,6 +14,7 @@ from uuid import uuid4 from loguru import logger +from port_ocean.context import ocean from port_ocean.helpers.metric import MetricAggregator from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore @@ -75,7 +76,13 @@ def abort(self) -> None: ) self._aborted = True + async def flush_metric_logs(self) -> None: + if event._metric_aggregator: + await event._metric_aggregator.flush() + async def increment_status(self, status_code: str) -> None: + if not self.should_record_metrics: + return try: if self._metric_aggregator: await self._metric_aggregator.increment_status(status_code) @@ -83,12 +90,18 @@ async def increment_status(self, status_code: str) -> None: pass async def increment_metric(self, metric: str, amount: int | float = 1) -> None: + if not self.should_record_metrics: + return try: if self._metric_aggregator: await self._metric_aggregator.increment_field(metric, amount) except Exception: pass + @property + def should_record_metrics(self) -> bool: + return ocean.ocean.config.metrics + @property def aborted(self) -> bool: return self._aborted diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index b8d487a58c..ff8e466227 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -502,7 +502,7 @@ async def sync_raw_all( logger.error(message, exc_info=error_group) else: - await event._metric_aggregator.flush() if event._metric_aggregator else print("WHAT`") + await event.flush_metric_logs() logger.info( f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}" ) diff --git a/port_ocean/helpers/metric.py b/port_ocean/helpers/metric.py index c4a16e59c4..f39638bdd1 100644 --- a/port_ocean/helpers/metric.py +++ b/port_ocean/helpers/metric.py @@ -121,6 +121,9 @@ async def reset(self) -> None: async def timed_generator( generator: ASYNC_GENERATOR_RESYNC_TYPE, ) -> ASYNC_GENERATOR_RESYNC_TYPE: + if not port_ocean.context.event.event.should_record_metrics: + async for items in generator: + yield items async with port_ocean.context.event.event_context( port_ocean.context.event.EventType.METRIC, attributes={"phase": "extract"} ): @@ -146,6 +149,8 @@ def metric(phase: str | None = None, should_capture_time: bool = True) -> Any: def decorator(func: Callable[..., Any]) -> Any: @wraps(func) async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any: + if not port_ocean.context.event.event.should_record_metrics: + return await func(*args, **kwargs) if not phase: _phase = port_ocean.context.event.event.attributes.get("phase") async with port_ocean.context.event.event_context( diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index 1120946374..7183b5bc4b 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -287,16 +287,12 @@ async def _retry_operation_async( attempts_made = 0 response: httpx.Response | None = None error: Exception | None = None - - metric = None + is_event_started = False try: - metric = ( - await event.event._metric_aggregator.get_metric() - if event.event._metric_aggregator - else None - ) + event.event.should_record_metrics + is_event_started = True except EventContextNotFoundError: - pass + is_event_started = False while True: if attempts_made > 0: @@ -305,7 +301,7 @@ async def _retry_operation_async( await asyncio.sleep(sleep_time) ( await event.event.increment_metric(MetricFieldType.RATE_LIMIT) - if metric + if is_event_started else None ) @@ -316,7 +312,7 @@ async def _retry_operation_async( ( await event.event.increment_status(str(response.status_code)) - if metric + if is_event_started else None ) diff --git a/port_ocean/ocean.py b/port_ocean/ocean.py index 6e9da0516c..8d9cbd9544 100644 --- a/port_ocean/ocean.py +++ b/port_ocean/ocean.py @@ -46,7 +46,6 @@ def __init__( _integration_config_model=config_factory, **(config_override or {}), ) - # add the integration sensitive configuration to the sensitive patterns to mask out sensitive_log_filter.hide_sensitive_strings( *self.config.get_sensitive_fields_data() From 9a46af806a2dba66f3446463549fa7973bc96103 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 30 Dec 2024 15:01:11 +0200 Subject: [PATCH 4/8] [Core] Add metrics to monitor integration performance Remove perf tests results --- perf-test-results-perf-24022.log.md | 19 ------------------- perf-test-results-perf-25123.log.md | 19 ------------------- 2 files changed, 38 deletions(-) delete mode 100644 perf-test-results-perf-24022.log.md delete mode 100644 perf-test-results-perf-25123.log.md diff --git a/perf-test-results-perf-24022.log.md b/perf-test-results-perf-24022.log.md deleted file mode 100644 index bcc5723fa2..0000000000 --- a/perf-test-results-perf-24022.log.md +++ /dev/null @@ -1,19 +0,0 @@ -# Performance Test Summary - -### Parameters: - -| Param | Value | -|:-----:|:-----:| -| Entities Amount | -1 | -| Entity Size (KB) | -1 | -| Third Party Latency | -1 ms | -| Third Party Batch Size | -1 | - -### Run summary - -| Timestamp | Event | -|:-------------:|-------------| -| 08:23:55 | Starting Sync | -| 08:23:59 | Duration 4 seconds | -| 08:23:59 | Upserted: 401 entities | -| 08:24:00 | Perf test complete | diff --git a/perf-test-results-perf-25123.log.md b/perf-test-results-perf-25123.log.md deleted file mode 100644 index faf4f5a8d9..0000000000 --- a/perf-test-results-perf-25123.log.md +++ /dev/null @@ -1,19 +0,0 @@ -# Performance Test Summary - -### Parameters: - -| Param | Value | -|:-----:|:-----:| -| Entities Amount | -1 | -| Entity Size (KB) | -1 | -| Third Party Latency | -1 ms | -| Third Party Batch Size | -1 | - -### Run summary - -| Timestamp | Event | -|:-------------:|-------------| -| 08:20:38 | Starting Sync | -| 08:20:41 | Duration 3 seconds | -| 08:20:41 | Upserted: 0 entities | -| 08:20:42 | Perf test complete | From 1d796948ef69c67ffeb0543bdc6bcb0e4e5d5d14 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 30 Dec 2024 16:29:59 +0200 Subject: [PATCH 5/8] [Core] Add metrics to monitor integration performance 1. Fix `validate_metrics`. 2. Pass Metric env to tests. --- port_ocean/config/settings.py | 4 ++-- port_ocean/context/event.py | 2 ++ scripts/run-metric-test.sh | 1 + scripts/run-smoke-test.sh | 1 + 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index 234dfbfd75..2f5b6765ec 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -103,8 +103,8 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel: return values @validator("metrics") - def validate_metrics(cls, value: str) -> bool: - if value == "1": + def validate_metrics(cls, value: str | bool) -> bool: + if value == "1" or value is True: return True return False diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 4c148ab63e..47692bb8b9 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -77,6 +77,8 @@ def abort(self) -> None: self._aborted = True async def flush_metric_logs(self) -> None: + if not self.should_record_metrics: + return if event._metric_aggregator: await event._metric_aggregator.flush() diff --git a/scripts/run-metric-test.sh b/scripts/run-metric-test.sh index b0bb2bc08f..06b350c67f 100755 --- a/scripts/run-metric-test.sh +++ b/scripts/run-metric-test.sh @@ -7,6 +7,7 @@ latency_ms=2000 export PORT_BASE_URL='http://localhost:5555' export OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS=$latency_ms +export OCEAN__METRICS="1" make -f "$ROOT_DIR/Makefile" build diff --git a/scripts/run-smoke-test.sh b/scripts/run-smoke-test.sh index a3307d8b3f..bfb8fa264c 100755 --- a/scripts/run-smoke-test.sh +++ b/scripts/run-smoke-test.sh @@ -47,6 +47,7 @@ docker run --rm -i \ -e OCEAN__INTEGRATION__CONFIG__ENTITY_KB_SIZE="${OCEAN__INTEGRATION__CONFIG__ENTITY_KB_SIZE:--1}" \ -e OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_BATCH_SIZE="${OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_BATCH_SIZE:--1}" \ -e OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS="${OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS:--1}" \ + -e OCEAN__METRICS="${OCEAN__METRICS:--1}" \ -e OCEAN__RESOURCES_PATH="/opt/port-resources" \ --name=ZOMG-TEST \ "ghcr.io/port-labs/port-ocean-fake-integration:${FAKE_INTEGRATION_VERSION}" \ From 9d9e6abe6caa5c203a641a7c278a52bdb8ee44ed Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Thu, 2 Jan 2025 13:19:09 +0200 Subject: [PATCH 6/8] [Core] Add metrics to monitor integration performance Add metrics to topological sorter --- .../core/integrations/mixins/sync_raw.py | 1 + port_ocean/helpers/metric.py | 39 ++++++++++++++----- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index ff8e466227..dd97a446cc 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -398,6 +398,7 @@ async def update_raw_diff( {"before": entities_before_flatten, "after": entities_after_flatten}, user_agent_type, ) + @metric(MetricType.TOP_SORT) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: if not event.entity_topological_sorter.should_execute(): diff --git a/port_ocean/helpers/metric.py b/port_ocean/helpers/metric.py index f39638bdd1..2d26fca718 100644 --- a/port_ocean/helpers/metric.py +++ b/port_ocean/helpers/metric.py @@ -42,7 +42,12 @@ class LoadStats(ApiStats): @dataclass -class MetricsData: +class TopologicalSortStats(ApiStats): + pass + + +@dataclass +class KindMetricsData: extract: ExtractStats = field(default_factory=ExtractStats) transform: TransformStats = field(default_factory=TransformStats) load: LoadStats = field(default_factory=LoadStats) @@ -50,7 +55,8 @@ class MetricsData: @dataclass class KindsMetricsData: - metrics: dict[str, MetricsData] = field(default_factory=dict) + kind_metrics: dict[str, KindMetricsData] = field(default_factory=dict) + top_sort: TopologicalSortStats = field(default_factory=TopologicalSortStats) class MetricFieldType: @@ -65,30 +71,43 @@ class MetricFieldType: INPUT_COUNT = "input_count" -class MetricType: +class GlobalMerticKind: + TOP_SORT = "top_sort" + + +class KindMetricType: EXTRACT = "extract" TRANSFORM = "transform" LOAD = "load" +class MetricType(KindMetricType, GlobalMerticKind): + pass + + class MetricAggregator: def __init__(self) -> None: self._lock = asyncio.Lock() self.metrics: KindsMetricsData = KindsMetricsData() - async def get_metrics(self) -> dict[str, MetricsData]: - return self.metrics.metrics + async def get_metrics(self) -> dict[str, KindMetricsData]: + return self.metrics + + def is_kind_metric(self): + t = [v for k, v in KindMetricType.__dict__.items() if not k.startswith("__")] + return port_ocean.context.event.event.attributes.get("phase", None) in t async def get_metric(self) -> TransformStats | LoadStats | ExtractStats | None: phase = port_ocean.context.event.event.attributes.get("phase", None) if not phase: return None + if self.is_kind_metric(): + metric = self.metrics.kind_metrics.get(resource.resource.kind) + if not metric: + self.metrics.kind_metrics[resource.resource.kind] = KindMetricsData() - metric = self.metrics.metrics.get(resource.resource.kind) - if not metric: - self.metrics.metrics[resource.resource.kind] = MetricsData() - - return getattr(self.metrics.metrics.get(resource.resource.kind), phase) + return getattr(self.metrics.kind_metrics.get(resource.resource.kind), phase) + return getattr(self.metrics, phase) async def increment_field(self, field: str, amount: int | float = 1) -> None: metric = await self.get_metric() From 2577d9a03ac71b7a311a24de9549eb189fb8f11a Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Thu, 9 Jan 2025 12:00:10 +0200 Subject: [PATCH 7/8] [Core] Add metrics with prometheus convert metrics to use prometheus --- poetry.lock | 18 +- port_ocean/clients/port/mixins/entities.py | 38 +++- port_ocean/context/event.py | 53 +---- port_ocean/context/ocean.py | 4 + port_ocean/context/resource.py | 7 +- .../core/handlers/entity_processor/base.py | 33 +-- .../core/integrations/mixins/sync_raw.py | 13 +- port_ocean/helpers/metric.py | 192 ------------------ port_ocean/helpers/metric/metric.py | 109 ++++++++++ port_ocean/helpers/metric/utils.py | 54 +++++ port_ocean/helpers/retry.py | 38 ++-- port_ocean/ocean.py | 8 + .../clients/port/mixins/test_entities.py | 20 +- pyproject.toml | 1 + scripts/run-metric-test.sh | 4 +- 15 files changed, 278 insertions(+), 314 deletions(-) delete mode 100644 port_ocean/helpers/metric.py create mode 100644 port_ocean/helpers/metric/metric.py create mode 100644 port_ocean/helpers/metric/utils.py diff --git a/poetry.lock b/poetry.lock index 2cdbb972f7..15a5e1fd1c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "aiostream" @@ -1120,6 +1120,20 @@ nodeenv = ">=0.11.1" pyyaml = ">=5.1" virtualenv = ">=20.10.0" +[[package]] +name = "prometheus-client" +version = "0.21.1" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.21.1-py3-none-any.whl", hash = "sha256:594b45c410d6f4f8888940fe80b5cc2521b305a1fafe1c58609ef715a001f301"}, + {file = "prometheus_client-0.21.1.tar.gz", hash = "sha256:252505a722ac04b0456be05c05f75f45d760c2911ffc45f2a06bcaed9f3ae3fb"}, +] + +[package.extras] +twisted = ["twisted"] + [[package]] name = "pycparser" version = "2.22" @@ -1782,4 +1796,4 @@ cli = ["click", "cookiecutter", "jinja2-time", "rich"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "655f1de42f896216982bdea5f016951b8f259ee46486131796daffce27d8b98f" +content-hash = "4bc7f90bf42fc1a22c8da9447db116a37ff12d9efa7efa183e25c55ce41e396b" diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index a8fff169c8..747f411860 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -2,6 +2,9 @@ from typing import Any, Literal from urllib.parse import quote_plus +import port_ocean.context.ocean +from port_ocean.helpers.metric.utils import TimeMetric + import httpx from loguru import logger @@ -11,9 +14,8 @@ handle_status_code, PORT_HTTP_MAX_CONNECTIONS_LIMIT, ) -from port_ocean.context import event from port_ocean.core.models import Entity -from port_ocean.helpers.metric import MetricFieldType, MetricType, metric +from port_ocean.helpers.metric.metric import MetricType, MetricPhase from port_ocean.core.models import PortAPIErrorMessage from starlette import status @@ -75,7 +77,9 @@ async def upsert_entity( extensions={"retryable": True}, ) if response.is_error: - await event.event.increment_metric(MetricFieldType.ERROR_COUNT) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.ERROR_COUNT[0], [MetricPhase.LOAD] + ).inc() logger.error( f"Error {'Validating' if validation_only else 'Upserting'} " @@ -102,7 +106,9 @@ async def upsert_entity( # We return None to ignore the entity later in the delete process if result_entity.is_using_search_identifier: if not response.is_error: - await event.event.increment_metric(MetricFieldType.ERROR_COUNT) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.ERROR_COUNT[0], [MetricPhase.LOAD] + ).inc() return None # In order to save memory we'll keep only the identifier, blueprint and relations of the @@ -118,12 +124,16 @@ async def upsert_entity( key: None if isinstance(relation, dict) else relation for key, relation in result_entity.relations.items() } - await event.event.increment_metric(MetricFieldType.OBJECT_COUNT) - await event.event.increment_metric(MetricFieldType.UPSERTED) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.OBJECT_COUNT[0], [MetricPhase.LOAD] + ).inc() + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.UPSERTED[0], [MetricPhase.LOAD] + ).inc() return reduced_entity - @metric(MetricType.LOAD) + @TimeMetric(MetricPhase.LOAD) async def batch_upsert_entities( self, entities: list[Entity], @@ -155,7 +165,7 @@ async def batch_upsert_entities( return entity_results - @metric(MetricType.LOAD) + @TimeMetric(MetricPhase.LOAD) async def delete_entity( self, entity: Entity, @@ -178,7 +188,9 @@ async def delete_entity( ) if response.is_error: - await event.event.increment_metric(MetricFieldType.ERROR_COUNT) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.ERROR_COUNT[0], [MetricPhase.LOAD] + ).inc() if response.status_code == 404: logger.info( f"Failed to delete entity: {entity.identifier} of blueprint: {entity.blueprint}," @@ -192,8 +204,12 @@ async def delete_entity( ) handle_status_code(response, should_raise) - await event.event.increment_metric(MetricFieldType.OBJECT_COUNT) - await event.event.increment_metric(MetricFieldType.DELETED) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.OBJECT_COUNT[0], [MetricPhase.LOAD] + ).inc() + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.DELETED[0], [MetricPhase.LOAD] + ).inc() async def batch_delete_entities( self, diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 47692bb8b9..6ce6014c2c 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,8 +14,6 @@ from uuid import uuid4 from loguru import logger -from port_ocean.context import ocean -from port_ocean.helpers.metric import MetricAggregator from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy @@ -55,7 +53,6 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) - _metric_aggregator: Optional["MetricAggregator"] = None entity_topological_sorter: EntityTopologicalSorter = field( default_factory=EntityTopologicalSorter ) @@ -76,34 +73,6 @@ def abort(self) -> None: ) self._aborted = True - async def flush_metric_logs(self) -> None: - if not self.should_record_metrics: - return - if event._metric_aggregator: - await event._metric_aggregator.flush() - - async def increment_status(self, status_code: str) -> None: - if not self.should_record_metrics: - return - try: - if self._metric_aggregator: - await self._metric_aggregator.increment_status(status_code) - except Exception: - pass - - async def increment_metric(self, metric: str, amount: int | float = 1) -> None: - if not self.should_record_metrics: - return - try: - if self._metric_aggregator: - await self._metric_aggregator.increment_field(metric, amount) - except Exception: - pass - - @property - def should_record_metrics(self) -> bool: - return ocean.ocean.config.metrics - @property def aborted(self) -> bool: return self._aborted @@ -174,12 +143,6 @@ async def event_context( attributes = {**parent_attributes, **(attributes or {})} - aggregator = ( - parent._metric_aggregator - if parent and parent._metric_aggregator - else MetricAggregator() - ) - new_event = EventContext( event_type, trigger_type=trigger_type, @@ -187,7 +150,6 @@ async def event_context( _parent_event=parent, # inherit port app config from parent event, so it can be used in nested events _port_app_config=parent.port_app_config if parent else None, - _metric_aggregator=aggregator, entity_topological_sorter=entity_topological_sorter, ) _event_context_stack.push(new_event) @@ -203,7 +165,6 @@ def _handle_event(triggering_event_id: int) -> None: dispatcher.connect(_handle_event, event_type) dispatcher.send(event_type, triggering_event_id=event.id) - is_silent = EventType.METRIC == event_type start_time = get_time(seconds_precision=False) with logger.contextualize( event_trigger_type=event.trigger_type, @@ -214,7 +175,7 @@ def _handle_event(triggering_event_id: int) -> None: event.resource_config.kind if event.resource_config else None ), ): - logger.info("Event started") if not is_silent else None + logger.info("Event started") try: yield event except: @@ -225,14 +186,10 @@ def _handle_event(triggering_event_id: int) -> None: finally: end_time = get_time(seconds_precision=False) time_elapsed = round(end_time - start_time, 5) - ( - logger.bind( - success=success, - time_elapsed=time_elapsed, - ).info("Event finished") - if not is_silent - else None - ) + logger.bind( + success=success, + time_elapsed=time_elapsed, + ).info("Event finished") dispatcher.disconnect(_handle_event, event_type) diff --git a/port_ocean/context/ocean.py b/port_ocean/context/ocean.py index 0830729788..37a0250b7b 100644 --- a/port_ocean/context/ocean.py +++ b/port_ocean/context/ocean.py @@ -36,6 +36,10 @@ def app(self) -> "Ocean": ) return self._app + @property + def metrics(self): + return self._app.metrics + @property def initialized(self) -> bool: return self._app is not None diff --git a/port_ocean/context/resource.py b/port_ocean/context/resource.py index 4577e5bca7..92f7b7ca5b 100644 --- a/port_ocean/context/resource.py +++ b/port_ocean/context/resource.py @@ -23,6 +23,7 @@ class ResourceContext: """ resource_config: "ResourceConfig" + index: int @property def kind(self) -> str: @@ -50,12 +51,10 @@ def _get_resource_context() -> ResourceContext: @asynccontextmanager async def resource_context( - resource_config: "ResourceConfig", + resource_config: "ResourceConfig", index: int = 0 ) -> AsyncIterator[ResourceContext]: _resource_context_stack.push( - ResourceContext( - resource_config=resource_config, - ) + ResourceContext(resource_config=resource_config, index=index) ) with logger.contextualize(resource_kind=resource.kind): diff --git a/port_ocean/core/handlers/entity_processor/base.py b/port_ocean/core/handlers/entity_processor/base.py index e902a8fe6e..97ed391e1d 100644 --- a/port_ocean/core/handlers/entity_processor/base.py +++ b/port_ocean/core/handlers/entity_processor/base.py @@ -1,8 +1,9 @@ from abc import abstractmethod -from loguru import logger +from port_ocean.helpers.metric.utils import TimeMetric +import port_ocean.context.ocean -from port_ocean.context import event +from loguru import logger from port_ocean.core.handlers.base import BaseHandler from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.ocean_types import ( @@ -10,7 +11,7 @@ CalculationResult, EntitySelectorDiff, ) -from port_ocean.helpers.metric import MetricFieldType, MetricType, metric +from port_ocean.helpers.metric.metric import MetricType, MetricPhase class BaseEntityProcessor(BaseHandler): @@ -32,7 +33,7 @@ async def _parse_items( ) -> CalculationResult: pass - @metric(MetricType.TRANSFORM) + @TimeMetric(MetricPhase.TRANSFORM) async def parse_items( self, mapping: ResourceConfig, @@ -57,17 +58,17 @@ async def parse_items( result = await self._parse_items( mapping, raw_data, parse_all, send_raw_data_examples_amount ) - await event.event.increment_metric( - MetricFieldType.INPUT_COUNT, len(raw_data) - ) - await event.event.increment_metric( - MetricFieldType.OBJECT_COUNT, len(result.entity_selector_diff.passed) - ) - await event.event.increment_metric( - MetricFieldType.FAILED, len(result.entity_selector_diff.failed) - ) - await event.event.increment_metric( - MetricFieldType.ERROR_COUNT, len(result.errors) - ) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.INPUT_COUNT[0], [MetricPhase.LOAD] + ).inc(len(raw_data)) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.OBJECT_COUNT[0], [MetricPhase.LOAD] + ).inc(len(result.entity_selector_diff.passed)) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.FAILED_COUNT[0], [MetricPhase.LOAD] + ).inc(len(result.entity_selector_diff.failed)) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.ERROR_COUNT[0], [MetricPhase.LOAD] + ).inc(len(result.errors)) return result diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index dd97a446cc..ed2ca5364c 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -30,7 +30,8 @@ ) from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results from port_ocean.exceptions.core import OceanAbortException -from port_ocean.helpers.metric import MetricFieldType, MetricType, metric, timed_generator +from port_ocean.helpers.metric.metric import MetricPhase +from port_ocean.helpers.metric.utils import TimeMetric, timed_generator SEND_RAW_DATA_EXAMPLES_AMOUNT = 5 @@ -85,7 +86,7 @@ def _collect_resync_functions( return fns - @metric(MetricType.EXTRACT) + @TimeMetric(MetricPhase.EXTRACT) async def _execute_resync_tasks( self, fns: list[Callable[[str], Awaitable[RAW_RESULT]]], @@ -398,7 +399,7 @@ async def update_raw_diff( {"before": entities_before_flatten, "after": entities_after_flatten}, user_agent_type, ) - @metric(MetricType.TOP_SORT) + @TimeMetric(MetricPhase.TOP_SORT) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: if not event.entity_topological_sorter.should_execute(): @@ -461,10 +462,10 @@ async def sync_raw_all( creation_results: list[tuple[list[Entity], list[Exception]]] = [] try: - for resource in app_config.resources: + for index,resource in enumerate(app_config.resources): # create resource context per resource kind, so resync method could have access to the resource # config as we might have multiple resources in the same event - async with resource_context(resource): + async with resource_context(resource,index): task = asyncio.get_event_loop().create_task( self._register_in_batches(resource, user_agent_type) ) @@ -503,7 +504,7 @@ async def sync_raw_all( logger.error(message, exc_info=error_group) else: - await event.flush_metric_logs() + await ocean.metrics.flush() logger.info( f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}" ) diff --git a/port_ocean/helpers/metric.py b/port_ocean/helpers/metric.py deleted file mode 100644 index 2d26fca718..0000000000 --- a/port_ocean/helpers/metric.py +++ /dev/null @@ -1,192 +0,0 @@ -import asyncio -import json -import time -from functools import wraps -from dataclasses import asdict, dataclass, field -from typing import Any, Callable - -from loguru import logger -from port_ocean.context import resource -import port_ocean.context.event -from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE - - -@dataclass -class BaseStats: - error_count: int = 0 - duration: float = 0.0 - object_count: int = 0 - - -@dataclass -class ApiStats(BaseStats): - rate_limit_wait: float = 0.0 - requests: dict[str, int] = field(default_factory=dict) - - -@dataclass -class ExtractStats(ApiStats): - pass - - -@dataclass -class TransformStats(BaseStats): - failed_count: int = 0 - input_count: int = 0 - - -@dataclass -class LoadStats(ApiStats): - upserted: int = 0 - deleted: int = 0 - - -@dataclass -class TopologicalSortStats(ApiStats): - pass - - -@dataclass -class KindMetricsData: - extract: ExtractStats = field(default_factory=ExtractStats) - transform: TransformStats = field(default_factory=TransformStats) - load: LoadStats = field(default_factory=LoadStats) - - -@dataclass -class KindsMetricsData: - kind_metrics: dict[str, KindMetricsData] = field(default_factory=dict) - top_sort: TopologicalSortStats = field(default_factory=TopologicalSortStats) - - -class MetricFieldType: - UPSERTED = "upserted" - DELETED = "deleted" - FAILED = "failed_count" - REQUEST = "requests" - RATE_LIMIT = "rate_limit_wait" - OBJECT_COUNT = "object_count" - DURATION = "duration" - ERROR_COUNT = "error_count" - INPUT_COUNT = "input_count" - - -class GlobalMerticKind: - TOP_SORT = "top_sort" - - -class KindMetricType: - EXTRACT = "extract" - TRANSFORM = "transform" - LOAD = "load" - - -class MetricType(KindMetricType, GlobalMerticKind): - pass - - -class MetricAggregator: - def __init__(self) -> None: - self._lock = asyncio.Lock() - self.metrics: KindsMetricsData = KindsMetricsData() - - async def get_metrics(self) -> dict[str, KindMetricsData]: - return self.metrics - - def is_kind_metric(self): - t = [v for k, v in KindMetricType.__dict__.items() if not k.startswith("__")] - return port_ocean.context.event.event.attributes.get("phase", None) in t - - async def get_metric(self) -> TransformStats | LoadStats | ExtractStats | None: - phase = port_ocean.context.event.event.attributes.get("phase", None) - if not phase: - return None - if self.is_kind_metric(): - metric = self.metrics.kind_metrics.get(resource.resource.kind) - if not metric: - self.metrics.kind_metrics[resource.resource.kind] = KindMetricsData() - - return getattr(self.metrics.kind_metrics.get(resource.resource.kind), phase) - return getattr(self.metrics, phase) - - async def increment_field(self, field: str, amount: int | float = 1) -> None: - metric = await self.get_metric() - async with self._lock: - val = getattr(metric, field) - metric.__setattr__(field, val + amount) - - async def increment_status(self, status_code: str) -> None: - metric = await self.get_metric() - if metric is None or not isinstance(metric, ApiStats): - return None - async with self._lock: - - status = metric.requests.get(status_code) - if not status: - metric.requests[status_code] = 0 - metric.requests[status_code] = metric.requests.get(status_code, 0) + 1 - - async def flush(self) -> None: - async with self._lock: - metric_dict = asdict(self.metrics) - logger.info(f"integration metrics {json.dumps(metric_dict)}") - await self.reset() - - async def reset(self) -> None: - async with self._lock: - self.metrics = KindsMetricsData() - - -async def timed_generator( - generator: ASYNC_GENERATOR_RESYNC_TYPE, -) -> ASYNC_GENERATOR_RESYNC_TYPE: - if not port_ocean.context.event.event.should_record_metrics: - async for items in generator: - yield items - async with port_ocean.context.event.event_context( - port_ocean.context.event.EventType.METRIC, attributes={"phase": "extract"} - ): - while True: - try: - start = time.monotonic() - items = await anext(generator) - end = time.monotonic() - duration = end - start - await port_ocean.context.event.event.increment_metric( - MetricFieldType.DURATION, duration - ) - await port_ocean.context.event.event.increment_metric( - MetricFieldType.OBJECT_COUNT, len(items) - ) - - yield items - except Exception: - break - - -def metric(phase: str | None = None, should_capture_time: bool = True) -> Any: - def decorator(func: Callable[..., Any]) -> Any: - @wraps(func) - async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any: - if not port_ocean.context.event.event.should_record_metrics: - return await func(*args, **kwargs) - if not phase: - _phase = port_ocean.context.event.event.attributes.get("phase") - async with port_ocean.context.event.event_context( - port_ocean.context.event.EventType.METRIC, - attributes={"phase": phase or _phase}, - ): - res = None - start = time.monotonic() - res = await func(*args, **kwargs) - if should_capture_time: - end = time.monotonic() - duration = end - start - await port_ocean.context.event.event.increment_metric( - MetricFieldType.DURATION, duration - ) - return res - - return wrapper - - return decorator diff --git a/port_ocean/helpers/metric/metric.py b/port_ocean/helpers/metric/metric.py new file mode 100644 index 0000000000..99a51befc0 --- /dev/null +++ b/port_ocean/helpers/metric/metric.py @@ -0,0 +1,109 @@ +from fastapi import APIRouter +from port_ocean.exceptions.context import ResourceContextNotFoundError +import prometheus_client + +from loguru import logger +from port_ocean.context import resource +from prometheus_client import Gauge +import prometheus_client.openmetrics +import prometheus_client.openmetrics.exposition +import prometheus_client.parser + + +class MetricPhase: + EXTRACT = "extract" + TRANSFORM = "transform" + LOAD = "load" + TOP_SORT = "top_sort" + + +class MetricType: + DURATION = ("duration_seconds", "duration description", ["kind", "phase"]) + OBJECT_COUNT = ("object_count", "object_count description", ["kind", "phase"]) + ERROR_COUNT = ("error_count", "error_count description", ["kind", "phase"]) + RATE_LIMIT_WAIT = ( + "rate_limit_wait_seconds", + "rate_limit_wait description", + ["kind", "phase", "endpoint"], + ) + FAILED_COUNT = ("failed_count", "failed_count description", ["kind", "phase"]) + INPUT_COUNT = ( + "input_count", + "input_count description", + ["kind", "phase"], + ) + UPSERTED = ("upserted_count", "upserted description", ["kind", "phase"]) + DELETED = ("deleted_count", "deleted description", ["kind", "phase"]) + REQUESTS = ( + "http_requests_count", + "requests description", + ["kind", "status", "endpoint"], + ) + + +class Metrics: + def __init__(self, enabled: bool) -> None: + self.enabled = enabled + + self.registry = prometheus_client.CollectorRegistry() + self.metrics: dict[str, Gauge] = {} + self.load_metrics() + + def load_metrics(self): + if not self.enabled: + return None + for attr in dir(MetricType): + if callable(getattr(MetricType, attr)) or attr.startswith("__"): + continue + name, description, lables = getattr(MetricType, attr) + self.metrics[name] = Gauge( + name, description, lables, registry=self.registry + ) + + def get_metric(self, name, lables: list[str]): + if not self.enabled: + + class Empty: + def inc(self, *args): + return None + + return Empty() + return self.metrics.get(name).labels(self.get_kind(), *lables) + + def create_mertic_router(self): + if not self.enabled: + return APIRouter() + router = APIRouter() + + @router.get("/") + async def prom_metrics(): + return self.generate_latest() + + return router + + def get_kind(self) -> str: + try: + return f"{resource.resource.kind}-{resource.resource.index}" + except ResourceContextNotFoundError: + return "" + + def generate_latest(self): + return prometheus_client.openmetrics.exposition.generate_latest( + self.registry + ).decode() + + async def flush(self) -> None: + latest_raw = self.generate_latest() + metric_families = prometheus_client.parser.text_string_to_metric_families( + latest_raw + ) + metrics_dict = {} + for family in metric_families: + for sample in family.samples: + label_parts = [str(v) for _, v in sample.labels.items()] + label_str = "__".join(label_parts) + + dict_key = f"{sample.name}__{label_str}" if label_str else sample.name + + metrics_dict[dict_key] = sample.value + logger.info(f"prom metrics - {metrics_dict}") diff --git a/port_ocean/helpers/metric/utils.py b/port_ocean/helpers/metric/utils.py new file mode 100644 index 0000000000..f79418ac6d --- /dev/null +++ b/port_ocean/helpers/metric/utils.py @@ -0,0 +1,54 @@ +from functools import wraps +import time +from typing import Any, Callable + +import port_ocean.context.ocean +from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE +from port_ocean.helpers.metric.metric import MetricPhase, MetricType + + +async def timed_generator( + generator: ASYNC_GENERATOR_RESYNC_TYPE, +) -> ASYNC_GENERATOR_RESYNC_TYPE: + if not port_ocean.context.ocean.ocean.metrics.enabled: + async for items in generator: + yield items + while True: + try: + start = time.monotonic() + items = await anext(generator) + end = time.monotonic() + duration = end - start + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.DURATION[0], [MetricPhase.EXTRACT] + ).inc(duration) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.OBJECT_COUNT[0], [MetricPhase.EXTRACT] + ).inc(len(items)) + yield items + except StopAsyncIteration: + break + except Exception as e: + raise e + + +def TimeMetric(phase: str | None = None) -> Any: + def decorator(func: Callable[..., Any]) -> Any: + + @wraps(func) + async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any: + if not port_ocean.context.ocean.ocean.metrics.enabled: + return await func(*args, **kwargs) + start = time.monotonic() + res = await func(*args, **kwargs) + end = time.monotonic() + duration = end - start + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.DURATION[0], [phase] + ).inc(duration) + + return res + + return wrapper + + return decorator diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index 7183b5bc4b..e64af636cb 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -8,9 +8,8 @@ import httpx from dateutil.parser import isoparse -from port_ocean.context import event -from port_ocean.exceptions.context import EventContextNotFoundError -from port_ocean.helpers.metric import MetricFieldType +from port_ocean.helpers.metric.metric import MetricType, MetricPhase +import port_ocean.context.ocean # Adapted from https://github.com/encode/httpx/issues/108#issuecomment-1434439481 @@ -165,10 +164,10 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: response = await self._retry_operation_async(request, send_method) else: response = await transport.handle_async_request(request) - try: - await event.event.increment_status(str(response.status_code)) - except EventContextNotFoundError: - pass + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.REQUESTS[0], [response.status_code, request.url] + ).inc() + return response except Exception as e: # Retyable methods are logged via _log_error @@ -287,34 +286,23 @@ async def _retry_operation_async( attempts_made = 0 response: httpx.Response | None = None error: Exception | None = None - is_event_started = False - try: - event.event.should_record_metrics - is_event_started = True - except EventContextNotFoundError: - is_event_started = False - while True: if attempts_made > 0: sleep_time = self._calculate_sleep(attempts_made, {}) self._log_before_retry(request, sleep_time, response, error) await asyncio.sleep(sleep_time) - ( - await event.event.increment_metric(MetricFieldType.RATE_LIMIT) - if is_event_started - else None - ) + if response.status_code == 429: + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.RATE_LIMIT_WAIT[0], [MetricPhase.LOAD, request.url] + ).inc(sleep_time) error = None response = None try: response = await send_method(request) - - ( - await event.event.increment_status(str(response.status_code)) - if is_event_started - else None - ) + port_ocean.context.ocean.ocean.metrics.get_metric( + MetricType.REQUESTS[0], [response.status_code, request.url] + ).inc() response.request = request if remaining_attempts < 1 or not ( diff --git a/port_ocean/ocean.py b/port_ocean/ocean.py index 8d9cbd9544..055a63075e 100644 --- a/port_ocean/ocean.py +++ b/port_ocean/ocean.py @@ -4,6 +4,8 @@ from contextlib import asynccontextmanager from typing import Callable, Any, Dict, AsyncIterator, Type +import port_ocean.helpers.metric.metric + from fastapi import FastAPI, APIRouter from loguru import logger from pydantic import BaseModel @@ -51,6 +53,9 @@ def __init__( *self.config.get_sensitive_fields_data() ) self.integration_router = integration_router or APIRouter() + self.metrics = port_ocean.helpers.metric.metric.Metrics( + enabled=self.config.metrics + ) self.port_client = PortClient( base_url=self.config.port.base_url, @@ -114,6 +119,9 @@ async def execute_resync_all() -> None: def initialize_app(self) -> None: self.fast_api_app.include_router(self.integration_router, prefix="/integration") + self.fast_api_app.include_router( + self.metrics.create_mertic_router(), prefix="/metrics" + ) @asynccontextmanager async def lifecycle(_: FastAPI) -> AsyncIterator[None]: diff --git a/port_ocean/tests/clients/port/mixins/test_entities.py b/port_ocean/tests/clients/port/mixins/test_entities.py index 0a2d641d1b..c6e95f4fc8 100644 --- a/port_ocean/tests/clients/port/mixins/test_entities.py +++ b/port_ocean/tests/clients/port/mixins/test_entities.py @@ -1,5 +1,5 @@ from typing import Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest @@ -37,17 +37,19 @@ async def entity_client(monkeypatch: Any) -> EntityClientMixin: async def test_batch_upsert_entities_read_timeout_should_raise_false( entity_client: EntityClientMixin, ) -> None: - result_entities = await entity_client.batch_upsert_entities( - entities=all_entities, request_options=MagicMock(), should_raise=False - ) + with patch("port_ocean.context.event.event", MagicMock()): + result_entities = await entity_client.batch_upsert_entities( + entities=all_entities, request_options=MagicMock(), should_raise=False + ) - assert result_entities == expected_result_entities + assert result_entities == expected_result_entities async def test_batch_upsert_entities_read_timeout_should_raise_true( entity_client: EntityClientMixin, ) -> None: - with pytest.raises(ReadTimeout): - await entity_client.batch_upsert_entities( - entities=all_entities, request_options=MagicMock(), should_raise=True - ) + with patch("port_ocean.context.event.event", MagicMock()): + with pytest.raises(ReadTimeout): + await entity_client.batch_upsert_entities( + entities=all_entities, request_options=MagicMock(), should_raise=True + ) diff --git a/pyproject.toml b/pyproject.toml index f6c02bd788..9740c0f7e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,7 @@ click = { version = "^8.1.3", optional = true } rich = { version = "^13.4.1", optional = true } cookiecutter = { version = "^2.1.1", optional = true } jinja2-time = { version = "^0.2.0", optional = true } +prometheus-client = "^0.21.1" [tool.poetry.extras] cli = ["click", "rich", "cookiecutter", "jinja2-time"] diff --git a/scripts/run-metric-test.sh b/scripts/run-metric-test.sh index 06b350c67f..621db82b0f 100755 --- a/scripts/run-metric-test.sh +++ b/scripts/run-metric-test.sh @@ -8,12 +8,14 @@ latency_ms=2000 export PORT_BASE_URL='http://localhost:5555' export OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS=$latency_ms export OCEAN__METRICS="1" +export ENTITY_AMOUNT=45000 +export THIRD_PARTY_BATCH_SIZE=500 make -f "$ROOT_DIR/Makefile" build make -f "$ROOT_DIR/Makefile" smoke/start-mock-api -$SCRIPT_BASE/run-smoke-test.sh | grep 'integration metrics' > $TEMP_DIR/metric.log +$SCRIPT_BASE/run-smoke-test.sh cat $TEMP_DIR/metric.log python -m pytest -m metric From 075a6caba18abdc84a85e8616d805e6e886588f8 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Thu, 9 Jan 2025 18:47:14 +0200 Subject: [PATCH 8/8] Update tests and add mode to ocean client --- port_ocean/clients/port/utils.py | 2 + port_ocean/context/event.py | 1 - port_ocean/context/ocean.py | 5 +- port_ocean/helpers/async_client.py | 4 + port_ocean/helpers/metric/metric.py | 40 ++++--- port_ocean/helpers/metric/utils.py | 2 +- port_ocean/helpers/retry.py | 13 +- port_ocean/tests/test_metric.py | 176 +++++++++++++++++++++++----- port_ocean/utils/async_http.py | 9 +- scripts/run-metric-test.sh | 4 +- scripts/run-smoke-test.sh | 1 + 11 files changed, 200 insertions(+), 57 deletions(-) diff --git a/port_ocean/clients/port/utils.py b/port_ocean/clients/port/utils.py index 70917c55b9..0db5624512 100644 --- a/port_ocean/clients/port/utils.py +++ b/port_ocean/clients/port/utils.py @@ -2,6 +2,7 @@ import httpx from loguru import logger +from port_ocean.helpers.metric.metric import MetricPhase from werkzeug.local import LocalStack, LocalProxy from port_ocean.clients.port.retry_transport import TokenRetryTransport @@ -43,6 +44,7 @@ def _get_http_client_context(port_client: "PortClient") -> httpx.AsyncClient: }, timeout=PORT_HTTPX_TIMEOUT, limits=PORT_HTTPX_LIMITS, + mode=MetricPhase.LOAD, ) _http_client.push(client) diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 6ce6014c2c..5ebb074001 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -40,7 +40,6 @@ class EventType: START = "start" RESYNC = "resync" HTTP_REQUEST = "http_request" - METRIC = "metric" @dataclass diff --git a/port_ocean/context/ocean.py b/port_ocean/context/ocean.py index 37a0250b7b..a95f2496ba 100644 --- a/port_ocean/context/ocean.py +++ b/port_ocean/context/ocean.py @@ -1,6 +1,7 @@ from typing import Callable, TYPE_CHECKING, Any, Literal, Union from fastapi import APIRouter +from port_ocean.helpers.metric.metric import Metrics from pydantic.main import BaseModel from werkzeug.local import LocalProxy @@ -37,8 +38,8 @@ def app(self) -> "Ocean": return self._app @property - def metrics(self): - return self._app.metrics + def metrics(self) -> Metrics: + return self.app.metrics @property def initialized(self) -> bool: diff --git a/port_ocean/helpers/async_client.py b/port_ocean/helpers/async_client.py index c78e40e61c..1dafe33ba6 100644 --- a/port_ocean/helpers/async_client.py +++ b/port_ocean/helpers/async_client.py @@ -3,6 +3,7 @@ import httpx from loguru import logger +from port_ocean.helpers.metric.metric import MetricPhase from port_ocean.helpers.retry import RetryTransport @@ -17,10 +18,12 @@ def __init__( self, transport_class: Type[RetryTransport] = RetryTransport, transport_kwargs: dict[str, Any] | None = None, + mode: str = MetricPhase.LOAD, **kwargs: Any, ): self._transport_kwargs = transport_kwargs self._transport_class = transport_class + self.mode = mode super().__init__(**kwargs) def _init_transport( # type: ignore[override] @@ -37,6 +40,7 @@ def _init_transport( # type: ignore[override] **kwargs, ), logger=logger, + mode=self.mode, **(self._transport_kwargs or {}), ) diff --git a/port_ocean/helpers/metric/metric.py b/port_ocean/helpers/metric/metric.py index 99a51befc0..9917113f53 100644 --- a/port_ocean/helpers/metric/metric.py +++ b/port_ocean/helpers/metric/metric.py @@ -1,3 +1,4 @@ +from typing import Any from fastapi import APIRouter from port_ocean.exceptions.context import ResourceContextNotFoundError import prometheus_client @@ -37,10 +38,18 @@ class MetricType: REQUESTS = ( "http_requests_count", "requests description", - ["kind", "status", "endpoint"], + ["kind", "phase", "status", "endpoint"], ) +class EmptyMetric: + def inc(self, *args: Any) -> None: + return None + + def labels(self, *args: Any) -> None: + return None + + class Metrics: def __init__(self, enabled: bool) -> None: self.enabled = enabled @@ -49,7 +58,7 @@ def __init__(self, enabled: bool) -> None: self.metrics: dict[str, Gauge] = {} self.load_metrics() - def load_metrics(self): + def load_metrics(self) -> None: if not self.enabled: return None for attr in dir(MetricType): @@ -60,23 +69,22 @@ def load_metrics(self): name, description, lables, registry=self.registry ) - def get_metric(self, name, lables: list[str]): + def get_metric(self, name: str, lables: list[str]) -> Gauge | EmptyMetric: if not self.enabled: - - class Empty: - def inc(self, *args): - return None - - return Empty() - return self.metrics.get(name).labels(self.get_kind(), *lables) - - def create_mertic_router(self): + return EmptyMetric() + metrics = self.metrics.get(name) + # Should i add a new metric although it was not initialized? + if not metrics: + return EmptyMetric() + return metrics.labels(self.get_kind(), *lables) + + def create_mertic_router(self) -> APIRouter: if not self.enabled: return APIRouter() router = APIRouter() @router.get("/") - async def prom_metrics(): + async def prom_metrics() -> str: return self.generate_latest() return router @@ -85,9 +93,9 @@ def get_kind(self) -> str: try: return f"{resource.resource.kind}-{resource.resource.index}" except ResourceContextNotFoundError: - return "" + return "init" - def generate_latest(self): + def generate_latest(self) -> str: return prometheus_client.openmetrics.exposition.generate_latest( self.registry ).decode() @@ -106,4 +114,4 @@ async def flush(self) -> None: dict_key = f"{sample.name}__{label_str}" if label_str else sample.name metrics_dict[dict_key] = sample.value - logger.info(f"prom metrics - {metrics_dict}") + logger.bind(**metrics_dict).info("prometheus metrics") diff --git a/port_ocean/helpers/metric/utils.py b/port_ocean/helpers/metric/utils.py index f79418ac6d..08f8834a77 100644 --- a/port_ocean/helpers/metric/utils.py +++ b/port_ocean/helpers/metric/utils.py @@ -32,7 +32,7 @@ async def timed_generator( raise e -def TimeMetric(phase: str | None = None) -> Any: +def TimeMetric(phase: str) -> Any: def decorator(func: Callable[..., Any]) -> Any: @wraps(func) diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index e64af636cb..fb4f51c159 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -70,6 +70,7 @@ def __init__( retryable_methods: Iterable[str] | None = None, retry_status_codes: Iterable[int] | None = None, logger: Any | None = None, + mode: str = MetricPhase.LOAD, ) -> None: """ Initializes the instance of RetryTransport class with the given parameters. @@ -122,6 +123,7 @@ def __init__( self._jitter_ratio = jitter_ratio self._max_backoff_wait = max_backoff_wait self._logger = logger + self.mode = mode def handle_request(self, request: httpx.Request) -> httpx.Response: """ @@ -165,7 +167,8 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: else: response = await transport.handle_async_request(request) port_ocean.context.ocean.ocean.metrics.get_metric( - MetricType.REQUESTS[0], [response.status_code, request.url] + MetricType.REQUESTS[0], + [self.mode, str(response.status_code), str(request.url)], ).inc() return response @@ -291,9 +294,10 @@ async def _retry_operation_async( sleep_time = self._calculate_sleep(attempts_made, {}) self._log_before_retry(request, sleep_time, response, error) await asyncio.sleep(sleep_time) - if response.status_code == 429: + if response and response.status_code == 429: port_ocean.context.ocean.ocean.metrics.get_metric( - MetricType.RATE_LIMIT_WAIT[0], [MetricPhase.LOAD, request.url] + MetricType.RATE_LIMIT_WAIT[0], + [self.mode, MetricPhase.LOAD, str(request.url)], ).inc(sleep_time) error = None @@ -301,7 +305,8 @@ async def _retry_operation_async( try: response = await send_method(request) port_ocean.context.ocean.ocean.metrics.get_metric( - MetricType.REQUESTS[0], [response.status_code, request.url] + MetricType.REQUESTS[0], + [self.mode, str(response.status_code), str(request.url)], ).inc() response.request = request diff --git a/port_ocean/tests/test_metric.py b/port_ocean/tests/test_metric.py index 6665dfa1ba..b5ab8ee65f 100644 --- a/port_ocean/tests/test_metric.py +++ b/port_ocean/tests/test_metric.py @@ -1,4 +1,4 @@ -import json +import ast import pytest @@ -12,48 +12,168 @@ def test_metrics() -> None: delay = 2 batch_size = 400 total_objects = 2000 - magic_string = "integration metrics" + magic_string = "prometheus metrics |" + # Read the file with open(log_path, "r") as file: content = file.read() + # Ensure the magic string is present in the content assert magic_string in content, f"'{magic_string}' not found in {log_path}" + # Isolate and parse the JSON object after the magic string start_idx = content.rfind(magic_string) content_after_magic = content[start_idx + len(magic_string) :] + obj = ast.literal_eval(content_after_magic) - obj = json.loads(content_after_magic) - metrics = obj.get("metrics") - assert metrics, "No 'metrics' key found in the parsed JSON." + # ---------------------------------------------------------------------------- + # 1. Validate Extract Duration (using original delay/batch_size logic) + # ---------------------------------------------------------------------------- + num_batches = total_objects / batch_size # e.g., 2000 / 400 = 5 + expected_min_extract_duration = num_batches * delay # e.g., 5 * 2 = 10 - assert "fake-person" in metrics, "'fake-person' key missing in metrics data." - fake_person = metrics["fake-person"] + # Check "fake-person-1" extract duration is > expected_min_extract_duration + actual_extract_duration = obj.get("duration_seconds__fake-person-1__extract", 0) + assert round(actual_extract_duration) > round(expected_min_extract_duration), ( + f"Extract duration {actual_extract_duration} not greater than " + f"{expected_min_extract_duration}" + ) - extract = fake_person.get("extract") - load = fake_person.get("load") - transform = fake_person.get("transform") + # ---------------------------------------------------------------------------- + # 2. Check Durations for Both "fake-person-1" and "fake-department-0" + # ---------------------------------------------------------------------------- + # -- fake-person-1 + transform_duration_p1 = obj.get("duration_seconds__fake-person-1__transform", 0) + load_duration_p1 = obj.get("duration_seconds__fake-person-1__load", 0) + assert ( + transform_duration_p1 > 0 + ), f"Expected transform duration > 0, got {transform_duration_p1}" + assert load_duration_p1 > 0, f"Expected load duration > 0, got {load_duration_p1}" - num_batches = total_objects / batch_size - expected_min_extract_duration = num_batches * delay - assert round(extract["duration"]) > round(expected_min_extract_duration), ( - f"Extract duration {extract['duration']} not greater than " - f"{expected_min_extract_duration}" + # -- fake-department-0 + extract_duration_dept0 = obj.get("duration_seconds__fake-department-0__extract", 0) + transform_duration_dept0 = obj.get( + "duration_seconds__fake-department-0__transform", 0 + ) + load_duration_dept0 = obj.get("duration_seconds__fake-department-0__load", 0) + + assert ( + extract_duration_dept0 > 0 + ), f"Expected department extract duration > 0, got {extract_duration_dept0}" + assert ( + transform_duration_dept0 > 0 + ), f"Expected department transform duration > 0, got {transform_duration_dept0}" + assert ( + load_duration_dept0 > 0 + ), f"Expected department load duration > 0, got {load_duration_dept0}" + + # Optionally, check the "init__top_sort" duration too, if it's relevant: + init_top_sort = obj.get("duration_seconds__init__top_sort", 0) + assert init_top_sort >= 0, f"Expected init__top_sort >= 0, got {init_top_sort}" + + # ---------------------------------------------------------------------------- + # 3. Check Object Counts + # ---------------------------------------------------------------------------- + # -- fake-person-1 + person_extract_count = obj.get("object_count__fake-person-1__extract", 0) + person_load_count = obj.get("object_count__fake-person-1__load", 0) + assert person_extract_count == 2000.0, ( + f"Expected object_count__fake-person-1__extract=2000.0, " + f"got {person_extract_count}" + ) + assert person_load_count == 4000.0, ( + f"Expected object_count__fake-person-1__load=4000.0, " + f"got {person_load_count}" + ) + + # -- fake-department-0 + dept_extract_count = obj.get("object_count__fake-department-0__extract", 0) + dept_load_count = obj.get("object_count__fake-department-0__load", 0) + assert dept_extract_count == 5.0, ( + f"Expected object_count__fake-department-0__extract=5.0, " + f"got {dept_extract_count}" + ) + assert dept_load_count == 10.0, ( + f"Expected object_count__fake-department-0__load=10.0, " + f"got {dept_load_count}" ) - assert extract["object_count"] == total_objects + + # ---------------------------------------------------------------------------- + # 4. Check Input/Upserted Counts + # ---------------------------------------------------------------------------- + # -- fake-person-1 + input_count_p1 = obj.get("input_count__fake-person-1__load", 0) + upserted_count_p1 = obj.get("upserted_count__fake-person-1__load", 0) + assert ( + input_count_p1 == 2000.0 + ), f"Expected input_count__fake-person-1__load=2000.0, got {input_count_p1}" assert ( - extract.get("requests", {}).get("200") == num_batches - ), f"Expected 'requests.200' == {num_batches}, got {extract.get('requests', {}).get('200')}" + upserted_count_p1 == 2000.0 + ), f"Expected upserted_count__fake-person-1__load=2000.0, got {upserted_count_p1}" - assert load["object_count"] == total_objects + # -- fake-department-0 + input_count_dept0 = obj.get("input_count__fake-department-0__load", 0) + upserted_count_dept0 = obj.get("upserted_count__fake-department-0__load", 0) assert ( - load.get("requests", {}).get("200") == total_objects - ), f"Expected 'requests.200' == {total_objects}, got {load.get('requests', {}).get('200')}" + input_count_dept0 == 5.0 + ), f"Expected input_count__fake-department-0__load=5.0, got {input_count_dept0}" + assert ( + upserted_count_dept0 == 5.0 + ), f"Expected upserted_count__fake-department-0__load=5.0, got {upserted_count_dept0}" - assert transform["object_count"] == total_objects - assert transform["input_count"] == total_objects - assert transform["failed_count"] == 0 - assert transform["duration"] > 0 + # ---------------------------------------------------------------------------- + # 5. Check Error and Failed Counts + # ---------------------------------------------------------------------------- + # -- fake-person-1 + error_count_p1 = obj.get("error_count__fake-person-1__load", 0) + failed_count_p1 = obj.get("failed_count__fake-person-1__load", 0) + assert ( + error_count_p1 == 0.0 + ), f"Expected error_count__fake-person-1__load=0.0, got {error_count_p1}" + assert ( + failed_count_p1 == 0.0 + ), f"Expected failed_count__fake-person-1__load=0.0, got {failed_count_p1}" - assert extract["error_count"] == 0 - assert load["error_count"] == 0 - assert transform["error_count"] == 0 + # -- fake-department-0 + error_count_dept0 = obj.get("error_count__fake-department-0__load", 0) + failed_count_dept0 = obj.get("failed_count__fake-department-0__load", 0) + assert ( + error_count_dept0 == 0.0 + ), f"Expected error_count__fake-department-0__load=0.0, got {error_count_dept0}" + assert ( + failed_count_dept0 == 0.0 + ), f"Expected failed_count__fake-department-0__load=0.0, got {failed_count_dept0}" + + # ---------------------------------------------------------------------------- + # 6. Check HTTP Request Counts (200s) + # ---------------------------------------------------------------------------- + # Example: we confirm certain request counters match the sample data provided: + assert ( + obj.get( + "http_requests_count__http://host.docker.internal:5555/v1/auth/access_token__init__load__200", + 0, + ) + == 1.0 + ), "Expected 1.0 for auth access_token 200 requests" + assert ( + obj.get( + "http_requests_count__http://host.docker.internal:5555/v1/integration/smoke-test-integration__init__load__200", + 0, + ) + == 5.0 + ), "Expected 5.0 for integration/smoke-test-integration 200 requests" + assert ( + obj.get( + "http_requests_count__http://localhost:8000/integration/department/hr/employees?limit=-1&entity_kb_size=1&latency=2000__fake-person-1__extract__200", + 0, + ) + == 1.0 + ), "Expected 1.0 for hr/employees?limit=-1 extract 200 requests" + expected_requests = { + "http_requests_count__http://localhost:8000/integration/department/marketing/employees?limit=-1&entity_kb_size=1&latency=2000__fake-person-1__extract__200": 1.0, + "http_requests_count__http://localhost:8000/integration/department/finance/employees?limit=-1&entity_kb_size=1&latency=2000__fake-person-1__extract__200": 1.0, + } + for key, expected_val in expected_requests.items(): + assert ( + obj.get(key, 0) == expected_val + ), f"Expected {expected_val} for '{key}', got {obj.get(key)}" diff --git a/port_ocean/utils/async_http.py b/port_ocean/utils/async_http.py index a96c497a41..63734f3809 100644 --- a/port_ocean/utils/async_http.py +++ b/port_ocean/utils/async_http.py @@ -1,4 +1,5 @@ import httpx +from port_ocean.helpers.metric.metric import MetricPhase from werkzeug.local import LocalStack, LocalProxy from port_ocean.context.ocean import ocean @@ -11,14 +12,18 @@ def _get_http_client_context() -> httpx.AsyncClient: client = _http_client.top if client is None: - client = OceanAsyncClient(RetryTransport, timeout=ocean.config.client_timeout) + client = OceanAsyncClient( + RetryTransport, + timeout=ocean.config.client_timeout, + mode=MetricPhase.EXTRACT, + ) _http_client.push(client) return client """ -Utilize this client for all outbound integration requests to the third-party application. It functions as a wrapper +Utilize this client for all outbound integration requests to the third-party application. It functions as a wrapper around the httpx.AsyncClient, incorporating retry logic at the transport layer for handling retries on 5xx errors and connection errors. diff --git a/scripts/run-metric-test.sh b/scripts/run-metric-test.sh index 621db82b0f..20215e2186 100755 --- a/scripts/run-metric-test.sh +++ b/scripts/run-metric-test.sh @@ -10,13 +10,11 @@ export OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS=$latency_ms export OCEAN__METRICS="1" export ENTITY_AMOUNT=45000 export THIRD_PARTY_BATCH_SIZE=500 - make -f "$ROOT_DIR/Makefile" build make -f "$ROOT_DIR/Makefile" smoke/start-mock-api -$SCRIPT_BASE/run-smoke-test.sh -cat $TEMP_DIR/metric.log +$SCRIPT_BASE/run-smoke-test.sh| grep 'prometheus metric' > $TEMP_DIR/metric.log python -m pytest -m metric make -f "$ROOT_DIR/Makefile" smoke/stop-mock-api diff --git a/scripts/run-smoke-test.sh b/scripts/run-smoke-test.sh index bfb8fa264c..4695e05ecc 100755 --- a/scripts/run-smoke-test.sh +++ b/scripts/run-smoke-test.sh @@ -49,6 +49,7 @@ docker run --rm -i \ -e OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS="${OCEAN__INTEGRATION__CONFIG__THIRD_PARTY_LATENCY_MS:--1}" \ -e OCEAN__METRICS="${OCEAN__METRICS:--1}" \ -e OCEAN__RESOURCES_PATH="/opt/port-resources" \ + -e APPLICATION__LOG_LEVEL="DEBUG" \ --name=ZOMG-TEST \ "ghcr.io/port-labs/port-ocean-fake-integration:${FAKE_INTEGRATION_VERSION}" \ -c "source ./.venv/bin/activate && pip install --root-user-action=ignore /opt/dist/${TAR_FILE}[cli] && ocean sail -O"