diff --git a/python/pathway/internals/config.py b/python/pathway/internals/config.py
new file mode 100644
index 00000000..60d02f6a
--- /dev/null
+++ b/python/pathway/internals/config.py
@@ -0,0 +1,123 @@
+"""Pathway settings read from ``PATHWAY_*`` environment variables."""
+
+import os
+from dataclasses import dataclass, field
+
+from pathway import persistence
+from pathway.internals import api
+
+
+def _env_field(name: str, default: str | None = None):
+    """Return a dataclass field defaulting to environment variable ``name``.
+
+    The variable is read when the owning dataclass is instantiated; ``default``
+    is used when it is not set.
+    """
+
+    def factory():
+        return os.environ.get(name, default)
+
+    return field(default_factory=factory)
+
+
+def _env_bool_field(name: str):
+    """Return a dataclass field parsing environment variable ``name`` as a bool.
+
+    ``1``/``true``/``yes`` give ``True``; ``0``/``false``/``no``, an empty value
+    and an unset variable give ``False``. Matching ignores case and surrounding
+    whitespace. Any other value raises ``ValueError``.
+    """
+
+    def factory():
+        value = os.environ.get(name, "false").strip().lower()
+        if value in ("1", "true", "yes"):
+            return True
+        elif value in ("", "0", "false", "no"):
+            # An empty value (``NAME=``) counts as unset, so it does not raise
+            # when the module-level config is built at import time.
+            return False
+        else:
+            raise ValueError(
+                f"Unexpected value for {name!r} environment variable: {value!r}"
+            )
+
+    return field(default_factory=factory)
+
+
+def _snapshot_access() -> api.SnapshotAccess | None:
+    """Map ``PATHWAY_SNAPSHOT_ACCESS`` to a ``SnapshotAccess`` member.
+
+    Returns ``None`` when the variable is unset or not ``record``/``replay``.
+    """
+    match os.environ.get("PATHWAY_SNAPSHOT_ACCESS", "").lower():
+        case "record":
+            return api.SnapshotAccess.RECORD
+        case "replay":
+            return api.SnapshotAccess.REPLAY
+        case _:
+            return None
+
+
+def _persistence_mode() -> api.PersistenceMode:
+    """Map ``PATHWAY_PERSISTENCE_MODE`` to a ``PersistenceMode`` member.
+
+    Falls back to ``PATHWAY_REPLAY_MODE`` when the former is unset; any value
+    other than ``speedrun`` yields ``BATCH``.
+    """
+    match os.environ.get(
+        "PATHWAY_PERSISTENCE_MODE", os.environ.get("PATHWAY_REPLAY_MODE", "")
+    ).lower():
+        case "speedrun":
+            return api.PersistenceMode.SPEEDRUN_REPLAY
+        case "batch":
+            return api.PersistenceMode.BATCH
+        case _:
+            return api.PersistenceMode.BATCH
+
+
+@dataclass
+class PathwayConfig:
+    """Settings captured from the environment when an instance is created."""
+
+    continue_after_replay: bool = _env_bool_field("PATHWAY_CONTINUE_AFTER_REPLAY")
+    ignore_asserts: bool = _env_bool_field("PATHWAY_IGNORE_ASSERTS")
+    runtime_typechecking: bool = _env_bool_field("PATHWAY_RUNTIME_TYPECHECKING")
+    persistence_mode: api.PersistenceMode = field(default_factory=_persistence_mode)
+    snapshot_access: api.SnapshotAccess | None = field(default_factory=_snapshot_access)
+    replay_storage: str | None = _env_field("PATHWAY_REPLAY_STORAGE")
+
+    @property
+    def replay_config(
+        self,
+    ) -> persistence.Config | None:
+        """Build a filesystem-backed persistence config from the replay settings.
+
+        Returns ``None`` when ``replay_storage`` is empty or unset. Raises
+        ``ValueError`` when it is set but ``snapshot_access`` is neither
+        ``RECORD`` nor ``REPLAY``.
+        """
+        if self.replay_storage:
+            if self.snapshot_access not in (
+                api.SnapshotAccess.RECORD,
+                api.SnapshotAccess.REPLAY,
+            ):
+                raise ValueError(
+                    "unexpected value of PATHWAY_SNAPSHOT_ACCESS environment variable "
+                    + "- when PATHWAY_REPLAY_STORAGE is set, PATHWAY_SNAPSHOT_ACCESS "
+                    + "needs to be set to either 'record' or 'replay'"
+                )
+            data_storage = persistence.Backend.filesystem(self.replay_storage)
+            persistence_config = persistence.Config.simple_config(
+                data_storage,
+                persistence_mode=self.persistence_mode,
+                snapshot_access=self.snapshot_access,
+                continue_after_replay=self.continue_after_replay,
+            )
+            return persistence_config
+        else:
+            return None
+
+
+pathway_config = PathwayConfig()
+
+__all__ = ["PathwayConfig", "pathway_config"]
diff --git a/python/pathway/internals/environ.py b/python/pathway/internals/environ.py
deleted file mode 100644
index 17389c83..00000000
--- a/python/pathway/internals/environ.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright © 2024 Pathway
-
-import os
-
-from pathway.internals import api
-from pathway.persistence import (
-    Backend as PersistentStorageBackend,
-    Config as PersistenceConfig,
-)
-
-ignore_asserts = os.environ.get("PATHWAY_IGNORE_ASSERTS", "false").lower() in (
-    "1",
-    "true",
-    "yes",
-)
-
-runtime_typechecking = os.environ.get(
-    "PATHWAY_RUNTIME_TYPECHECKING", "false"
-).lower() in (
-    "1",
-    "true",
-    "yes",
-)
-
-
-def get_replay_config():
-    if replay_storage := os.environ.get("PATHWAY_REPLAY_STORAGE"):
-        fallback_mode = os.environ.get("PATHWAY_REPLAY_MODE", "")
-        match os.environ.get("PATHWAY_PERSISTENCE_MODE", fallback_mode).lower():
-            case "speedrun":
-                persistence_mode = api.PersistenceMode.SPEEDRUN_REPLAY
-            case "batch":
-                persistence_mode = api.PersistenceMode.BATCH
-            case _:
-                persistence_mode = api.PersistenceMode.BATCH
-        match os.environ.get("PATHWAY_SNAPSHOT_ACCESS", "").lower():
-            case "record":
-                snapshot_access = api.SnapshotAccess.RECORD
-            case "replay":
-                snapshot_access = api.SnapshotAccess.REPLAY
-            case _:
-                raise ValueError(
-                    """unexpected value of PATHWAY_SNAPSHOT_ACCESS environment variable """
-                    """- when PATHWAY_REPLAY_STORAGE is set, PATHWAY_SNAPSHOT_ACCESS """
-                    """needs to be set to either "record" or "replay" """
-                )
-
-        continue_after_replay = bool(os.environ.get("PATHWAY_CONTINUE_AFTER_REPLAY"))
-
-        data_storage = PersistentStorageBackend.filesystem(replay_storage)
-        persistence_config = PersistenceConfig.simple_config(
-            data_storage,
-            persistence_mode=persistence_mode,
-            snapshot_access=snapshot_access,
-            continue_after_replay=continue_after_replay,
-        )
-        return persistence_config
-    else:
-        return None
diff --git a/python/pathway/internals/graph_runner/__init__.py b/python/pathway/internals/graph_runner/__init__.py
index b86f10ea..7ed93bbd 100644
--- a/python/pathway/internals/graph_runner/__init__.py
+++ b/python/pathway/internals/graph_runner/__init__.py
@@ -4,8 +4,9 @@
 
 from collections.abc import Callable, Iterable
 
-from pathway.internals import api, environ, parse_graph as graph, table, trace
+from pathway.internals import api, parse_graph as graph, table, trace
 from pathway.internals.column_path import ColumnPath
+from pathway.internals.config import pathway_config
 from pathway.internals.graph_runner.async_utils import new_event_loop
 from pathway.internals.graph_runner.row_transformer_operator_handler import (  # noqa: registers handler for RowTransformerOperator
     RowTransformerOperatorHandler,
@@ -46,14 +47,14 @@ def __init__(
         self._graph = input_graph
         self.debug = debug
         if ignore_asserts is None:
-            ignore_asserts = environ.ignore_asserts
+            ignore_asserts = pathway_config.ignore_asserts
         self.ignore_asserts = ignore_asserts
         self.monitoring_level = monitoring_level
         self.with_http_server = with_http_server
         self.default_logging = default_logging
-        self.persistence_config = persistence_config or environ.get_replay_config()
+        self.persistence_config = persistence_config or pathway_config.replay_config
         if runtime_typechecking is None:
-            self.runtime_typechecking = environ.runtime_typechecking
+            self.runtime_typechecking = pathway_config.runtime_typechecking
         else:
             self.runtime_typechecking = runtime_typechecking
 