-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path_models.py
58 lines (50 loc) · 1.43 KB
/
_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""
Models for Crystal connector
"""
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class RequestLifeCycleStage(Enum):
    """
    Lifecycle stages reported for a Crystal request.

    Every member's value is the same upper-case string as its name, so a
    raw status string can be turned into a member by value lookup, e.g.
    ``RequestLifeCycleStage('RUNNING')``. A string that matches no member
    raises ``ValueError``.
    """

    # Members are listed in their original declaration order; iterating
    # over the enum yields them in this order.
    NEW = 'NEW'
    BUFFERED = 'BUFFERED'
    RUNNING = 'RUNNING'
    COMPLETED = 'COMPLETED'
    FAILED = 'FAILED'
    SCHEDULING_TIMEOUT = 'SCHEDULING_TIMEOUT'
    DEADLINE_EXCEEDED = 'DEADLINE_EXCEEDED'
    THROTTLED = 'THROTTLED'
@dataclass
class RequestResult:
    """
    The Crystal result when retrieving an existing run.

    :ivar run_id: Identifier of the request (the ``requestId`` key of the
        response when built through :meth:`from_dict`).
    :ivar status: Lifecycle stage of the request.
    :ivar result_uri: Value of the ``resultUri`` key, or ``None``.
    :ivar run_error_message: Value of the ``runErrorMessage`` key, or
        ``None``.
    """
    run_id: str
    status: RequestLifeCycleStage
    result_uri: Optional[str] = None
    run_error_message: Optional[str] = None

    @classmethod
    def from_dict(cls, dict_: dict) -> 'RequestResult':
        """
        Constructs a RequestResult object from a dictionary containing the
        keys from the /result HTTP GET request.

        :param dict_: The (JSON) dict from the HTTP request. The keys
            ``requestId`` and ``status`` are required; ``resultUri`` and
            ``runErrorMessage`` are optional.
        :return: The corresponding RequestResult object (an instance of
            ``cls``, so subclasses get their own type back).
        :raises KeyError: If ``requestId`` or ``status`` is missing.
        :raises ValueError: If ``status`` is not a valid
            RequestLifeCycleStage value.
        """
        return cls(
            run_id=dict_['requestId'],
            status=RequestLifeCycleStage(dict_['status']),
            # These two fields are declared Optional with a None default,
            # so a response that omits them must not fail with KeyError.
            result_uri=dict_.get('resultUri'),
            run_error_message=dict_.get('runErrorMessage'),
        )
@dataclass
class AlgorithmRunResult:
    """
    Outcome of an algorithm run, in the form that is submitted to Crystal.

    Only ``run_id`` is required; the other fields are optional strings
    that default to ``None``.
    """

    # Identifier of the run this result belongs to.
    run_id: str

    # NOTE(review): presumably populated only for failed runs - confirm
    # against the code that builds and submits this object.
    cause: Optional[str] = None
    message: Optional[str] = None

    # NOTE(review): looks like a URI carrying a shared-access signature
    # for the run's output - confirm against the caller.
    sas_uri: Optional[str] = None