-
Notifications
You must be signed in to change notification settings - Fork 54
/
base.py
47 lines (31 loc) · 1.15 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from __future__ import annotations
import enum
from dataclasses import dataclass
from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union
from mypy_extensions import NamedArg
# Placeholder type for a pipeline configuration object; currently unconstrained.
Config = Any # TODO: better typing for config
# Callable taking one positional argument plus a keyword argument named
# ``config``, annotated as returning ``None``.
SingleArgumentStageFunction = Callable[[Any, NamedArg(type=Any, name="config")], None] # noqa: F821
# Callable taking only a keyword argument named ``config``, annotated as
# returning ``None``.
NoArgumentStageFunction = Callable[[NamedArg(type=Any, name="config")], None] # noqa: F821
# A stage function is either of the two callable shapes defined above.
StageFunction = Union[NoArgumentStageFunction, SingleArgumentStageFunction]
class StageAnnotationType(enum.Enum):
    """Keys identifying the kinds of annotation that can be attached to a stage.

    Member values are assigned by ``enum.auto()``; callers should compare
    members by identity rather than rely on the underlying integers.
    """

    # NOTE(review): presumably a concurrency setting for the stage — the
    # expected value type is not visible here; confirm against executors.
    CONCURRENCY = enum.auto()
    # NOTE(review): presumably a retry count for the stage — confirm against
    # executors.
    RETRIES = enum.auto()
# Mapping from an annotation kind to its (untyped) value.
StageAnnotations = Dict[StageAnnotationType, Any]
@dataclass(frozen=True)
class Stage:
    """Immutable description of a single pipeline stage.

    The dataclass is frozen, so attributes cannot be reassigned after
    construction.
    """

    # Callable implementing the stage (see ``StageFunction``).
    function: StageFunction
    # Name of the stage.
    name: str
    # Optional iterable associated with the stage; ``None`` when not given.
    # NOTE(review): presumably the inputs the function is mapped over —
    # confirm against the executors.
    mappable: Optional[Iterable] = None
    # Optional mapping of annotation kind to value; ``None`` when not given.
    annotations: Optional[StageAnnotations] = None
@dataclass(frozen=True)
class Pipeline:
    """Immutable container for a set of stages and an optional configuration.

    The dataclass is frozen, so attributes cannot be reassigned after
    construction.
    """

    # Iterable of the stages that make up the pipeline.
    stages: Iterable[Stage]
    # Optional configuration object; ``None`` when not given.
    config: Optional[Config] = None
# Type of the plan that a concrete executor produces from a pipeline.
T = TypeVar("T")


class PipelineExecutor(Generic[T]):
    """Base interface for turning a ``Pipeline`` into a plan and running it.

    Both methods raise ``NotImplementedError`` here; concrete executors are
    expected to override them.
    """

    @staticmethod
    def compile(pipeline: Pipeline) -> T:
        """Build a plan of type ``T`` from *pipeline*.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    @staticmethod
    def execute(plan: T):
        """Run a *plan* previously produced by ``compile``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError