Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add metrics for ocean core #1260

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ bump/single-integration:

# run a mock port api server for perf / smoke tests
smoke/start-mock-api:
$(ACTIVATE) && SMOKE_TEST_SUFFIX=$${SMOKE_TEST_SUFFIX:-default_value} python ./port_ocean/tests/helpers/fake_port_api.py &
$(ACTIVATE) && SMOKE_TEST_SUFFIX=$${SMOKE_TEST_SUFFIX:-default_value} python ./port_ocean/tests/helpers/fake_port_api.py > /dev/null &

# kill the mock port api server started by smoke/start-mock-api
# (matches the fake_port_api process by name; `egrep -v grep` drops the grep itself)
smoke/stop-mock-api:
ps aux | grep fake_port_api | egrep -v grep | awk '{print $$2};' | xargs kill -9
16 changes: 15 additions & 1 deletion port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
handle_status_code,
PORT_HTTP_MAX_CONNECTIONS_LIMIT,
)
from port_ocean.core.models import Entity, PortAPIErrorMessage
from port_ocean.context import event
from port_ocean.core.models import Entity
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric
from port_ocean.core.models import PortAPIErrorMessage
from starlette import status


Expand Down Expand Up @@ -72,6 +75,8 @@ async def upsert_entity(
extensions={"retryable": True},
)
if response.is_error:
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
await event.event.increment_metric(MetricFieldType.PORT_API_ERROR_COUNT)


