Skip to content

Commit

Permalink
better background behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Jan 14, 2025
1 parent 15ecf88 commit 14ad097
Showing 1 changed file with 161 additions and 119 deletions.
280 changes: 161 additions & 119 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import inspect
import os
import shlex
import shutil
import signal
import socket
import subprocess
import sys
import textwrap
import time
from pathlib import Path

import anyio
Expand Down Expand Up @@ -509,12 +509,10 @@ async def stamp(revision: str):
exit_with_success("Stamping database with revision succeeded!")


def _discover_services() -> (
tuple[
list[type[prefect.server.services.loop_service.LoopService]],
dict[str, prefect.settings.Setting],
]
):
# ----------------------------------------------------------------------
# 1) Discover
# ----------------------------------------------------------------------
def _discover_services():
from prefect.server.events.services import triggers
from prefect.server.services import (
cancellation_cleanup,
Expand Down Expand Up @@ -572,9 +570,12 @@ def _discover_services() -> (
return discovered_services, service_settings


# ----------------------------------------------------------------------
# 2) Actual runner: runs everything in one event loop
# ----------------------------------------------------------------------
async def _run_services(
service_classes: list[type[prefect.server.services.loop_service.LoopService]],
) -> None:
):
services = [cls() for cls in service_classes]
tasks: list[
tuple[
Expand All @@ -585,178 +586,219 @@ async def _run_services(
for service in services:
task = asyncio.create_task(service.start())
tasks.append((task, service))
app.console.print(f"[dim]✓ {service.name}[/]")
logger.info(
f"Started service: {service.name}"
) # or console.print if you prefer

try:
await asyncio.gather(*(task for task, _ in tasks))
await asyncio.gather(*(t for t, _ in tasks))
except asyncio.CancelledError:
logger.info("Received cancellation, stopping services...")
for task, service in tasks:
task.cancel()
app.console.print(f"[dim]✓ Stopped {service.name}[/]")
await asyncio.gather(*(task for task, _ in tasks), return_exceptions=True)
logger.info(f"Stopped service: {service.name}")
await asyncio.gather(*(t for t, _ in tasks), return_exceptions=True)


def _is_process_running(pid: int) -> bool:
    """Check if a process is running.

    The process is probed with ``os.kill(pid, 0)``: signal ``0`` asks the OS
    to perform its existence/permission checks without delivering a signal.

    Args:
        pid: The process ID to probe, typically read from a PID file.

    Returns:
        ``True`` if the probe succeeds, ``False`` if ``pid`` is not a valid
        single-process ID or the probe fails for any OS-level reason.
    """
    # PIDs <= 0 do not name a single process: with os.kill, 0 addresses the
    # caller's own process group and negative values address groups / every
    # process. A corrupt PID file must never be reported as "running",
    # otherwise the caller may go on to signal those targets.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except (OSError, OverflowError):
        # OSError covers ProcessLookupError (no such process) as well as
        # PermissionError; OverflowError is raised for values that do not
        # fit in the platform's pid type.
        return False
    return True


@services_app.command(aliases=["ls", "list"])
async def list_services():
"""List all services"""
# ----------------------------------------------------------------------
# 3) Hidden subcommand for the child to run in the background
# This is the actual manager process entrypoint.
# ----------------------------------------------------------------------
@services_app.command(hidden=True, name="manager")
def run_manager_process():
    """
    This is an internal entrypoint used by `prefect server services start --background`.
    Users do not call this directly.
    We do everything in sync so that the child won't exit until the user kills it.

    Exits with status 1 if no discovered service is enabled.
    """
    # Re-discover enabled services in the child (same environment)
    discovered_services, service_settings = _discover_services()

    enabled_services = []
    for svc in discovered_services:
        setting = service_settings.get(svc.__name__)
        # A service with no registered setting is treated as disabled rather
        # than crashing on a missing `.value()` attribute.
        if setting is not None and setting.value():
            enabled_services.append(svc)

    if not enabled_services:
        logger.error("No services are enabled! Exiting manager.")
        sys.exit(1)

    logger.info("Manager process started. Starting services...")
    try:
        # Blocks until the services finish or the process is interrupted.
        asyncio.run(_run_services(enabled_services))
    except KeyboardInterrupt:
        # Ctrl+C / SIGINT is the expected way to stop; fall through to the
        # final log line instead of printing a traceback.
        pass
    logger.info("Manager process has exited.")


# ----------------------------------------------------------------------
# 4) CLI commands
# ----------------------------------------------------------------------
@services_app.command(aliases=["ls", "list"])
def list_services():
discovered_services, service_settings = _discover_services()
table = Table(title="Available Services", expand=True)
table.add_column("Name", style="blue", no_wrap=True)
table.add_column("Setting Enabled (T/F)", style="green", no_wrap=True)
table.add_column("Enabled?", style="green", no_wrap=True)
table.add_column("Description", style="cyan", no_wrap=False)

for service_class in discovered_services:
name = service_class.__name__
for svc in discovered_services:
name = svc.__name__
setting = service_settings.get(name, False)
enabled = "T" if setting.value() else "F" # type: ignore
is_enabled = setting.value() if setting else False
enabled_text = "T" if is_enabled else "F"

description = ""
if doc := inspect.getdoc(service_class):
description = doc.split("\n")[0].strip()
if len(description) > 60:
description = description[:57] + "..."
doc = inspect.getdoc(svc) or ""
description = doc.split("\n", 1)[0].strip()
if len(description) > 60:
description = description[:57] + "..."

table.add_row(name, enabled, description)
table.add_row(name, enabled_text, description)

app.console.print(table)
typer.echo(table)


@services_app.command(aliases=["start"])
async def start_services(
def start_services(
background: bool = typer.Option(
False, "--background", "-b", help="Run the services in the background"
),
_internal: bool = typer.Option(
False, "--internal", hidden=True, help="Internal flag for background process"
),
):
"""Start all enabled Prefect services"""
pid_file = Path(PREFECT_HOME.value() / "services" / "manager.pid")
logger.debug(f"Using PID file at {pid_file}")
"""
Start all enabled Prefect services in one process.
"""
pid_file = Path(PREFECT_HOME.value()) / "services" / "manager.pid"
pid_file.parent.mkdir(parents=True, exist_ok=True) # ensure directory exists

if pid_file.exists():
try:
pid = int(pid_file.read_text())
logger.debug(f"Found existing PID file with PID {pid}")
if _is_process_running(pid):
app.console.print(
"\n[yellow]Services are already running in the background.[/]"
"\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them first.[/]"
"\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
)
return
logger.debug(f"Process {pid} is not running")
except (ValueError, OSError) as e:
logger.debug(f"Error reading PID file: {e}")
pid_file.unlink(missing_ok=True)
logger.debug("Removed stale PID file")

discovered_services, service_settings = _discover_services()
enabled_services = [
service_class
for service_class in discovered_services
if service_settings.get(service_class.__name__, False).value()
]
raise typer.Exit()
else:
# Stale file
pid_file.unlink(missing_ok=True)
except (ValueError, OSError):
# Could not read or parse
pid_file.unlink(missing_ok=True)

# 1) Foreground run
if not background:
app.console.print("\n[blue]Starting services... Press CTRL+C to stop[/]\n")
# Re-discover in the parent so we can fail fast if none enabled
discovered_services, service_settings = _discover_services()
enabled_services = [
svc
for svc in discovered_services
if service_settings.get(svc.__name__, False).value()
]

if not enabled_services:
exit_with_error("No services are enabled!")
if not enabled_services:
app.console.print("[red]No services are enabled![/]")
raise typer.Exit(code=1)

if background and not _internal:
pid_dir = Path(PREFECT_HOME.value() / "services")
pid_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"Created services directory at {pid_dir}")
try:
asyncio.run(_run_services(enabled_services))
except KeyboardInterrupt:
pass
app.console.print("\n[green]All services stopped.[/]")
return

prefect_executable = shutil.which("prefect")
logger.debug(f"Found prefect executable at {prefect_executable}")
if not prefect_executable:
app.console.print("[red]Could not find prefect executable[/]")
return
# 2) Background run => spawn the "manager" subcommand
# We'll *not* discard logs. Instead, at least redirect them to a file.
log_file = pid_file.parent / "services.log"

command = [
"prefect", # We'll rely on `prefect` CLI group
"server",
"services",
"manager", # subcommand above
]
with open(log_file, "ab") as f:
process = subprocess.Popen(
[prefect_executable, "server", "services", "start", "--internal"],
env=os.environ,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
command,
env=os.environ.copy(),
stdout=f,
stderr=f,
start_new_session=True, # separate process group on Unix
)
logger.debug(f"Started background process with PID {process.pid}")

await asyncio.sleep(1.0)
if process.poll() is not None:
logger.debug("Process failed to start")
app.console.print("[red]Failed to start services[/]")
return

logger.debug(f"Writing PID {process.pid} to {pid_file}")
pid_file.write_text(str(process.pid))
logger.debug("Wrote PID file successfully")

app.console.print(
"\n[green]Services are running in the background.[/]"
"\n[blue]Use[/] [yellow]`prefect server services ls`[/] [blue]to check their status.[/]"
"\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
)
else:
app.console.print("\n[blue]Starting services... Press CTRL+C to stop[/]\n")
try:
await _run_services(enabled_services)
except KeyboardInterrupt:
pass
finally:
app.console.print("\n[green]All services stopped.[/]")
time.sleep(1)
if process.poll() is not None:
app.console.print("[red]Failed to start services in the background![/]")
# Print tail of log to help debugging
with open(log_file, "rb") as f:
lines = f.read().decode().splitlines()[-10:]
for line in lines:
app.console.print(f"[dim]{line}[/]")
raise typer.Exit(code=1)

# If child is still running, write out the PID
pid_file.write_text(str(process.pid))
app.console.print(
"\n[green]Services are running in the background.[/]"
f"\n[dim]Logs are in: {log_file}[/]"
"\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
)


@services_app.command(aliases=["stop"])
async def stop_services():
"""Stop all background services"""
pid_file = Path(PREFECT_HOME.value() / "services" / "manager.pid")
logger.debug(f"Looking for PID file at {pid_file}")
"""
Stop any background Prefect services that were started.
"""
pid_file = Path(PREFECT_HOME.value()) / "services" / "manager.pid"

if not pid_file.exists():
logger.debug("PID file not found")
exit_with_success("No services are running in the background.")
app.console.print("No services are running in the background.")
raise typer.Exit()

pid = None
try:
pid = int(pid_file.read_text())
logger.debug(f"Read PID {pid} from file")

if _is_process_running(pid):
logger.debug(f"Sending SIGTERM to process {pid}")
os.kill(pid, signal.SIGTERM)
app.console.print("\n[yellow]Shutting down...[/]")

# Wait for process to exit
for i in range(5): # 5 second timeout
if not _is_process_running(pid):
logger.debug(f"Process {pid} stopped after {i + 1} seconds")
break
logger.debug(f"Waiting for process {pid} to stop...")
await asyncio.sleep(1)
except (ValueError, OSError):
# Can't parse or read
pid_file.unlink(missing_ok=True)
app.console.print("No valid PID file found.")
raise typer.Exit()

app.console.print("[dim]✓ Services stopped[/]")
else:
logger.debug(f"Process {pid} was not running")
app.console.print("[yellow]Services were not running[/]")
except (ValueError, OSError) as e:
logger.debug(f"Error stopping process: {e}")
app.console.print(f"[red]✗ Failed to stop services: {str(e)}[/]")
finally:
logger.debug(f"Removing PID file {pid_file}")
if not _is_process_running(pid):
app.console.print("[yellow]Services were not running[/]")
pid_file.unlink(missing_ok=True)
# Attempt to remove directory if empty
try:
pid_file.parent.rmdir()
logger.debug("Removed services directory")
except OSError:
logger.debug("Could not remove services directory")
pass
app.console.print("\n[green]All services stopped.[/]")
return

app.console.print("\n[yellow]Shutting down...[/]")
os.kill(pid, signal.SIGTERM)

# We’ll wait a bit for it to exit
for _ in range(5):
if not _is_process_running(pid):
app.console.print("[dim]✓ Services stopped[/]")
break
await asyncio.sleep(1)

pid_file.unlink(missing_ok=True)
try:
pid_file.parent.rmdir()
except OSError:
pass

app.console.print("\n[green]All services stopped.[/]")

0 comments on commit 14ad097

Please sign in to comment.