Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prefect server services CLI commands #16706

Merged
merged 19 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion load_testing/run-server.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env bash

DB_TYPE=${1:-sqlite} # Default to sqlite if no argument provided
NO_SERVICES=${2:-false} # Default to false if no argument provided

# Function to start postgres container
start_postgres() {
Expand Down Expand Up @@ -65,13 +66,14 @@ if [[ $DB_TYPE == sqlite ]]; then
elif [[ $DB_TYPE == postgres:* ]]; then
PG_VERSION=${DB_TYPE#postgres:}
start_postgres $PG_VERSION
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/prefect"
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/prefect"
else
echo "Invalid database type. Use 'sqlite' or 'postgres:<version>'"
exit 1
fi

PREFECT_API_URL=http://localhost:4200/api \
PREFECT__SERVER_WEBSERVER_ONLY=$NO_SERVICES \
OTEL_SERVICE_NAME=prefect-server \
OTEL_TRACES_EXPORTER=otlp \
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \
Expand Down
303 changes: 294 additions & 9 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
Command line interface for working with the Prefect API and server.
"""

from __future__ import annotations

import asyncio
import inspect
import os
import shlex
import signal
Expand All @@ -10,11 +14,11 @@
import sys
import textwrap
from pathlib import Path
from types import ModuleType

import anyio
import anyio.abc
import typer
import uvicorn
from rich.table import Table

import prefect
import prefect.settings
Expand Down Expand Up @@ -49,15 +53,18 @@
help="Start a Prefect server instance and interact with the database",
)
database_app = PrefectTyper(name="database", help="Interact with the database.")
services_app = PrefectTyper(name="services", help="Interact with server loop services.")
server_app.add_typer(database_app)
server_app.add_typer(services_app)
app.add_typer(server_app)

logger = get_logger(__name__)

PID_FILE = "server.pid"
SERVER_PID_FILE_NAME = "server.pid"
SERVICES_PID_FILE = Path(PREFECT_HOME.value()) / "services.pid"


def generate_welcome_blurb(base_url, ui_enabled: bool):
def generate_welcome_blurb(base_url: str, ui_enabled: bool):
blurb = textwrap.dedent(
r"""
___ ___ ___ ___ ___ ___ _____
Expand Down Expand Up @@ -240,7 +247,7 @@ def start(
if no_services:
server_settings["PREFECT_SERVER_ANALYTICS_ENABLED"] = "False"

pid_file = Path(PREFECT_HOME.value() / PID_FILE)
pid_file = Path(PREFECT_HOME.value()) / SERVER_PID_FILE_NAME
# check if port is already in use
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
Expand Down Expand Up @@ -350,10 +357,10 @@ def _run_in_foreground(
@server_app.command()
async def stop():
"""Stop a Prefect server instance running in the background"""
pid_file = anyio.Path(PREFECT_HOME.value() / PID_FILE)
if not await pid_file.exists():
pid_file = Path(PREFECT_HOME.value()) / SERVER_PID_FILE_NAME
if not pid_file.exists():
exit_with_success("No server running in the background.")
pid = int(await pid_file.read_text())
pid = int(pid_file.read_text())
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
Expand All @@ -363,7 +370,7 @@ async def stop():
finally:
# The file probably exists, but use `missing_ok` to avoid an
# error if the file was deleted by another actor
await pid_file.unlink(missing_ok=True)
pid_file.unlink(missing_ok=True)
app.console.print("Server stopped!")


Expand Down Expand Up @@ -501,3 +508,281 @@ async def stamp(revision: str):
app.console.print("Stamping database with revision ...")
await run_sync_in_worker_thread(alembic_stamp, revision=revision)
exit_with_success("Stamping database with revision succeeded!")


def _get_service_settings() -> dict[str, "prefect.settings.Setting"]:
    """
    Build the lookup from loop-service class name to the setting that toggles it.

    Several services share a toggle: both scheduler services use the scheduler
    setting, and the trigger and action services all use the triggers setting.
    """
    settings = prefect.settings
    scheduler_enabled = settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED
    triggers_enabled = settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED

    service_settings: dict[str, "prefect.settings.Setting"] = {}
    service_settings["Telemetry"] = settings.PREFECT_SERVER_ANALYTICS_ENABLED
    service_settings["Scheduler"] = scheduler_enabled
    service_settings["RecentDeploymentsScheduler"] = scheduler_enabled
    service_settings["MarkLateRuns"] = (
        settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED
    )
    service_settings["FailExpiredPauses"] = (
        settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED
    )
    service_settings["CancellationCleanup"] = (
        settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED
    )
    service_settings["FlowRunNotifications"] = (
        settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED
    )
    service_settings["Foreman"] = settings.PREFECT_API_SERVICES_FOREMAN_ENABLED
    service_settings["ReactiveTriggers"] = triggers_enabled
    service_settings["ProactiveTriggers"] = triggers_enabled
    service_settings["Actions"] = triggers_enabled
    return service_settings


def _get_service_modules() -> list[ModuleType]:
    """
    Return the modules that are scanned for loop-service implementations.

    The modules are imported when this function is called rather than when
    the CLI module itself is imported.
    """
    from prefect.server.events.services import triggers
    from prefect.server.services import (
        cancellation_cleanup,
        flow_run_notifications,
        foreman,
        late_runs,
        pause_expirations,
        scheduler,
        task_run_recorder,
        telemetry,
    )

    # Core server service modules first, the events `triggers` module last.
    service_modules: list[ModuleType] = [
        cancellation_cleanup,
        flow_run_notifications,
        foreman,
        late_runs,
        pause_expirations,
        scheduler,
        task_run_recorder,
        telemetry,
    ]
    service_modules.append(triggers)
    return service_modules


def _discover_service_classes() -> (
    list[type["prefect.server.services.loop_service.LoopService"]]
):
    """
    Collect every `LoopService` subclass exposed by the service modules.

    Each module from `_get_service_modules()` is inspected in turn and every
    module attribute that is a class deriving from `LoopService` (other than
    `LoopService` itself) is returned. Any such class that is visible as an
    attribute of a module is included, so a class reachable from more than
    one module appears once per module.
    """
    from prefect.server.services.loop_service import LoopService

    def _is_service_class(candidate: object) -> bool:
        # Subclasses only; the `LoopService` base class itself is excluded.
        return (
            inspect.isclass(candidate)
            and issubclass(candidate, LoopService)
            and candidate != LoopService
        )

    return [
        service_cls
        for module in _get_service_modules()
        for _, service_cls in inspect.getmembers(module, _is_service_class)
    ]


def _get_enabled_services() -> (
    list[type["prefect.server.services.loop_service.LoopService"]]
):
    """
    Return the discovered service classes whose toggle setting is truthy.

    A discovered class with no entry in the `_get_service_settings()` mapping
    is treated as disabled, matching how `list_services` reports it.
    """
    service_settings = _get_service_settings()
    enabled: list[type["prefect.server.services.loop_service.LoopService"]] = []
    for svc in _discover_service_classes():
        setting = service_settings.get(svc.__name__)
        # Guard the lookup: falling back to a bare `False` and calling
        # `.value()` on it would raise AttributeError for unmapped services.
        if setting is not None and setting.value():
            enabled.append(svc)
    return enabled


async def _run_services(
    service_classes: list[type["prefect.server.services.loop_service.LoopService"]],
):
    """
    Run one instance of each given service class until cancelled.

    Every class is instantiated with no arguments, then each instance's
    `start()` coroutine is scheduled as a task and all tasks are awaited
    together. If this coroutine is cancelled while waiting, every task is
    cancelled and awaited (their errors are collected, not raised) and the
    cancellation is not re-raised. An exception raised by a service while
    running propagates to the caller.
    """
    # Instantiate all services up front, before any task is scheduled.
    services = [cls() for cls in service_classes]

    # Pairs of (running task, service *instance*).
    tasks: list[
        tuple[
            asyncio.Task[None], "prefect.server.services.loop_service.LoopService"
        ]
    ] = []

    for service in services:
        task = asyncio.create_task(service.start())
        tasks.append((task, service))
        logger.debug(f"Started service: {service.name}")

    try:
        await asyncio.gather(*(t for t, _ in tasks))
    except asyncio.CancelledError:
        logger.info("Received cancellation, stopping services...")
        for task, service in tasks:
            task.cancel()
            # Logged when cancellation is requested; the task only finishes
            # unwinding in the gather below.
            logger.debug(f"Stopped service: {service.name}")
        # Wait for every task to finish without raising their errors.
        await asyncio.gather(*(t for t, _ in tasks), return_exceptions=True)


def _is_process_running(pid: int) -> bool:
"""Check if a process is running by attempting to send signal 0."""
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, OSError):
return False


def _read_pid_file(path: Path) -> int | None:
"""Read and validate a PID from a file."""
try:
return int(path.read_text())
except (ValueError, OSError, FileNotFoundError):
return None


def _write_pid_file(path: Path, pid: int) -> None:
"""Write a PID to a file, creating parent directories if needed."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(str(pid))


def _cleanup_pid_file(path: Path) -> None:
"""Remove PID file and try to cleanup empty parent directory."""
path.unlink(missing_ok=True)
try:
path.parent.rmdir()
except OSError:
pass


# Hidden command: the entry point spawned by
# `prefect server services start --background`.
@services_app.command(hidden=True, name="manager")
def run_manager_process():
    """
    This is an internal entrypoint used by `prefect server services start --background`.
    Users do not call this directly.

    We do everything in sync so that the child won't exit until the user kills it.
    """
    enabled_services = _get_enabled_services()
    if not enabled_services:
        # Nothing to run: log the reason and exit with a failure status.
        logger.error("No services are enabled! Exiting manager.")
        sys.exit(1)

    logger.debug("Manager process started. Starting services...")
    try:
        # Blocks here for as long as the services keep running.
        asyncio.run(_run_services(enabled_services))
    except KeyboardInterrupt:
        # An interrupt ends the manager without a traceback.
        pass
    finally:
        logger.debug("Manager process has exited.")


# public, user-facing `prefect server services` commands
@services_app.command(aliases=["ls"])
def list_services():
    """List all available services and their status."""
    # One row per discovered service class: its name, whether its toggle
    # setting is on, and the first line of its docstring.
    table = Table(title="Available Services", expand=True)
    column_specs = (
        ("Name", "blue", True),
        ("Enabled?", "green", True),
        ("Description", "cyan", False),
    )
    for header, style, no_wrap in column_specs:
        table.add_column(header, style=style, no_wrap=no_wrap)

    service_settings = _get_service_settings()
    for svc in _discover_service_classes():
        name = svc.__name__

        # Services without a usable toggle are reported as disabled.
        setting = service_settings.get(name, False)
        if setting:
            is_enabled = setting.value()  # type: ignore
        else:
            is_enabled = False
        assert isinstance(is_enabled, bool), "Setting value is not a boolean"

        # Only the first docstring line is shown as the description.
        first_doc_line, _, _ = (inspect.getdoc(svc) or "").partition("\n")
        table.add_row(name, str(is_enabled), first_doc_line.strip())

    app.console.print(table)


@services_app.command(aliases=["start"])
def start_services(
    background: bool = typer.Option(
        False, "--background", "-b", help="Run the services in the background"
    ),
):
    """Start all enabled Prefect services in one process."""
    # Foreground mode runs the services in this process until interrupted.
    # Background mode spawns the hidden `manager` command as a detached
    # child and records its PID in SERVICES_PID_FILE.
    SERVICES_PID_FILE.parent.mkdir(parents=True, exist_ok=True)

    if SERVICES_PID_FILE.exists():
        recorded_pid = _read_pid_file(SERVICES_PID_FILE)
        if recorded_pid is None or not _is_process_running(recorded_pid):
            # Stale or invalid file
            _cleanup_pid_file(SERVICES_PID_FILE)
        else:
            app.console.print(
                "\n[yellow]Services are already running in the background.[/]"
                "\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
            )
            raise typer.Exit(code=1)

    enabled_services = _get_enabled_services()
    if not enabled_services:
        app.console.print("[red]No services are enabled![/]")
        raise typer.Exit(code=1)

    if background:
        for service in enabled_services:
            app.console.print(f"Starting service: [yellow]{service.__name__}[/]")

        on_windows = os.name == "nt"
        process = subprocess.Popen(
            ["prefect", "server", "services", "manager"],
            env=os.environ.copy(),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # A new session is requested on POSIX only; Windows gets a new
            # process group instead.
            start_new_session=not on_windows,
            creationflags=(
                subprocess.CREATE_NEW_PROCESS_GROUP if on_windows else 0
            ),
        )

        # This only catches a child that has already exited by the time we
        # poll, immediately after spawning it.
        if process.poll() is not None:
            app.console.print("[red]Failed to start services in the background![/]")
            raise typer.Exit(code=1)

        _write_pid_file(SERVICES_PID_FILE, process.pid)
        app.console.print(
            "\n[green]Services are running in the background.[/]"
            "\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
        )
        return

    app.console.print("\n[blue]Starting services... Press CTRL+C to stop[/]\n")
    try:
        asyncio.run(_run_services(enabled_services))
    except KeyboardInterrupt:
        pass
    app.console.print("\n[green]All services stopped.[/]")


@services_app.command(aliases=["stop"])
async def stop_services():
    """Stop any background Prefect services that were started."""
    # Reads the PID recorded in SERVICES_PID_FILE, signals that process, and
    # waits up to five seconds for it to exit. The PID file is removed once
    # the process is gone. If the process is still alive after the wait, the
    # command reports the failure and exits with code 1, leaving the PID file
    # in place so the command can be retried.

    if not SERVICES_PID_FILE.exists():
        app.console.print("No services are running in the background.")
        raise typer.Exit()

    if (pid := _read_pid_file(SERVICES_PID_FILE)) is None:
        _cleanup_pid_file(SERVICES_PID_FILE)
        app.console.print("No valid PID file found.")
        raise typer.Exit()

    if not _is_process_running(pid):
        app.console.print("[yellow]Services were not running[/]")
        _cleanup_pid_file(SERVICES_PID_FILE)
        return

    app.console.print("\n[yellow]Shutting down...[/]")
    try:
        if os.name == "nt":
            # On Windows, send Ctrl+C to the process group
            # NOTE(review): the child is created with CREATE_NEW_PROCESS_GROUP;
            # confirm CTRL_C_EVENT (rather than CTRL_BREAK_EVENT) reaches it.
            os.kill(pid, signal.CTRL_C_EVENT)
        else:
            # On Unix, send SIGTERM
            os.kill(pid, signal.SIGTERM)
    except OSError:
        # Deliberately best effort: the process may have exited between the
        # liveness check and the signal. The wait below decides the outcome.
        pass

    for _ in range(5):
        if not _is_process_running(pid):
            app.console.print("[dim]✓ Services stopped[/]")
            break
        await asyncio.sleep(1)
    else:
        # The loop never saw the process exit. Check once more after the final
        # sleep, and do not claim success (or drop the PID file) if it is
        # still alive.
        if _is_process_running(pid):
            app.console.print(
                f"[red]Services (PID {pid}) did not stop within 5 seconds.[/]"
            )
            raise typer.Exit(code=1)
        app.console.print("[dim]✓ Services stopped[/]")

    _cleanup_pid_file(SERVICES_PID_FILE)
    app.console.print("\n[green]All services stopped.[/]")
2 changes: 2 additions & 0 deletions src/prefect/server/events/services/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ async def stop(self) -> None:


class ProactiveTriggers(LoopService):
"""A loop service that runs the proactive triggers consumer"""

def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
super().__init__(
loop_seconds=(
Expand Down
Loading
Loading