Skip to content

Commit

Permalink
♲ Move the request processing loop to own method
Browse files Browse the repository at this point in the history
This helps clarify the abstraction layers, reducing the complexity of
individual methods, making it easier to reason about what's going on
there.
  • Loading branch information
webknjaz committed Mar 13, 2024
1 parent 3b9d914 commit 432a855
Showing 1 changed file with 92 additions and 72 deletions.
164 changes: 92 additions & 72 deletions cheroot/workers/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,82 +100,23 @@ def __init__(self, server):
threading.Thread.__init__(self)

def run(self):
"""Process incoming HTTP connections.
"""Set up incoming HTTP connection processing loop.
Retrieves incoming connections from thread pool.
This is the thread's entry-point. It performs lop-layer
exception handling and interrupt processing.
:exc:`KeyboardInterrupt` and :exc:`SystemExit` bubbling up
from the inner-layer code constitute a global server interrupt
request. When they happen, the worker thread exits.
:raises BaseException: when an unexpected non-interrupt
exception leaks from the inner layers
# noqa: DAR401 KeyboardInterrupt SystemExit
"""
self.server.stats['Worker Threads'][self.name] = self.stats
self.ready = True
try:
self.ready = True
while True:
conn = self.server.requests.get()
if conn is _SHUTDOWNREQUEST:
return

self.conn = conn
is_stats_enabled = self.server.stats['Enabled']
if is_stats_enabled:
self.start_time = time.time()
keep_conn_open = False
try:
keep_conn_open = conn.communicate()
except ConnectionError as connection_error:
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(
'Got a connection error while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({connection_error !s})',
level=logging.INFO,
)
continue
except (KeyboardInterrupt, SystemExit) as shutdown_request:
# Shutdown request
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(
'Got a server shutdown request while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({shutdown_request !s})',
level=logging.DEBUG,
)
raise SystemExit(
str(shutdown_request),
) from shutdown_request
except Exception as unhandled_error:
# NOTE: Only a shutdown request should bubble up to the
# NOTE: external cleanup code. Otherwise, this thread dies.
# NOTE: If this were to happen, the threadpool would still
# NOTE: list a dead thread without knowing its state. And
# NOTE: the calling code would fail to schedule processing
# NOTE: of new requests.
self.server.error_log(
'Unhandled error while processing an incoming '
f'connection {unhandled_error !r}',
level=logging.ERROR,
traceback=True,
)
continue # Prevent the thread from dying
finally:
# NOTE: Any exceptions coming from within `finally` may
# NOTE: kill the thread, causing the threadpool to only
# NOTE: contain references to dead threads rendering the
# NOTE: server defunct, effectively meaning a DoS.
# NOTE: Ideally, things called here should process
# NOTE: everything recoverable internally. Any unhandled
# NOTE: errors will bubble up into the outer try/except
# NOTE: block. They will be treated as fatal and turned
# NOTE: into server shutdown requests and then reraised
# NOTE: unconditionally.
if keep_conn_open:
self.server.put_conn(conn)
else:
conn.close()
if is_stats_enabled:
self.requests_seen += self.conn.requests_seen
self.bytes_read += self.conn.rfile.bytes_read
self.bytes_written += self.conn.wfile.bytes_written
self.work_time += time.time() - self.start_time
self.start_time = None
self.conn = None
self._process_connections_until_interrupted()
except (KeyboardInterrupt, SystemExit) as interrupt_exc:
interrupt_cause = interrupt_exc.__cause__ or interrupt_exc
self.server.error_log(

Check warning on line 122 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L120-L122

Added lines #L120 - L122 were not covered by tests
Expand All @@ -200,6 +141,85 @@ def run(self):
self.server.interrupt = underlying_exc
raise

Check warning on line 142 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L141-L142

Added lines #L141 - L142 were not covered by tests

def _process_connections_until_interrupted(self):
"""Process incoming HTTP connections in an infinite loop.
Retrieves incoming connections from thread pool, processing
them one by one.
:raises SystemExit: on the internal requests to stop the
server instance
"""
while True:
conn = self.server.requests.get()
if conn is _SHUTDOWNREQUEST:
return

self.conn = conn
is_stats_enabled = self.server.stats['Enabled']
if is_stats_enabled:
self.start_time = time.time()

Check warning on line 161 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L161

Added line #L161 was not covered by tests
keep_conn_open = False
try:
keep_conn_open = conn.communicate()
except ConnectionError as connection_error:
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(

Check warning on line 167 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L165-L167

Added lines #L165 - L167 were not covered by tests
'Got a connection error while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({connection_error !s})',
level=logging.INFO,
)
continue
except (KeyboardInterrupt, SystemExit) as shutdown_request:

Check warning on line 174 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L173-L174

Added lines #L173 - L174 were not covered by tests
# Shutdown request
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(

Check warning on line 177 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L176-L177

Added lines #L176 - L177 were not covered by tests
'Got a server shutdown request while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({shutdown_request !s})',
level=logging.DEBUG,
)
raise SystemExit(

Check warning on line 183 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L183

Added line #L183 was not covered by tests
str(shutdown_request),
) from shutdown_request
except Exception as unhandled_error:

Check warning on line 186 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L186

Added line #L186 was not covered by tests
# NOTE: Only a shutdown request should bubble up to the
# NOTE: external cleanup code. Otherwise, this thread dies.
# NOTE: If this were to happen, the threadpool would still
# NOTE: list a dead thread without knowing its state. And
# NOTE: the calling code would fail to schedule processing
# NOTE: of new requests.
self.server.error_log(

Check warning on line 193 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L193

Added line #L193 was not covered by tests
'Unhandled error while processing an incoming '
f'connection {unhandled_error !r}',
level=logging.ERROR,
traceback=True,
)
continue # Prevent the thread from dying

Check warning on line 199 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L199

Added line #L199 was not covered by tests
finally:
# NOTE: Any exceptions coming from within `finally` may
# NOTE: kill the thread, causing the threadpool to only
# NOTE: contain references to dead threads rendering the
# NOTE: server defunct, effectively meaning a DoS.
# NOTE: Ideally, things called here should process
# NOTE: everything recoverable internally. Any unhandled
# NOTE: errors will bubble up into the outer try/except
# NOTE: block. They will be treated as fatal and turned
# NOTE: into server shutdown requests and then reraised
# NOTE: unconditionally.
if keep_conn_open:
self.server.put_conn(conn)
else:
conn.close()
if is_stats_enabled:
self.requests_seen += conn.requests_seen
self.bytes_read += conn.rfile.bytes_read
self.bytes_written += conn.wfile.bytes_written
self.work_time += time.time() - self.start_time
self.start_time = None

Check warning on line 220 in cheroot/workers/threadpool.py

View check run for this annotation

Codecov / codecov/patch

cheroot/workers/threadpool.py#L216-L220

Added lines #L216 - L220 were not covered by tests
self.conn = None


class ThreadPool:
"""A Request Queue for an HTTPServer which pools threads.
Expand Down

0 comments on commit 432a855

Please sign in to comment.