Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up exception handling in the startup code #9059

Merged
merged 1 commit into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9059.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix incorrect exit code when there is an error at startup.
150 changes: 91 additions & 59 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
# Copyright 2019-2021 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,7 +20,7 @@
import socket
import sys
import traceback
from typing import Iterable
from typing import Awaitable, Callable, Iterable

from typing_extensions import NoReturn

Expand Down Expand Up @@ -143,6 +144,45 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)


def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
"""Register a callback with the reactor, to be called once it is running

This can be used to initialise parts of the system which require an asynchronous
setup.

Any exception raised by the callback will be printed and logged, and the process
will exit.
"""

async def wrapper():
try:
await cb(*args, **kwargs)
except Exception:
# previously, we used Failure().printTraceback() here, in the hope that
# would give better tracebacks than traceback.print_exc(). However, that
# doesn't handle chained exceptions (with a __cause__ or __context__) well,
# and I *think* the need for Failure() is reduced now that we mostly use
# async/await.

# Write the exception to both the logs *and* the unredirected stderr,
# because people tend to get confused if it only goes to one or the other.
#
# One problem with this is that if people are using a logging config that
# logs to the console (as is common eg under docker), they will get two
# copies of the exception. We could maybe try to detect that, but it's
# probably a cost we can bear.
logger.fatal("Error during startup", exc_info=True)
print("Error during startup:", file=sys.__stderr__)
traceback.print_exc(file=sys.__stderr__)

# it's no use calling sys.exit here, since that just raises a SystemExit
# exception which is then caught by the reactor, and everything carries
# on as normal.
os._exit(1)

reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
Expand Down Expand Up @@ -227,7 +267,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")


def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
"""
Start a Synapse server or worker.

Expand All @@ -241,75 +281,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
hs: homeserver instance
listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
try:
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
reactor = hs.get_reactor()

reactor = hs.get_reactor()
@wrap_as_background_process("sighup")
def handle_sighup(*args, **kwargs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")

@wrap_as_background_process("sighup")
def handle_sighup(*args, **kwargs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")
for i, args, kwargs in _sighup_callbacks:
i(*args, **kwargs)

for i, args, kwargs in _sighup_callbacks:
i(*args, **kwargs)
sdnotify(b"READY=1")

sdnotify(b"READY=1")
# We defer running the sighup handlers until next reactor tick. This
# is so that we're in a sane state, e.g. flushing the logs may fail
# if the sighup happens in the middle of writing a log entry.
def run_sighup(*args, **kwargs):
# `callFromThread` should be "signal safe" as well as thread
# safe.
reactor.callFromThread(handle_sighup, *args, **kwargs)

# We defer running the sighup handlers until next reactor tick. This
# is so that we're in a sane state, e.g. flushing the logs may fail
# if the sighup happens in the middle of writing a log entry.
def run_sighup(*args, **kwargs):
# `callFromThread` should be "signal safe" as well as thread
# safe.
reactor.callFromThread(handle_sighup, *args, **kwargs)
signal.signal(signal.SIGHUP, run_sighup)

signal.signal(signal.SIGHUP, run_sighup)
register_sighup(refresh_certificate, hs)

register_sighup(refresh_certificate, hs)
# Load the certificate from disk.
refresh_certificate(hs)

# Load the certificate from disk.
refresh_certificate(hs)
# Start the tracer
synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
hs
)

# Start the tracer
synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
hs
)
# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().db_pool.start_profiling()
hs.get_pusherpool().start()

# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", logger.info, "Shutting down..."
)

# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().db_pool.start_profiling()
hs.get_pusherpool().start()
setup_sentry(hs)
setup_sdnotify(hs)

# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", logger.info, "Shutting down..."
)
# If background tasks are running on the main process, start collecting the
# phone home stats.
if hs.config.run_background_tasks:
start_phone_stats_home(hs)

setup_sentry(hs)
setup_sdnotify(hs)

# If background tasks are running on the main process, start collecting the
# phone home stats.
if hs.config.run_background_tasks:
start_phone_stats_home(hs)

# We now freeze all allocated objects in the hopes that (almost)
# everything currently allocated are things that will be used for the
# rest of time. Doing so means less work each GC (hopefully).
#
# This only works on Python 3.7
if sys.version_info >= (3, 7):
gc.collect()
gc.freeze()
except Exception:
traceback.print_exc(file=sys.stderr)
reactor = hs.get_reactor()
if reactor.running:
reactor.stop()
sys.exit(1)
# We now freeze all allocated objects in the hopes that (almost)
# everything currently allocated are things that will be used for the
# rest of time. Doing so means less work each GC (hopefully).
#
# This only works on Python 3.7
if sys.version_info >= (3, 7):
gc.collect()
gc.freeze()


def setup_sentry(hs):
Expand Down
7 changes: 3 additions & 4 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from typing_extensions import ContextManager

from twisted.internet import address, reactor
from twisted.internet import address

import synapse
import synapse.events
Expand All @@ -34,6 +34,7 @@
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
from synapse.app._base import register_start
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
Expand Down Expand Up @@ -960,9 +961,7 @@ def start(config_options):
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()

reactor.addSystemEventTrigger(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it matter that this is going from addSystemEventTrigger -> callWhenRunning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so.

callWhenRunning looks like:

    def callWhenRunning(self, _callable, *args, **kw):
        if self.running:
            _callable(*args, **kw)
        else:
            return self.addSystemEventTrigger('after', 'startup',
                                              _callable, *args, **kw)

so really this is just going from addSystemEventTrigger("before", "startup", ...) to addSystemEventTrigger("after", "startup", ...). I don't think that matters.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I don't think that would matter.

"before", "startup", _base.start, hs, config.worker_listeners
)
register_start(_base.start, hs, config.worker_listeners)

_base.start_worker_reactor("synapse-generic-worker", config)

Expand Down
62 changes: 25 additions & 37 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import sys
from typing import Iterable, Iterator

from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
Expand All @@ -38,7 +37,7 @@
WEB_CLIENT_PREFIX,
)
from synapse.app import _base
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
from synapse.config._base import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
Expand Down Expand Up @@ -414,40 +413,29 @@ async def reprovision_acme():
_base.refresh_certificate(hs)

async def start():
try:
# Run the ACME provisioning code, if it's enabled.
if hs.config.acme_enabled:
acme = hs.get_acme_handler()
# Start up the webservices which we will respond to ACME
# challenges with, and then provision.
await acme.start_listening()
await do_acme()

# Check if it needs to be reprovisioned every day.
hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)

# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
await oidc.load_jwks()

_base.start(hs, config.listeners)

hs.get_datastore().db_pool.updates.start_doing_background_updates()
except Exception:
# Print the exception and bail out.
print("Error during startup:", file=sys.stderr)

# this gives better tracebacks than traceback.print_exc()
Failure().printTraceback(file=sys.stderr)

if reactor.running:
reactor.stop()
sys.exit(1)

reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
# Run the ACME provisioning code, if it's enabled.
if hs.config.acme_enabled:
acme = hs.get_acme_handler()
# Start up the webservices which we will respond to ACME
# challenges with, and then provision.
await acme.start_listening()
await do_acme()

# Check if it needs to be reprovisioned every day.
hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)

# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
await oidc.load_jwks()

await _base.start(hs, config.listeners)

hs.get_datastore().db_pool.updates.start_doing_background_updates()

register_start(start)

return hs

Expand Down