-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Add metrics for ocean core #1260
base: main
Are you sure you want to change the base?
Changes from 5 commits
46d7860
a1df9b8
daf9cf0
9a46af8
1d79694
9d9e6ab
2577d9a
075a6ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,10 @@ | |
handle_status_code, | ||
PORT_HTTP_MAX_CONNECTIONS_LIMIT, | ||
) | ||
from port_ocean.core.models import Entity, PortAPIErrorMessage | ||
from port_ocean.context import event | ||
from port_ocean.core.models import Entity | ||
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric | ||
from port_ocean.core.models import PortAPIErrorMessage | ||
from starlette import status | ||
|
||
|
||
|
@@ -72,6 +75,8 @@ async def upsert_entity( | |
extensions={"retryable": True}, | ||
) | ||
if response.is_error: | ||
await event.event.increment_metric(MetricFieldType.ERROR_COUNT) | ||
|
||
logger.error( | ||
f"Error {'Validating' if validation_only else 'Upserting'} " | ||
f"entity: {entity.identifier} of " | ||
|
@@ -96,6 +101,8 @@ async def upsert_entity( | |
# Happens when upsert fails and search identifier is defined. | ||
# We return None to ignore the entity later in the delete process | ||
if result_entity.is_using_search_identifier: | ||
if not response.is_error: | ||
await event.event.increment_metric(MetricFieldType.ERROR_COUNT) | ||
return None | ||
|
||
# In order to save memory we'll keep only the identifier, blueprint and relations of the | ||
|
@@ -111,9 +118,12 @@ async def upsert_entity( | |
key: None if isinstance(relation, dict) else relation | ||
for key, relation in result_entity.relations.items() | ||
} | ||
await event.event.increment_metric(MetricFieldType.OBJECT_COUNT) | ||
await event.event.increment_metric(MetricFieldType.UPSERTED) | ||
|
||
return reduced_entity | ||
|
||
@metric(MetricType.LOAD) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't it be on top of the |
||
async def batch_upsert_entities( | ||
self, | ||
entities: list[Entity], | ||
|
@@ -145,6 +155,7 @@ async def batch_upsert_entities( | |
|
||
return entity_results | ||
|
||
@metric(MetricType.LOAD) | ||
async def delete_entity( | ||
self, | ||
entity: Entity, | ||
|
@@ -167,6 +178,7 @@ async def delete_entity( | |
) | ||
|
||
if response.is_error: | ||
await event.event.increment_metric(MetricFieldType.ERROR_COUNT) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this kind of error count be in the |
||
if response.status_code == 404: | ||
logger.info( | ||
f"Failed to delete entity: {entity.identifier} of blueprint: {entity.blueprint}," | ||
|
@@ -180,6 +192,8 @@ async def delete_entity( | |
) | ||
|
||
handle_status_code(response, should_raise) | ||
await event.event.increment_metric(MetricFieldType.OBJECT_COUNT) | ||
await event.event.increment_metric(MetricFieldType.DELETED) | ||
|
||
async def batch_delete_entities( | ||
self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,6 +79,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): | |
) | ||
runtime: Runtime = Runtime.OnPrem | ||
resources_path: str = Field(default=".port/resources") | ||
metrics: bool = Field(default=False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest to have the type as some kind of enum for cases where it won't be only |
||
|
||
@root_validator() | ||
def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]: | ||
|
@@ -101,6 +102,12 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel: | |
|
||
return values | ||
|
||
@validator("metrics") | ||
def validate_metrics(cls, value: str | bool) -> bool: | ||
if value == "1" or value is True: | ||
return True | ||
return False | ||
|
||
@validator("runtime") | ||
def validate_runtime(cls, runtime: Runtime) -> Runtime: | ||
if runtime.is_saas_runtime: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
from uuid import uuid4 | ||
|
||
from loguru import logger | ||
from port_ocean.context import ocean | ||
from port_ocean.helpers.metric import MetricAggregator | ||
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter | ||
from pydispatch import dispatcher # type: ignore | ||
from werkzeug.local import LocalStack, LocalProxy | ||
|
@@ -40,6 +42,7 @@ class EventType: | |
START = "start" | ||
RESYNC = "resync" | ||
HTTP_REQUEST = "http_request" | ||
METRIC = "metric" | ||
|
||
|
||
@dataclass | ||
|
@@ -52,6 +55,7 @@ class EventContext: | |
_parent_event: Optional["EventContext"] = None | ||
_event_id: str = field(default_factory=lambda: str(uuid4())) | ||
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) | ||
_metric_aggregator: Optional["MetricAggregator"] = None | ||
entity_topological_sorter: EntityTopologicalSorter = field( | ||
default_factory=EntityTopologicalSorter | ||
) | ||
|
@@ -72,6 +76,34 @@ def abort(self) -> None: | |
) | ||
self._aborted = True | ||
|
||
async def flush_metric_logs(self) -> None: | ||
if not self.should_record_metrics: | ||
return | ||
if event._metric_aggregator: | ||
await event._metric_aggregator.flush() | ||
|
||
async def increment_status(self, status_code: str) -> None: | ||
if not self.should_record_metrics: | ||
return | ||
try: | ||
if self._metric_aggregator: | ||
await self._metric_aggregator.increment_status(status_code) | ||
except Exception: | ||
pass | ||
|
||
async def increment_metric(self, metric: str, amount: int | float = 1) -> None: | ||
if not self.should_record_metrics: | ||
return | ||
try: | ||
if self._metric_aggregator: | ||
await self._metric_aggregator.increment_field(metric, amount) | ||
except Exception: | ||
pass | ||
|
||
@property | ||
def should_record_metrics(self) -> bool: | ||
return ocean.ocean.config.metrics | ||
|
||
@property | ||
def aborted(self) -> bool: | ||
return self._aborted | ||
|
@@ -141,13 +173,21 @@ async def event_context( | |
) | ||
|
||
attributes = {**parent_attributes, **(attributes or {})} | ||
|
||
aggregator = ( | ||
parent._metric_aggregator | ||
if parent and parent._metric_aggregator | ||
else MetricAggregator() | ||
) | ||
|
||
new_event = EventContext( | ||
event_type, | ||
trigger_type=trigger_type, | ||
attributes=attributes, | ||
_parent_event=parent, | ||
# inherit port app config from parent event, so it can be used in nested events | ||
_port_app_config=parent.port_app_config if parent else None, | ||
_metric_aggregator=aggregator, | ||
entity_topological_sorter=entity_topological_sorter, | ||
) | ||
_event_context_stack.push(new_event) | ||
|
@@ -163,6 +203,7 @@ def _handle_event(triggering_event_id: int) -> None: | |
dispatcher.connect(_handle_event, event_type) | ||
dispatcher.send(event_type, triggering_event_id=event.id) | ||
|
||
is_silent = EventType.METRIC == event_type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why this is needed for? |
||
start_time = get_time(seconds_precision=False) | ||
with logger.contextualize( | ||
event_trigger_type=event.trigger_type, | ||
|
@@ -173,7 +214,7 @@ def _handle_event(triggering_event_id: int) -> None: | |
event.resource_config.kind if event.resource_config else None | ||
), | ||
): | ||
logger.info("Event started") | ||
logger.info("Event started") if not is_silent else None | ||
try: | ||
yield event | ||
except: | ||
|
@@ -184,10 +225,14 @@ def _handle_event(triggering_event_id: int) -> None: | |
finally: | ||
end_time = get_time(seconds_precision=False) | ||
time_elapsed = round(end_time - start_time, 5) | ||
logger.bind( | ||
success=success, | ||
time_elapsed=time_elapsed, | ||
).info("Event finished") | ||
( | ||
logger.bind( | ||
success=success, | ||
time_elapsed=time_elapsed, | ||
).info("Event finished") | ||
if not is_silent | ||
else None | ||
) | ||
|
||
dispatcher.disconnect(_handle_event, event_type) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.