logger.error(
f"Error {'Validating' if validation_only else 'Upserting'} "
f"entity: {entity.identifier} of "
Expand All @@ -96,6 +101,8 @@ async def upsert_entity(
# Happens when upsert fails and search identifier is defined.
# We return None to ignore the entity later in the delete process
if result_entity.is_using_search_identifier:
if not response.is_error:
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
return None

# In order to save memory we'll keep only the identifier, blueprint and relations of the
Expand All @@ -111,9 +118,12 @@ async def upsert_entity(
key: None if isinstance(relation, dict) else relation
for key, relation in result_entity.relations.items()
}
await event.event.increment_metric(MetricFieldType.OBJECT_COUNT)
await event.event.increment_metric(MetricFieldType.UPSERTED)

return reduced_entity

@metric(MetricType.LOAD)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be on top of the upsert_entity method rather than batch_upsert_entities ?

async def batch_upsert_entities(
self,
entities: list[Entity],
Expand Down Expand Up @@ -145,6 +155,7 @@ async def batch_upsert_entities(

return entity_results

@metric(MetricType.LOAD)
async def delete_entity(
self,
entity: Entity,
Expand All @@ -167,6 +178,7 @@ async def delete_entity(
)

if response.is_error:
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this kind of error count be in the upsert_entity as well?

if response.status_code == 404:
logger.info(
f"Failed to delete entity: {entity.identifier} of blueprint: {entity.blueprint},"
Expand All @@ -180,6 +192,8 @@ async def delete_entity(
)

handle_status_code(response, should_raise)
await event.event.increment_metric(MetricFieldType.OBJECT_COUNT)
await event.event.increment_metric(MetricFieldType.DELETED)

async def batch_delete_entities(
self,
Expand Down
7 changes: 7 additions & 0 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
)
runtime: Runtime = Runtime.OnPrem
resources_path: str = Field(default=".port/resources")
metrics: bool = Field(default=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to have the type as some kind of enum for cases where it won't be only true or false but rather might want to only monitor extract metrics


@root_validator()
def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]:
Expand All @@ -101,6 +102,12 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel:

return values

@validator("metrics")
def validate_metrics(cls, value: str | bool) -> bool:
    """Coerce the `metrics` setting to a strict bool.

    The literal ``True`` or the string ``"1"`` (as commonly supplied via an
    environment variable) enables metrics; every other value disables them.
    """
    return value is True or value == "1"

@validator("runtime")
def validate_runtime(cls, runtime: Runtime) -> Runtime:
if runtime.is_saas_runtime:
Expand Down
55 changes: 50 additions & 5 deletions port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from uuid import uuid4

from loguru import logger
from port_ocean.context import ocean
from port_ocean.helpers.metric import MetricAggregator
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from pydispatch import dispatcher # type: ignore
from werkzeug.local import LocalStack, LocalProxy
Expand All @@ -40,6 +42,7 @@ class EventType:
START = "start"
RESYNC = "resync"
HTTP_REQUEST = "http_request"
METRIC = "metric"


@dataclass
Expand All @@ -52,6 +55,7 @@ class EventContext:
_parent_event: Optional["EventContext"] = None
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)
_metric_aggregator: Optional["MetricAggregator"] = None
entity_topological_sorter: EntityTopologicalSorter = field(
default_factory=EntityTopologicalSorter
)
Expand All @@ -72,6 +76,34 @@ def abort(self) -> None:
)
self._aborted = True

async def flush_metric_logs(self) -> None:
    """Flush this context's buffered metrics via its aggregator.

    No-op when metric recording is disabled or no aggregator is attached.
    """
    if not self.should_record_metrics:
        return
    # Use self's aggregator, consistent with increment_status/increment_metric.
    # Going through the module-level `event` proxy (the previous behavior) reads
    # the top-of-stack context, which is not necessarily `self` when contexts
    # are nested.
    if self._metric_aggregator:
        await self._metric_aggregator.flush()

async def increment_status(self, status_code: str) -> None:
    """Record one occurrence of the given HTTP status code, best-effort."""
    if not self.should_record_metrics:
        return
    aggregator = self._metric_aggregator
    if not aggregator:
        return
    try:
        await aggregator.increment_status(status_code)
    except Exception:
        # Metrics are best-effort bookkeeping; never let them break the event.
        pass

async def increment_metric(self, metric: str, amount: int | float = 1) -> None:
    """Add ``amount`` (default 1) to the named metric field, best-effort."""
    if not self.should_record_metrics:
        return
    aggregator = self._metric_aggregator
    if not aggregator:
        return
    try:
        await aggregator.increment_field(metric, amount)
    except Exception:
        # Swallow errors deliberately: metric accounting must not interrupt
        # the event it is measuring.
        pass

@property
def should_record_metrics(self) -> bool:
    """True when the integration's `metrics` config flag is enabled."""
    return ocean.ocean.config.metrics

@property
def aborted(self) -> bool:
return self._aborted
Expand Down Expand Up @@ -141,13 +173,21 @@ async def event_context(
)

attributes = {**parent_attributes, **(attributes or {})}

aggregator = (
parent._metric_aggregator
if parent and parent._metric_aggregator
else MetricAggregator()
)

new_event = EventContext(
event_type,
trigger_type=trigger_type,
attributes=attributes,
_parent_event=parent,
# inherit port app config from parent event, so it can be used in nested events
_port_app_config=parent.port_app_config if parent else None,
_metric_aggregator=aggregator,
entity_topological_sorter=entity_topological_sorter,
)
_event_context_stack.push(new_event)
Expand All @@ -163,6 +203,7 @@ def _handle_event(triggering_event_id: int) -> None:
dispatcher.connect(_handle_event, event_type)
dispatcher.send(event_type, triggering_event_id=event.id)

is_silent = EventType.METRIC == event_type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is needed for?

start_time = get_time(seconds_precision=False)
with logger.contextualize(
event_trigger_type=event.trigger_type,
Expand All @@ -173,7 +214,7 @@ def _handle_event(triggering_event_id: int) -> None:
event.resource_config.kind if event.resource_config else None
),
):
logger.info("Event started")
logger.info("Event started") if not is_silent else None
try:
yield event
except:
Expand All @@ -184,10 +225,14 @@ def _handle_event(triggering_event_id: int) -> None:
finally:
end_time = get_time(seconds_precision=False)
time_elapsed = round(end_time - start_time, 5)
logger.bind(
success=success,
time_elapsed=time_elapsed,
).info("Event finished")
(
logger.bind(
success=success,
time_elapsed=time_elapsed,
).info("Event finished")
if not is_silent
else None
)

dispatcher.disconnect(_handle_event, event_type)

Expand Down
20 changes: 18 additions & 2 deletions port_ocean/core/handlers/entity_processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from loguru import logger

from port_ocean.context import event
from port_ocean.core.handlers.base import BaseHandler
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.ocean_types import (
RAW_ITEM,
CalculationResult,
EntitySelectorDiff,
)
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric


class BaseEntityProcessor(BaseHandler):
Expand All @@ -30,6 +32,7 @@ async def _parse_items(
) -> CalculationResult:
pass

@metric(MetricType.TRANSFORM)
async def parse_items(
self,
mapping: ResourceConfig,
Expand All @@ -51,7 +54,20 @@ async def parse_items(
with logger.contextualize(kind=mapping.kind):
if not raw_data:
return CalculationResult(EntitySelectorDiff([], []), [])

return await self._parse_items(
result = await self._parse_items(
mapping, raw_data, parse_all, send_raw_data_examples_amount
)
await event.event.increment_metric(
MetricFieldType.INPUT_COUNT, len(raw_data)
)
await event.event.increment_metric(
MetricFieldType.OBJECT_COUNT, len(result.entity_selector_diff.passed)
)
await event.event.increment_metric(
MetricFieldType.FAILED, len(result.entity_selector_diff.failed)
)
await event.event.increment_metric(
MetricFieldType.ERROR_COUNT, len(result.errors)
)

return result
7 changes: 5 additions & 2 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric, timed_generator

SEND_RAW_DATA_EXAMPLES_AMOUNT = 5

Expand Down Expand Up @@ -84,6 +85,7 @@ def _collect_resync_functions(

return fns

@metric(MetricType.EXTRACT)
async def _execute_resync_tasks(
self,
fns: list[Callable[[str], Awaitable[RAW_RESULT]]],
Expand Down Expand Up @@ -129,7 +131,6 @@ async def _calculate_raw(
for mapping, results in raw_diff
)
)

async def _register_resource_raw(
self,
resource: ResourceConfig,
Expand Down Expand Up @@ -196,7 +197,7 @@ async def _register_in_batches(

for generator in async_generators:
try:
async for items in generator:
async for items in timed_generator(generator):
if send_raw_data_examples_amount > 0:
send_raw_data_examples_amount = max(
0, send_raw_data_examples_amount - len(passed_entities)
Expand Down Expand Up @@ -472,6 +473,7 @@ async def sync_raw_all(
creation_results.append(await task)

await self.sort_and_upsert_failed_entities(user_agent_type)

except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
Expand Down Expand Up @@ -500,6 +502,7 @@ async def sync_raw_all(

logger.error(message, exc_info=error_group)
else:
await event.flush_metric_logs()
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
)
Expand Down
Loading
Loading