Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
handle PID_FILE better
Browse files Browse the repository at this point in the history
zzstoatzz committed Jan 14, 2025
1 parent b51a915 commit 903f5df
Showing 2 changed files with 33 additions and 22 deletions.
27 changes: 11 additions & 16 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@

logger = get_logger(__name__)

PID_FILE = "server.pid"
PID_FILE = Path(PREFECT_HOME.value()) / "services.pid"


def generate_welcome_blurb(base_url: str, ui_enabled: bool):
@@ -515,9 +515,6 @@ def _get_service_settings() -> dict[str, "prefect.settings.Setting"]:
"""Get mapping of service names to their enabled/disabled settings."""
return {
"Telemetry": prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED,
"TaskRunRecorder": prefect.settings.PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED,
"EventPersister": prefect.settings.PREFECT_API_SERVICES_EVENT_PERSISTER_ENABLED,
"Distributor": prefect.settings.PREFECT_API_EVENTS_STREAM_OUT_ENABLED,
"Scheduler": prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED,
"RecentDeploymentsScheduler": prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED,
"MarkLateRuns": prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED,
@@ -698,11 +695,10 @@ def start_services(
),
):
"""Start all enabled Prefect services in one process."""
pid_file = Path(PREFECT_HOME.value()) / "services" / "manager.pid"
pid_file.parent.mkdir(parents=True, exist_ok=True)
PID_FILE.parent.mkdir(parents=True, exist_ok=True)

if pid_file.exists():
pid = _read_pid_file(pid_file)
if PID_FILE.exists():
pid = _read_pid_file(PID_FILE)
if pid is not None and _is_process_running(pid):
app.console.print(
"\n[yellow]Services are already running in the background.[/]"
@@ -711,7 +707,7 @@ def start_services(
raise typer.Exit(code=1)
else:
# Stale or invalid file
_cleanup_pid_file(pid_file)
_cleanup_pid_file(PID_FILE)

if not (enabled_services := _get_enabled_services()):
app.console.print("[red]No services are enabled![/]")
@@ -747,7 +743,7 @@ def start_services(
app.console.print("[red]Failed to start services in the background![/]")
raise typer.Exit(code=1)

_write_pid_file(pid_file, process.pid)
_write_pid_file(PID_FILE, process.pid)
app.console.print(
"\n[green]Services are running in the background.[/]"
"\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
@@ -757,20 +753,19 @@ def start_services(
@services_app.command(aliases=["stop"])
async def stop_services():
"""Stop any background Prefect services that were started."""
pid_file = Path(PREFECT_HOME.value()) / "services" / "manager.pid"

if not pid_file.exists():
if not PID_FILE.exists():
app.console.print("No services are running in the background.")
raise typer.Exit()

if (pid := _read_pid_file(pid_file)) is None:
_cleanup_pid_file(pid_file)
if (pid := _read_pid_file(PID_FILE)) is None:
_cleanup_pid_file(PID_FILE)
app.console.print("No valid PID file found.")
raise typer.Exit()

if not _is_process_running(pid):
app.console.print("[yellow]Services were not running[/]")
_cleanup_pid_file(pid_file)
_cleanup_pid_file(PID_FILE)
return

app.console.print("\n[yellow]Shutting down...[/]")
@@ -790,5 +785,5 @@ async def stop_services():
break
await asyncio.sleep(1)

_cleanup_pid_file(pid_file)
_cleanup_pid_file(PID_FILE)
app.console.print("\n[green]All services stopped.[/]")
28 changes: 22 additions & 6 deletions tests/cli/test_server_services.py
Original file line number Diff line number Diff line change
@@ -19,8 +19,24 @@ def enable_all_services():
yield


@pytest.fixture
def pid_file(monkeypatch: pytest.MonkeyPatch) -> Path:
pid_file = Path(PREFECT_HOME.value()) / "services.pid"
monkeypatch.setattr("prefect.cli.server.PID_FILE", pid_file)
return pid_file


@pytest.fixture(autouse=True)
def cleanup_pid_file(pid_file: Path):
if pid_file.exists():
pid_file.unlink()
yield
if pid_file.exists():
pid_file.unlink()


class TestBackgroundServices:
def test_start_and_stop_services(self):
def test_start_and_stop_services(self, pid_file: Path):
invoke_and_assert(
command=[
"server",
@@ -32,7 +48,6 @@ def test_start_and_stop_services(self):
expected_code=0,
)

pid_file = Path(PREFECT_HOME.value()) / "services" / "manager.pid"
assert pid_file.exists(), "Services PID file does not exist"

invoke_and_assert(
@@ -47,7 +62,7 @@ def test_start_and_stop_services(self):

assert not pid_file.exists(), "Services PID file still exists"

def test_start_duplicate_services(self):
def test_start_duplicate_services(self, pid_file: Path):
invoke_and_assert(
command=[
"server",
@@ -59,6 +74,8 @@ def test_start_duplicate_services(self):
expected_code=0,
)

assert pid_file.exists(), "PID file should exist before duplicate test"

invoke_and_assert(
command=[
"server",
@@ -80,10 +97,9 @@ def test_start_duplicate_services(self):
expected_code=0,
)

def test_stop_stale_pid_file(self):
pid_file = Path(PREFECT_HOME.value()) / "services" / "manager.pid"
def test_stop_stale_pid_file(self, pid_file: Path):
pid_file.parent.mkdir(parents=True, exist_ok=True)
pid_file.write_text("99999")
pid_file.write_text("99999") # Use a likely unused PID

invoke_and_assert(
command=[

0 comments on commit 903f5df

Please sign in to comment.