Skip to content

Commit

Permalink
Global Pathway config refactor. (#5507)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Michał Bartoszkiewicz <[email protected]>
GitOrigin-RevId: dc0abfcbc1d40193b35a55c8cc261a932bcaafa3
  • Loading branch information
2 people authored and Manul from Pathway committed Jan 26, 2024
1 parent 2b9bcb7 commit 3dc1077
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 63 deletions.
89 changes: 89 additions & 0 deletions python/pathway/internals/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
from dataclasses import dataclass, field

from pathway import persistence
from pathway.internals import api


def _env_field(name: str, default: str | None = None):
def factory():
return os.environ.get(name, default)

return field(default_factory=factory)


def _env_bool_field(name: str):
def factory():
value = os.environ.get(name, "false").lower()
if value in ("1", "true", "yes"):
return True
elif value in ("0", "false", "no"):
return False
else:
raise ValueError(
f"Unexpected value for {name!r} environment variable: {value!r}"
)

return field(default_factory=factory)


def _snapshot_access() -> api.SnapshotAccess | None:
match os.environ.get("PATHWAY_SNAPSHOT_ACCESS", "").lower():
case "record":
return api.SnapshotAccess.RECORD
case "replay":
return api.SnapshotAccess.REPLAY
case _:
return None


def _persistence_mode() -> api.PersistenceMode:
match os.environ.get(
"PATHWAY_PERSISTENCE_MODE", os.environ.get("PATHWAY_REPLAY_MODE", "")
).lower():
case "speedrun":
return api.PersistenceMode.SPEEDRUN_REPLAY
case "batch":
return api.PersistenceMode.BATCH
case _:
return api.PersistenceMode.BATCH


@dataclass
class PathwayConfig:
continue_after_replay: bool = _env_bool_field("PATHWAY_CONTINUE_AFTER_REPLAY")
ignore_asserts: bool = _env_bool_field("PATHWAY_IGNORE_ASSERTS")
runtime_typechecking: bool = _env_bool_field("PATHWAY_RUNTIME_TYPECHECKING")
persistence_mode: api.PersistenceMode = field(default_factory=_persistence_mode)
snapshot_access: api.SnapshotAccess | None = field(default_factory=_snapshot_access)
replay_storage: str | None = _env_field("PATHWAY_REPLAY_STORAGE")

@property
def replay_config(
self,
) -> persistence.Config | None:
if self.replay_storage:
if self.snapshot_access not in (
api.SnapshotAccess.RECORD,
api.SnapshotAccess.REPLAY,
):
raise ValueError(
"unexpected value of PATHWAY_SNAPSHOT_ACCESS environment variable "
+ "- when PATHWAY_REPLAY_STORAGE is set, PATHWAY_SNAPSHOT_ACCESS "
+ "needs to be set to either 'record' or 'replay'"
)
data_storage = persistence.Backend.filesystem(self.replay_storage)
persistence_config = persistence.Config.simple_config(
data_storage,
persistence_mode=self.persistence_mode,
snapshot_access=self.snapshot_access,
continue_after_replay=self.continue_after_replay,
)
return persistence_config
else:
return None


pathway_config = PathwayConfig()

__all__ = ["PathwayConfig", "pathway_config"]
59 changes: 0 additions & 59 deletions python/pathway/internals/environ.py

This file was deleted.

9 changes: 5 additions & 4 deletions python/pathway/internals/graph_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from collections.abc import Callable, Iterable

from pathway.internals import api, environ, parse_graph as graph, table, trace
from pathway.internals import api, parse_graph as graph, table, trace
from pathway.internals.column_path import ColumnPath
from pathway.internals.config import pathway_config
from pathway.internals.graph_runner.async_utils import new_event_loop
from pathway.internals.graph_runner.row_transformer_operator_handler import ( # noqa: registers handler for RowTransformerOperator
RowTransformerOperatorHandler,
Expand Down Expand Up @@ -46,14 +47,14 @@ def __init__(
self._graph = input_graph
self.debug = debug
if ignore_asserts is None:
ignore_asserts = environ.ignore_asserts
ignore_asserts = pathway_config.ignore_asserts
self.ignore_asserts = ignore_asserts
self.monitoring_level = monitoring_level
self.with_http_server = with_http_server
self.default_logging = default_logging
self.persistence_config = persistence_config or environ.get_replay_config()
self.persistence_config = persistence_config or pathway_config.replay_config
if runtime_typechecking is None:
self.runtime_typechecking = environ.runtime_typechecking
self.runtime_typechecking = pathway_config.runtime_typechecking
else:
self.runtime_typechecking = runtime_typechecking

Expand Down

0 comments on commit 3dc1077

Please sign in to comment.