From 8bffa2c2067f43da746c8f4e890c5a007bab0a12 Mon Sep 17 00:00:00 2001 From: weiwei Date: Mon, 29 Jan 2024 16:57:54 +0800 Subject: [PATCH] add on_ready_counter to reduction --- billiard/pool.py | 1 + t/unit/test_pool.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/billiard/pool.py b/billiard/pool.py index 82bc53a..da4cc4e 100644 --- a/billiard/pool.py +++ b/billiard/pool.py @@ -271,6 +271,7 @@ def __reduce__(self): self.inq, self.outq, self.synq, self.initializer, self.initargs, self.maxtasks, self._shutdown, self.on_exit, self.sigprotection, self.wrap_exception, self.max_memory_per_child, + self.on_ready_counter ) def __call__(self): diff --git a/t/unit/test_pool.py b/t/unit/test_pool.py index 0518ee0..fba1546 100644 --- a/t/unit/test_pool.py +++ b/t/unit/test_pool.py @@ -1,4 +1,5 @@ import billiard.pool +from billiard import get_context import time import pytest @@ -9,6 +10,12 @@ def func(x): return x +def get_on_ready_count(): + import inspect + worker = inspect.stack()[1].frame.f_locals['self'] + return worker.on_ready_counter.value + + class test_pool: def test_raises(self): pool = billiard.pool.Pool() @@ -39,3 +46,13 @@ def test_exception_traceback_present(self): if i == 2: with pytest.raises(ValueError): res.get() + + def test_on_ready_counter_is_synchronized(self): + for ctx in ('spawn', 'fork', 'forkserver'): + pool = billiard.pool.Pool(processes=1, context=get_context(ctx)) + pool.apply_async(func, (1,)).get(1) + on_ready_counter = pool.apply_async(get_on_ready_count, ).get(1) + assert on_ready_counter == 1 + pool.close() + pool.join() + pool.terminate()