Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CVAT][Exchange Oracle] Data cleanup #2428

Merged
merged 29 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
67d2056
[Exchange Oracle] Refactor webhooks handling
Bobronium Aug 21, 2024
b5295b4
[Exchange Oracle] Add `send_webhook` function
Bobronium Aug 21, 2024
a083272
[Exchange Oracle] Add `StorageClient.remove_files`
Bobronium Aug 21, 2024
310ea8c
[Exchange Oracle] Support creating webhooks from reputation oracle
Bobronium Aug 21, 2024
7201d5b
[Exchange Oracle] Support `escrow_finished` event handling from Reput…
Bobronium Aug 21, 2024
cc4c240
[Exchange Oracle] Cleanup on escrow cancellation and failing escrow c…
Bobronium Aug 22, 2024
5518ab4
[Exchange Oracle] escrow_finished -> escrow_completed, finished -> de…
Bobronium Aug 22, 2024
e0a2a49
[Exchange Oracle] Ignore NotFoundException when deleting cloudstorage…
Bobronium Aug 26, 2024
d311882
[Exchange Oracle] Reformat state_trackers.py
Bobronium Aug 26, 2024
f11ae53
[Exchange Oracle] Fix various issues
Bobronium Aug 28, 2024
9260e75
[Exchange Oracle] Simplify @cron_job creation and handling
Bobronium Aug 28, 2024
673adda
[Exchange Oracle] Refactor crons structure
Bobronium Aug 28, 2024
8d342de
[Exchange Oracle] Delete files by prefix
Bobronium Aug 28, 2024
b161726
[Exchange Oracle] Don't change status of the tasks and jobs during cl…
Bobronium Aug 28, 2024
473a7b0
[Exchange Oracle] Use EscrowCleaner -> cleanup_escrow
Bobronium Aug 28, 2024
8b9def1
[Exchange Oracle] Apply pre-commit fixes
Bobronium Aug 28, 2024
a82ff12
[Exchange Oracle] Enable F401
Bobronium Aug 28, 2024
b64ae78
[Exchange Oracle] Delete projects by escrow_address and chain_id
Bobronium Aug 28, 2024
db72a27
[Exchange Oracle] Add handle_webhook on_fail
Bobronium Aug 29, 2024
907b0d2
[Exchange Oracle] Add missing keys in .env.template
Bobronium Aug 29, 2024
b226239
[Exchange Oracle] Add missing keys in .env.template
Bobronium Aug 29, 2024
1c68f87
[Exchange Oracle] Add missing `reputation_oracle` to `sender_events_m…
Bobronium Aug 29, 2024
329c158
Update packages/examples/cvat/exchange-oracle/src/core/oracle_events.py
Bobronium Sep 2, 2024
9233a90
Always clean up escrow storage
Bobronium Sep 2, 2024
8b93823
[Exchange Oracle] Delete cloud storages after projects have been deleted
Bobronium Sep 2, 2024
ac4b604
[Exchange Oracle] Ignore API errors while deleting the projects/cloud…
Bobronium Sep 2, 2024
53a812f
[Exchange Oracle] Add tests for exception handling and retries
Bobronium Sep 2, 2024
dec7b4f
[Exchange Oracle] Rename log_error -> _log_error
Bobronium Sep 3, 2024
c909589
[Exchange Oracle] Fix incomplete rename
Bobronium Sep 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion packages/examples/cvat/exchange-oracle/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/examples/cvat/exchange-oracle/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ hexbytes = ">=1.2.0" # required for to_0x_hex() function
[tool.poetry.group.dev.dependencies]
pre-commit = "^3.0.4"
ruff = "^0.6.0"
pytest-mock = "^3.14.0"

[tool.ruff]
line-length = 100
Expand Down Expand Up @@ -99,7 +100,6 @@ ignore = [
"ERA001", # Found commented-out code
"N801", # Class name should use CapWords convention
"PLR0915", # Too many statements
"F401", # Imported but unused
"PLR2004", # Magic value used in comparison, consider replacing with a constant variable
"ANN002", # Missing type annotation for `*args`
"TRY300", # Consider moving this statement to an `else` block
Expand Down Expand Up @@ -130,6 +130,7 @@ ignore = [
]
# alembic is not a package in a traditional sense, so putting __init__.py there doesn't make sense
"alembic/*" = ["INP001"]
"__init__.py" = ["F401"]

[tool.ruff.lint.pep8-naming]
classmethod-decorators = [
Expand Down
3 changes: 3 additions & 0 deletions packages/examples/cvat/exchange-oracle/src/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ PROCESS_JOB_LAUNCHER_WEBHOOKS_INT=
PROCESS_JOB_LAUNCHER_WEBHOOKS_CHUNK_SIZE=
PROCESS_RECORDING_ORACLE_WEBHOOKS_INT=
PROCESS_RECORDING_ORACLE_WEBHOOKS_CHUNK_SIZE=
PROCESS_REPUTATION_ORACLE_WEBHOOKS_CHUNK_SIZE=
PROCESS_REPUTATION_ORACLE_WEBHOOKS_INT=
TRACK_COMPLETED_PROJECTS_INT=
TRACK_COMPLETED_PROJECTS_CHUNK_SIZE=
TRACK_COMPLETED_TASKS_INT=
Expand Down Expand Up @@ -90,6 +92,7 @@ HUMAN_APP_SIGNATURE=
LOCALHOST_RECORDING_ORACLE_ADDRESS=
LOCALHOST_RECORDING_ORACLE_URL=
LOCALHOST_JOB_LAUNCHER_URL=
LOCALHOST_REPUTATION_ORACLE_URL=

# Encryption
PGP_PRIVATE_KEY=
Expand Down
23 changes: 14 additions & 9 deletions packages/examples/cvat/exchange-oracle/src/chain/escrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from human_protocol_sdk.storage import StorageUtils

from src.core.config import Config
from src.core.types import OracleWebhookTypes


def get_escrow(chain_id: int, escrow_address: str) -> EscrowData:
Expand Down Expand Up @@ -56,12 +57,16 @@ def get_escrow_manifest(chain_id: int, escrow_address: str) -> dict:
return json.loads(manifest_content)


def get_job_launcher_address(chain_id: int, escrow_address: str) -> str:
return get_escrow(chain_id, escrow_address).launcher


def get_recording_oracle_address(chain_id: int, escrow_address: str) -> str:
if address := Config.localhost.recording_oracle_address:
return address

return get_escrow(chain_id, escrow_address).recording_oracle
def get_available_webhook_types(
chain_id: int, escrow_address: str
) -> dict[str, OracleWebhookTypes]:
escrow = get_escrow(chain_id, escrow_address)
return {
escrow.launcher.lower(): OracleWebhookTypes.job_launcher,
(
Config.localhost.recording_oracle_address or escrow.recording_oracle
).lower(): OracleWebhookTypes.recording_oracle,
(
Config.localhost.reputation_oracle_url or escrow.reputation_oracle
).lower(): OracleWebhookTypes.reputation_oracle,
}
9 changes: 9 additions & 0 deletions packages/examples/cvat/exchange-oracle/src/chain/kvstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def get_recording_oracle_url(chain_id: int, escrow_address: str) -> str:
return OperatorUtils.get_leader(ChainId(chain_id), escrow.recording_oracle).webhook_url


def get_reputation_oracle_url(chain_id: int, escrow_address: str) -> str:
if url := Config.localhost.recording_oracle_url:
return url

escrow = get_escrow(chain_id, escrow_address)

return OperatorUtils.get_leader(ChainId(chain_id), escrow.recording_oracle).webhook_url


def get_job_launcher_url(chain_id: int, escrow_address: str) -> str:
if url := Config.localhost.job_launcher_url:
return url
Expand Down
15 changes: 11 additions & 4 deletions packages/examples/cvat/exchange-oracle/src/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,25 @@ class LocalhostConfig(_NetworkConfig):

recording_oracle_address = os.environ.get("LOCALHOST_RECORDING_ORACLE_ADDRESS")
recording_oracle_url = os.environ.get("LOCALHOST_RECORDING_ORACLE_URL")
reputation_oracle_url = os.environ.get("LOCALHOST_REPUTATION_ORACLE_URL")
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved


class CronConfig:
process_job_launcher_webhooks_int = int(os.environ.get("PROCESS_JOB_LAUNCHER_WEBHOOKS_INT", 30))
process_job_launcher_webhooks_chunk_size = os.environ.get(
"PROCESS_JOB_LAUNCHER_WEBHOOKS_CHUNK_SIZE", 5
process_job_launcher_webhooks_chunk_size = int(
os.environ.get("PROCESS_JOB_LAUNCHER_WEBHOOKS_CHUNK_SIZE", 5)
)
process_recording_oracle_webhooks_int = int(
os.environ.get("PROCESS_RECORDING_ORACLE_WEBHOOKS_INT", 30)
)
process_recording_oracle_webhooks_chunk_size = os.environ.get(
"PROCESS_RECORDING_ORACLE_WEBHOOKS_CHUNK_SIZE", 5
process_recording_oracle_webhooks_chunk_size = int(
os.environ.get("PROCESS_RECORDING_ORACLE_WEBHOOKS_CHUNK_SIZE", 5)
)
process_reputation_oracle_webhooks_chunk_size = int(
os.environ.get("PROCESS_REPUTATION_ORACLE_WEBHOOKS_CHUNK_SIZE", 5)
)
process_reputation_oracle_webhooks_int = int(
os.environ.get("PROCESS_REPUTATION_ORACLE_WEBHOOKS_INT", 5)
)
track_completed_projects_int = int(os.environ.get("TRACK_COMPLETED_PROJECTS_INT", 30))
track_completed_projects_chunk_size = os.environ.get("TRACK_COMPLETED_PROJECTS_CHUNK_SIZE", 5)
Expand Down
14 changes: 12 additions & 2 deletions packages/examples/cvat/exchange-oracle/src/core/oracle_events.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from typing import Union

from pydantic import BaseModel

from src.core.types import (
ExchangeOracleEventTypes,
JobLauncherEventTypes,
OracleWebhookTypes,
RecordingOracleEventTypes,
ReputationOracleEventTypes,
)

EventTypeTag = ExchangeOracleEventTypes | JobLauncherEventTypes | RecordingOracleEventTypes
Expand Down Expand Up @@ -44,13 +43,23 @@ class ExchangeOracleEvent_TaskFinished(OracleEvent):
pass # escrow is enough for now


class ExchangeOracleEvent_EscrowCleaned(OracleEvent):
pass


class ReputationOracleEvent_EscrowCompleted(OracleEvent):
pass


_event_type_map = {
JobLauncherEventTypes.escrow_created: JobLauncherEvent_EscrowCreated,
JobLauncherEventTypes.escrow_canceled: JobLauncherEvent_EscrowCanceled,
RecordingOracleEventTypes.task_completed: RecordingOracleEvent_TaskCompleted,
RecordingOracleEventTypes.task_rejected: RecordingOracleEvent_TaskRejected,
ExchangeOracleEventTypes.task_creation_failed: ExchangeOracleEvent_TaskCreationFailed,
ExchangeOracleEventTypes.task_finished: ExchangeOracleEvent_TaskFinished,
ExchangeOracleEventTypes.escrow_cleaned: ExchangeOracleEvent_EscrowCleaned,
ReputationOracleEventTypes.escrow_completed: ReputationOracleEvent_EscrowCompleted,
}


Expand Down Expand Up @@ -83,6 +92,7 @@ def parse_event(
OracleWebhookTypes.job_launcher: JobLauncherEventTypes,
OracleWebhookTypes.recording_oracle: RecordingOracleEventTypes,
OracleWebhookTypes.exchange_oracle: ExchangeOracleEventTypes,
OracleWebhookTypes.reputation_oracle: ReputationOracleEventTypes,
}

sender_events = sender_events_mapping.get(sender)
Expand Down
12 changes: 10 additions & 2 deletions packages/examples/cvat/exchange-oracle/src/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@
from src.core.types import Networks


def compose_data_bucket_prefix(escrow_address: str, chain_id: Networks):
return f"{escrow_address}@{chain_id}"


def compose_results_bucket_prefix(escrow_address: str, chain_id: Networks):
return f"{escrow_address}@{chain_id}{Config.storage_config.results_dir_suffix}"


def compose_data_bucket_filename(escrow_address: str, chain_id: Networks, filename: str) -> str:
return f"{escrow_address}@{chain_id}/{filename}"
return f"{compose_data_bucket_prefix(escrow_address, chain_id)}/{filename}"


def compose_results_bucket_filename(escrow_address: str, chain_id: Networks, filename: str) -> str:
return f"{escrow_address}@{chain_id}{Config.storage_config.results_dir_suffix}/{filename}"
return f"{compose_results_bucket_prefix(escrow_address, chain_id)}/{filename}"
8 changes: 8 additions & 0 deletions packages/examples/cvat/exchange-oracle/src/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ProjectStatuses(str, Enum, metaclass=BetterEnumMeta):
validation = "validation"
canceled = "canceled"
recorded = "recorded"
deleted = "deleted"


class TaskStatuses(str, Enum, metaclass=BetterEnumMeta):
Expand Down Expand Up @@ -55,11 +56,13 @@ class OracleWebhookTypes(str, Enum, metaclass=BetterEnumMeta):
exchange_oracle = "exchange_oracle"
job_launcher = "job_launcher"
recording_oracle = "recording_oracle"
reputation_oracle = "reputation_oracle"


class ExchangeOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
task_creation_failed = "task_creation_failed"
task_finished = "task_finished"
escrow_cleaned = "escrow_cleaned"


class JobLauncherEventTypes(str, Enum, metaclass=BetterEnumMeta):
Expand All @@ -72,6 +75,11 @@ class RecordingOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
task_rejected = "task_rejected"


class ReputationOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
# TODO: rename to ReputationOracleEventType
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
escrow_completed = "escrow_completed"


class OracleWebhookStatuses(str, Enum, metaclass=BetterEnumMeta):
pending = "pending"
completed = "completed"
Expand Down
Loading
Loading