From 794ee25a979342de4d7ebccfecc2f46ba3319ca3 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Fri, 20 Dec 2024 13:31:57 -0800 Subject: [PATCH] Internally call shutdown to prevent a resource leak when calling terminate_workers --- Doc/library/concurrent.futures.rst | 7 ++++--- Lib/concurrent/futures/process.py | 12 ++++++++++-- .../test_concurrent_futures/test_process_pool.py | 9 +++++---- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index dd7f1e38357130..52aa16f38e3d91 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -419,11 +419,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. Attempt to terminate all living worker processes immediately by sending each of them the given signal. If the signal is not specified, the default - signal :data:`signal.SIGTERM` is used. + signal :data:`signal.SIGTERM` is used. Internally, it will also call + :meth:`Executor.shutdown` to ensure that all other resources associated with + the executor are freed. After calling this method the caller should no longer submit tasks to the - executor. It is also recommended to still call :meth:`Executor.shutdown` - to ensure that all other resources associated with the executor are freed. + executor. .. versionadded:: next diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f6f2337246890f..60c1fcd72e3009 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -870,10 +870,18 @@ def terminate_workers(self, signal=signal.SIGTERM): signal: The signal to send to each worker process. Defaults to signal.SIGTERM. """ - if not self._processes: + processes = {} + if self._processes: + processes = self._processes.copy() + + # shutdown will invalidate ._processes, so we copy it right before calling. + # If we waited here, we would deadlock if a process decides not to exit. + self.shutdown(wait=False, cancel_futures=True) + + if not processes: return - for pid, proc in self._processes.items(): + for pid, proc in processes.items(): try: if not proc.is_alive(): continue diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 54c23dd5629545..ed10b9356be0ac 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -251,7 +251,9 @@ def test_process_pool_executor_terminate_workers_dead_workers(self): # Patching in here instead of at the function level since we only want # to patch it for this function call, not other parts of the flow. with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill: - executor.terminate_workers() + with unittest.mock.patch.object(executor, 'shutdown') as mock_shutdown: + executor.terminate_workers() + mock_shutdown.assert_called_once_with(wait=False, cancel_futures=True) mock_kill.assert_not_called() @@ -269,8 +271,7 @@ def test_process_pool_executor_terminate_workers_stops_pool(self): executor.terminate_workers() - future = executor.submit(time.sleep, 0) - self.assertRaises(BrokenProcessPool, future.result) + self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) @unittest.mock.patch('concurrent.futures.process.os.kill') def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): @@ -278,9 +279,9 @@ def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): future = executor.submit(time.sleep, 0) future.result() + worker_process = list(executor._processes.values())[0] executor.terminate_workers(signal.SIGABRT) - worker_process = list(executor._processes.values())[0] mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT) def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self):