Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CVAT][Exchange Oracle] Data cleanup #2428

Merged
merged 29 commits into from
Sep 5, 2024
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
67d2056
[Exchange Oracle] Refactor webhooks handling
Bobronium Aug 21, 2024
b5295b4
[Exchange Oracle] Add `send_webhook` function
Bobronium Aug 21, 2024
a083272
[Exchange Oracle] Add `StorageClient.remove_files`
Bobronium Aug 21, 2024
310ea8c
[Exchange Oracle] Support creating webhooks from reputation oracle
Bobronium Aug 21, 2024
7201d5b
[Exchange Oracle] Support `escrow_finished` event handling from Reput…
Bobronium Aug 21, 2024
cc4c240
[Exchange Oracle] Cleanup on escrow cancellation and failing escrow c…
Bobronium Aug 22, 2024
5518ab4
[Exchange Oracle] escrow_finished -> escrow_completed, finished -> de…
Bobronium Aug 22, 2024
e0a2a49
[Exchange Oracle] Ignore NotFoundException when deleting cloudstorage…
Bobronium Aug 26, 2024
d311882
[Exchange Oracle] Reformat state_trackers.py
Bobronium Aug 26, 2024
f11ae53
[Exchange Oracle] Fix various issues
Bobronium Aug 28, 2024
9260e75
[Exchange Oracle] Simplify @cron_job creation and handling
Bobronium Aug 28, 2024
673adda
[Exchange Oracle] Refactor crons structure
Bobronium Aug 28, 2024
8d342de
[Exchange Oracle] Delete files by prefix
Bobronium Aug 28, 2024
b161726
[Exchange Oracle] Don't change status of the tasks and jobs during cl…
Bobronium Aug 28, 2024
473a7b0
[Exchange Oracle] Use EscrowCleaner -> cleanup_escrow
Bobronium Aug 28, 2024
8b9def1
[Exchange Oracle] Apply pre-commit fixes
Bobronium Aug 28, 2024
a82ff12
[Exchange Oracle] Enable F401
Bobronium Aug 28, 2024
b64ae78
[Exchange Oracle] Delete projects by escrow_address and chain_id
Bobronium Aug 28, 2024
db72a27
[Exchange Oracle] Add handle_webhook on_fail
Bobronium Aug 29, 2024
907b0d2
[Exchange Oracle] Add missing keys in .env.template
Bobronium Aug 29, 2024
b226239
[Exchange Oracle] Add missing keys in .env.template
Bobronium Aug 29, 2024
1c68f87
[Exchange Oracle] Add missing `reputation_oracle` to `sender_events_m…
Bobronium Aug 29, 2024
329c158
Update packages/examples/cvat/exchange-oracle/src/core/oracle_events.py
Bobronium Sep 2, 2024
9233a90
Always clean up escrow storage
Bobronium Sep 2, 2024
8b93823
[Exchange Oracle] Delete cloud storages after projects have been deleted
Bobronium Sep 2, 2024
ac4b604
[Exchange Oracle] Ignore API errors while deleting the projects/cloud…
Bobronium Sep 2, 2024
53a812f
[Exchange Oracle] Add tests for exception handling and retries
Bobronium Sep 2, 2024
dec7b4f
[Exchange Oracle] Rename log_error -> _log_error
Bobronium Sep 3, 2024
c909589
[Exchange Oracle] Fix incomplete rename
Bobronium Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Exchange Oracle] Add handle_webhook on_fail
Bobronium committed Sep 2, 2024
commit db72a27203114c58fe477b47b3e053e67779e1ac
Original file line number Diff line number Diff line change
@@ -13,7 +13,13 @@


@contextmanager
def handle_webhook(logger: logging.Logger, session: Session, webhook: Webhook):
def handle_webhook(
logger: logging.Logger,
session: Session,
webhook: Webhook,
*,
on_fail: Callable[[Session, Webhook, Exception], None] = lambda _s, _w, _e: None,
):
logger.debug(
"Processing webhook "
f"{webhook.type}.{webhook.event_type}~{webhook.signature} "
@@ -26,7 +32,14 @@ def handle_webhook(logger: logging.Logger, session: Session, webhook: Webhook):
except Exception as e:
savepoint.rollback()
logger.exception(f"Webhook {webhook.id} sending failed: {e}")
webhook_service.outbox.handle_webhook_fail(session, webhook.id)
savepoint = session.begin_nested()
try:
on_fail(session, webhook, e)
except Exception:
savepoint.rollback()
raise
finally:
webhook_service.outbox.handle_webhook_fail(session, webhook.id)
else:
webhook_service.outbox.handle_webhook_success(session, webhook.id)
logger.debug("Webhook handled successfully")
Original file line number Diff line number Diff line change
@@ -16,12 +16,30 @@
from src.core.types import JobLauncherEventTypes, Networks, OracleWebhookTypes, ProjectStatuses
from src.crons._cron_job import cron_job
from src.crons.webhooks._common import handle_webhook, process_outgoing_webhooks
from src.db import SessionLocal
from src.db.utils import ForUpdateParams
from src.handlers.escrow_cleanup import cleanup_escrow
from src.models.webhook import Webhook


def handle_failure(session: Session, webhook: Webhook, exc: Exception) -> None:
if (
webhook.event_type == JobLauncherEventTypes.escrow_created
and webhook.attempts + 1 >= Config.webhook_max_retries
):
logging.error(
f"Exceeded maximum retries for {webhook.escrow_address=} creation. "
f"Notifying job launcher."
)
# TODO: think about unifying this further
oracle_db_service.outbox.create_webhook(
session=session,
escrow_address=webhook.escrow_address,
chain_id=webhook.chain_id,
type=OracleWebhookTypes.job_launcher,
event=ExchangeOracleEvent_TaskCreationFailed(reason=str(exc)),
)


@cron_job
def process_incoming_job_launcher_webhooks(logger: logging.Logger, session: Session):
"""
@@ -35,7 +53,7 @@ def process_incoming_job_launcher_webhooks(logger: logging.Logger, session: Sess
)

for webhook in webhooks:
with handle_webhook(logger, session, webhook):
with handle_webhook(logger, session, webhook, on_fail=handle_failure):
handle_job_launcher_event(webhook, db_session=session, logger=logger)


@@ -67,7 +85,7 @@ def handle_job_launcher_event(webhook: Webhook, *, db_session: Session, logger:

cvat.create_task(webhook.escrow_address, webhook.chain_id)

except Exception as ex:
except Exception:
projects = cvat_db_service.get_projects_by_escrow_address(
db_session, webhook.escrow_address
)
@@ -76,20 +94,6 @@ def handle_job_launcher_event(webhook: Webhook, *, db_session: Session, logger:
cvat_db_service.delete_projects(
db_session, webhook.escrow_address, webhook.chain_id
)

# We should not notify before the webhook handling attempts have expired
if webhook.attempts + 1 >= Config.webhook_max_retries:
# new session is required since exception might roll back the current session
# creation of this webhook should not depend on this
with SessionLocal.begin() as session:
oracle_db_service.outbox.create_webhook(
session=session,
escrow_address=webhook.escrow_address,
chain_id=webhook.chain_id,
type=OracleWebhookTypes.job_launcher,
event=ExchangeOracleEvent_TaskCreationFailed(reason=str(ex)),
)

raise

case JobLauncherEventTypes.escrow_canceled: