Skip to content

Commit

Permalink
[Core] Fix resync issue when using search identifier in relation sc…
Browse files Browse the repository at this point in the history
…hema (#890)
  • Loading branch information
Tankilevitch authored Aug 11, 2024
1 parent 2e169bf commit af64535
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 51 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.9.12 (2024-08-06)

### Bug Fixes

- Fixed resync issue when calculating the diff of entities failed due to search identifier in relation mapping


## 0.9.11 (2024-08-05)


Expand Down
61 changes: 31 additions & 30 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 21 additions & 4 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def upsert_entity(
request_options: RequestOptions,
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> None:
) -> Entity:
validation_only = request_options["validation_only"]
async with self.semaphore:
logger.debug(
Expand Down Expand Up @@ -57,15 +57,21 @@ async def upsert_entity(
f"blueprint: {entity.blueprint}"
)
handle_status_code(response, should_raise)
result = response.json()
result_entity = Entity.parse_obj(result)
# Set the results of the search relation and identifier to the entity
entity.identifier = result_entity.identifier or entity.identifier
entity.relations = result_entity.relations or entity.relations
return entity

async def batch_upsert_entities(
self,
entities: list[Entity],
request_options: RequestOptions,
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> None:
await asyncio.gather(
) -> list[Entity]:
modified_entities_results = await asyncio.gather(
*(
self.upsert_entity(
entity,
Expand All @@ -75,8 +81,19 @@ async def batch_upsert_entities(
)
for entity in entities
),
return_exceptions=True,
return_exceptions=should_raise,
)
entity_results = [
entity for entity in modified_entities_results if isinstance(entity, Entity)
]
if not should_raise:
return entity_results

for entity_result in modified_entities_results:
if isinstance(entity_result, Exception):
raise entity_result

return entity_results

async def delete_entity(
self,
Expand Down
5 changes: 4 additions & 1 deletion port_ocean/core/handlers/entities_state_applier/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ async def delete_diff(
@abstractmethod
async def upsert(
self, entities: list[Entity], user_agent_type: UserAgentType
) -> None:
) -> list[Entity]:
"""Upsert (insert or update) the given entities into the state.
Args:
entities (list[Entity]): The entities to be upserted.
user_agent_type (UserAgentType): The user agent responsible for the upsert.
Returns:
list[Entity]: The upserted entities.
"""
pass

Expand Down
23 changes: 13 additions & 10 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ async def apply_diff(
logger.info(
f"Updating entity diff (created: {len(diff.created)}, deleted: {len(diff.deleted)}, modified: {len(diff.modified)})"
)
await self.upsert(kept_entities, user_agent_type)
modified_entities = await self.upsert(kept_entities, user_agent_type)

await self._safe_delete(diff.deleted, kept_entities, user_agent_type)
await self._safe_delete(diff.deleted, modified_entities, user_agent_type)

async def delete_diff(
self,
Expand All @@ -95,10 +95,11 @@ async def delete_diff(

async def upsert(
self, entities: list[Entity], user_agent_type: UserAgentType
) -> None:
) -> list[Entity]:
logger.info(f"Upserting {len(entities)} entities")
modified_entities: list[Entity] = []
if event.port_app_config.create_missing_related_entities:
await self.context.port_client.batch_upsert_entities(
modified_entities = await self.context.port_client.batch_upsert_entities(
entities,
event.port_app_config.get_port_request_options(),
user_agent_type,
Expand All @@ -108,14 +109,16 @@ async def upsert(
ordered_created_entities = reversed(
order_by_entities_dependencies(entities)
)

for entity in ordered_created_entities:
await self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
modified_entities.append(
await self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
)
return modified_entities

async def delete(
self, entities: list[Entity], user_agent_type: UserAgentType
Expand Down
6 changes: 4 additions & 2 deletions port_ocean/core/integrations/mixins/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ async def sync(
"""
entities_at_port = await ocean.port_client.search_entities(user_agent_type)

await self.entities_state_applier.upsert(entities, user_agent_type)
modified_entities = await self.entities_state_applier.upsert(
entities, user_agent_type
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": entities}, user_agent_type
{"before": entities_at_port, "after": modified_entities}, user_agent_type
)

logger.info("Finished syncing change")
8 changes: 5 additions & 3 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ async def _register_resource_raw(
objects_diff = await self._calculate_raw(
[(resource, results)], parse_all, send_raw_data_examples_amount
)
await self.entities_state_applier.upsert(
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)

return objects_diff[0]
return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
errors=objects_diff[0].errors,
)

async def _unregister_resource_raw(
self,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.9.11"
version = "0.9.12"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit af64535

Please sign in to comment.