Skip to content

Commit

Permalink
Use context managers to simplify log serve management (#27756)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish authored Nov 17, 2022
1 parent 355b008 commit 5004dac
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 37 deletions.
29 changes: 13 additions & 16 deletions airflow/cli/commands/celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Celery command."""
from __future__ import annotations

from contextlib import contextmanager
from multiprocessing import Process

import daemon
Expand Down Expand Up @@ -83,22 +84,16 @@ def flower(args):
celery_app.start(options)


def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
"""Starts serve_logs sub-process."""
sub_proc = None
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
return sub_proc
return None


def _run_worker(options, skip_serve_logs):
sub_proc = _serve_logs(skip_serve_logs)
try:
celery_app.worker_main(options)
finally:
if sub_proc:
sub_proc.terminate()
yield
if sub_proc:
sub_proc.terminate()


@cli_utils.action_cli
Expand Down Expand Up @@ -190,17 +185,19 @@ def worker(args):
stdout_handle.truncate(0)
stderr_handle.truncate(0)

ctx = daemon.DaemonContext(
daemon_context = daemon.DaemonContext(
files_preserve=[handle],
umask=int(umask, 8),
stdout=stdout_handle,
stderr=stderr_handle,
)
with ctx:
_run_worker(options=options, skip_serve_logs=skip_serve_logs)
with daemon_context, _serve_logs(skip_serve_logs):
celery_app.worker_main(options)

else:
# Run Celery worker in the same process
_run_worker(options=options, skip_serve_logs=skip_serve_logs)
with _serve_logs(skip_serve_logs):
celery_app.worker_main(options)


@cli_utils.action_cli
Expand Down
36 changes: 15 additions & 21 deletions airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import signal
from contextlib import contextmanager
from multiprocessing import Process

import daemon
Expand All @@ -31,28 +32,15 @@
from airflow.utils.scheduler_health import serve_health_check


def _create_scheduler_job(args):
def _run_scheduler_job(args):
job = SchedulerJob(
subdir=process_subdir(args.subdir),
num_runs=args.num_runs,
do_pickle=args.do_pickle,
)
return job


def _run_scheduler_job(args):
skip_serve_logs = args.skip_serve_logs
job = _create_scheduler_job(args)
logs_sub_proc = _serve_logs(skip_serve_logs)
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
health_sub_proc = _serve_health_check(enable_health_check)
try:
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
job.run()
finally:
if logs_sub_proc:
logs_sub_proc.terminate()
if health_sub_proc:
health_sub_proc.terminate()


@cli_utils.action_cli
Expand Down Expand Up @@ -85,23 +73,29 @@ def scheduler(args):
_run_scheduler_job(args=args)


def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
"""Starts serve_logs sub-process."""
from airflow.configuration import conf
from airflow.utils.serve_logs import serve_logs

sub_proc = None
if conf.get("core", "executor") in ["LocalExecutor", "SequentialExecutor"]:
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
return sub_proc
return None
yield
if sub_proc:
sub_proc.terminate()


def _serve_health_check(enable_health_check: bool = False) -> Process | None:
@contextmanager
def _serve_health_check(enable_health_check: bool = False):
"""Starts serve_health_check sub-process."""
sub_proc = None
if enable_health_check:
sub_proc = Process(target=serve_health_check)
sub_proc.start()
return sub_proc
return None
yield
if sub_proc:
sub_proc.terminate()

0 comments on commit 5004dac

Please sign in to comment.