From f0d17214c618ca09bee1acb5bee5c5f9881d0861 Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Wed, 11 Sep 2024 15:32:42 -0500 Subject: [PATCH 1/3] Fixed default threadpool count for pyver> 3.9 --- azure_functions_worker/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index a9849b28..eedb6763 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -960,7 +960,7 @@ def tp_max_workers_validator(value: str) -> bool: # Starting Python 3.9, worker won't be putting a limit on the # max_workers count in the created threadpool. - default_value = None if sys.version_info.minor == 9 \ + default_value = None if sys.version_info.minor >= 9 \ else f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}' max_workers = get_app_setting(setting=PYTHON_THREADPOOL_THREAD_COUNT, From 31b2c361548c796e1d57f92cf27a1878cd0ad010 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Mon, 16 Sep 2024 16:44:23 -0500 Subject: [PATCH 2/3] ptptc test fixes --- azure_functions_worker/dispatcher.py | 4 +- tests/unittests/test_dispatcher.py | 100 +++++++++++++-------------- 2 files changed, 51 insertions(+), 53 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 3db99ee7..897a3499 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -24,6 +24,7 @@ from .bindings.shared_memory_data_transfer import SharedMemoryManager from .constants import ( APPLICATIONINSIGHTS_CONNECTION_STRING, + HTTP_URI, METADATA_PROPERTIES_WORKER_INDEXED, PYTHON_AZURE_MONITOR_LOGGER_NAME, PYTHON_AZURE_MONITOR_LOGGER_NAME_DEFAULT, @@ -39,8 +40,7 @@ PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, PYTHON_THREADPOOL_THREAD_COUNT_MAX_37, PYTHON_THREADPOOL_THREAD_COUNT_MIN, - REQUIRES_ROUTE_PARAMETERS, - HTTP_URI + REQUIRES_ROUTE_PARAMETERS ) from .extension import ExtensionManager from .http_v2 import ( diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index 2bb7efdb..8fa0aa0e 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -14,13 +14,16 @@ from azure_functions_worker import protos from azure_functions_worker.constants import ( + HTTP_URI, METADATA_PROPERTIES_WORKER_INDEXED, PYTHON_ENABLE_DEBUG_LOGGING, PYTHON_ENABLE_INIT_INDEXING, PYTHON_THREADPOOL_THREAD_COUNT, PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, + PYTHON_THREADPOOL_THREAD_COUNT_MAX, PYTHON_THREADPOOL_THREAD_COUNT_MAX_37, - PYTHON_THREADPOOL_THREAD_COUNT_MIN, HTTP_URI, REQUIRES_ROUTE_PARAMETERS, + PYTHON_THREADPOOL_THREAD_COUNT_MIN, + REQUIRES_ROUTE_PARAMETERS ) from azure_functions_worker.dispatcher import Dispatcher, ContextEnabledTask from azure_functions_worker.version import VERSION @@ -47,13 +50,15 @@ class TestThreadPoolSettingsPython37(testutils.AsyncTestCase): NEW_TYPING = sys.version_info[:3] >= (3, 7, 0) # PEP 560 """ - def setUp(self, version=SysVersionInfo(3, 7, 0, 'final', 0)): + def setUp(self, + version: Optional[any] = SysVersionInfo(3, 7, 0, 'final', 0), + default_workers: Optional[int] = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, + max_workers: Optional[int] = PYTHON_THREADPOOL_THREAD_COUNT_MAX_37): self._ctrl = testutils.start_mockhost( script_root=DISPATCHER_FUNCTIONS_DIR) - self._default_workers: Optional[ - int] = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT + self._default_workers: int = default_workers self._over_max_workers: int = 10000 - self._allowed_max_workers: int = PYTHON_THREADPOOL_THREAD_COUNT_MAX_37 + self._allowed_max_workers: int = max_workers self._pre_env = dict(os.environ) self.mock_version_info = patch( 'azure_functions_worker.dispatcher.sys.version_info', @@ -63,7 +68,6 @@ def setUp(self, version=SysVersionInfo(3, 7, 0, 'final', 0)): def tearDown(self): os.environ.clear() os.environ.update(self._pre_env) - self.mock_version_info.stop() async def test_dispatcher_initialize_worker(self): """Test if the dispatcher can be initialized worker successfully @@ -514,13 +518,14 @@ async def _check_if_async_function_is_ok(self, host) -> Tuple[str, str]: @unittest.skipIf(sys.version_info.minor != 8, - "Run the tests only for Python 3.8. In other platforms, " - "as the default passed is None, the cpu_count determines the " - "number of max_workers and we cannot mock the os.cpu_count() " - "in the concurrent.futures.ThreadPoolExecutor") + "Run the tests only for Python 3.8.") class TestThreadPoolSettingsPython38(TestThreadPoolSettingsPython37): - def setUp(self, version=SysVersionInfo(3, 8, 0, 'final', 0)): - super(TestThreadPoolSettingsPython38, self).setUp(version) + def setUp(self, version=None, default_workers=None, max_workers=None): + version = sys.version_info + super(TestThreadPoolSettingsPython38, self).setUp( + version, + PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, + PYTHON_THREADPOOL_THREAD_COUNT_MAX) self._allowed_max_workers: int = self._over_max_workers def tearDown(self): @@ -545,50 +550,43 @@ async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): self._default_workers) -@unittest.skipIf(sys.version_info.minor != 9, - "Run the tests only for Python 3.9. In other platforms, " - "as the default passed is None, the cpu_count determines the " - "number of max_workers and we cannot mock the os.cpu_count() " - "in the concurrent.futures.ThreadPoolExecutor") -class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython38): - def setUp(self, version=SysVersionInfo(3, 9, 0, 'final', 0)): - super(TestThreadPoolSettingsPython39, self).setUp(version) - - self.mock_os_cpu = patch( - 'os.cpu_count', return_value=2) - # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 - self._default_workers: Optional[int] = 6 - self.mock_os_cpu.start() +@unittest.skipIf(sys.version_info.minor < 9, + "Run the tests only for Python 3.9+.") +class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython37): + """ + For 3.9 and above, there is no limit on the max_workers count in the + created threadpool. The default value is set to None in the worker. + Therefore, the default thread count will be set by the Python + ThreadPoolExecutor, which does so by the formula + min(32, (os.cpu_count() or 1) + 4). + """ + def setUp(self, version=None, default_workers=None, max_workers=None): + version = sys.version_info + default_workers = min(32, (os.cpu_count() or 1) + 4) + super(TestThreadPoolSettingsPython39, self).setUp( + version, default_workers, PYTHON_THREADPOOL_THREAD_COUNT_MAX) + self._allowed_max_workers: int = self._over_max_workers def tearDown(self): - self.mock_os_cpu.stop() super(TestThreadPoolSettingsPython39, self).tearDown() + async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): + """Test if the sync threadpool will use any value and there isn't any + artificial max value set. + """ + with patch('azure_functions_worker.dispatcher.logger'): + async with self._ctrl as host: + await self._check_if_function_is_ok(host) -@unittest.skipIf(sys.version_info.minor != 10, - "Run the tests only for Python 3.10. In other platforms, " - "as the default passed is None, the cpu_count determines the " - "number of max_workers and we cannot mock the os.cpu_count() " - "in the concurrent.futures.ThreadPoolExecutor") -class TestThreadPoolSettingsPython310(TestThreadPoolSettingsPython39): - def setUp(self, version=SysVersionInfo(3, 10, 0, 'final', 0)): - super(TestThreadPoolSettingsPython310, self).setUp(version) - - def tearDown(self): - super(TestThreadPoolSettingsPython310, self).tearDown() - - -@unittest.skipIf(sys.version_info.minor != 11, - "Run the tests only for Python 3.11. In other platforms, " - "as the default passed is None, the cpu_count determines the " - "number of max_workers and we cannot mock the os.cpu_count() " - "in the concurrent.futures.ThreadPoolExecutor") -class TestThreadPoolSettingsPython311(TestThreadPoolSettingsPython310): - def setUp(self, version=SysVersionInfo(3, 11, 0, 'final', 0)): - super(TestThreadPoolSettingsPython311, self).setUp(version) - - def tearDown(self): - super(TestThreadPoolSettingsPython310, self).tearDown() + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: f'{self._over_max_workers}' + }) + await self._assert_workers_threadpool(self._ctrl, host, + self._allowed_max_workers) + self.assertNotEqual( + self._ctrl._worker.get_sync_tp_workers_set(), + self._default_workers) class TestDispatcherStein(testutils.AsyncTestCase): From 635b14741ce88b75f7ff245503b7a968375b00da Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Tue, 17 Sep 2024 12:23:55 -0500 Subject: [PATCH 3/3] test fixes + adding 312 test --- tests/unittests/test_dispatcher.py | 129 +++++++++++++++++++---------- 1 file changed, 85 insertions(+), 44 deletions(-) diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index 8fa0aa0e..ebaac2ce 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -20,7 +20,6 @@ PYTHON_ENABLE_INIT_INDEXING, PYTHON_THREADPOOL_THREAD_COUNT, PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, - PYTHON_THREADPOOL_THREAD_COUNT_MAX, PYTHON_THREADPOOL_THREAD_COUNT_MAX_37, PYTHON_THREADPOOL_THREAD_COUNT_MIN, REQUIRES_ROUTE_PARAMETERS @@ -50,15 +49,13 @@ class TestThreadPoolSettingsPython37(testutils.AsyncTestCase): NEW_TYPING = sys.version_info[:3] >= (3, 7, 0) # PEP 560 """ - def setUp(self, - version: Optional[any] = SysVersionInfo(3, 7, 0, 'final', 0), - default_workers: Optional[int] = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, - max_workers: Optional[int] = PYTHON_THREADPOOL_THREAD_COUNT_MAX_37): + def setUp(self, version=SysVersionInfo(3, 7, 0, 'final', 0)): self._ctrl = testutils.start_mockhost( script_root=DISPATCHER_FUNCTIONS_DIR) - self._default_workers: int = default_workers + self._default_workers: Optional[ + int] = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT self._over_max_workers: int = 10000 - self._allowed_max_workers: int = max_workers + self._allowed_max_workers: int = PYTHON_THREADPOOL_THREAD_COUNT_MAX_37 self._pre_env = dict(os.environ) self.mock_version_info = patch( 'azure_functions_worker.dispatcher.sys.version_info', @@ -68,6 +65,7 @@ def setUp(self, def tearDown(self): os.environ.clear() os.environ.update(self._pre_env) + self.mock_version_info.stop() async def test_dispatcher_initialize_worker(self): """Test if the dispatcher can be initialized worker successfully @@ -518,14 +516,13 @@ async def _check_if_async_function_is_ok(self, host) -> Tuple[str, str]: @unittest.skipIf(sys.version_info.minor != 8, - "Run the tests only for Python 3.8.") + "Run the tests only for Python 3.8. In other platforms, " + "as the default passed is None, the cpu_count determines the " + "number of max_workers and we cannot mock the os.cpu_count() " + "in the concurrent.futures.ThreadPoolExecutor") class TestThreadPoolSettingsPython38(TestThreadPoolSettingsPython37): - def setUp(self, version=None, default_workers=None, max_workers=None): - version = sys.version_info - super(TestThreadPoolSettingsPython38, self).setUp( - version, - PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, - PYTHON_THREADPOOL_THREAD_COUNT_MAX) + def setUp(self, version=SysVersionInfo(3, 8, 0, 'final', 0)): + super(TestThreadPoolSettingsPython38, self).setUp(version) self._allowed_max_workers: int = self._over_max_workers def tearDown(self): @@ -550,43 +547,87 @@ async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): self._default_workers) -@unittest.skipIf(sys.version_info.minor < 9, - "Run the tests only for Python 3.9+.") +@unittest.skipIf(sys.version_info.minor != 9, + "Run the tests only for Python 3.9. In other platforms, " + "as the default passed is None, the cpu_count determines the " + "number of max_workers and we cannot mock the os.cpu_count() " + "in the concurrent.futures.ThreadPoolExecutor") class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython37): - """ - For 3.9 and above, there is no limit on the max_workers count in the - created threadpool. The default value is set to None in the worker. - Therefore, the default thread count will be set by the Python - ThreadPoolExecutor, which does so by the formula - min(32, (os.cpu_count() or 1) + 4). - """ - def setUp(self, version=None, default_workers=None, max_workers=None): - version = sys.version_info - default_workers = min(32, (os.cpu_count() or 1) + 4) - super(TestThreadPoolSettingsPython39, self).setUp( - version, default_workers, PYTHON_THREADPOOL_THREAD_COUNT_MAX) + def setUp(self, version=SysVersionInfo(3, 9, 0, 'final', 0)): + super(TestThreadPoolSettingsPython39, self).setUp(version) + self.mock_os_cpu = patch( + 'os.cpu_count', return_value=2) + # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 + self._default_workers: Optional[int] = 6 + self.mock_os_cpu.start() self._allowed_max_workers: int = self._over_max_workers def tearDown(self): + self.mock_os_cpu.stop() super(TestThreadPoolSettingsPython39, self).tearDown() - async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): - """Test if the sync threadpool will use any value and there isn't any - artificial max value set. - """ - with patch('azure_functions_worker.dispatcher.logger'): - async with self._ctrl as host: - await self._check_if_function_is_ok(host) - # Reload environment variable on specialization - await host.reload_environment(environment={ - PYTHON_THREADPOOL_THREAD_COUNT: f'{self._over_max_workers}' - }) - await self._assert_workers_threadpool(self._ctrl, host, - self._allowed_max_workers) - self.assertNotEqual( - self._ctrl._worker.get_sync_tp_workers_set(), - self._default_workers) +@unittest.skipIf(sys.version_info.minor != 10, + "Run the tests only for Python 3.10. In other platforms, " + "as the default passed is None, the cpu_count determines the " + "number of max_workers and we cannot mock the os.cpu_count() " + "in the concurrent.futures.ThreadPoolExecutor") +class TestThreadPoolSettingsPython310(TestThreadPoolSettingsPython37): + def setUp(self, version=SysVersionInfo(3, 10, 0, 'final', 0)): + super(TestThreadPoolSettingsPython310, self).setUp(version) + self._allowed_max_workers: int = self._over_max_workers + self.mock_os_cpu = patch( + 'os.cpu_count', return_value=2) + # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 + self._default_workers: Optional[int] = 6 + self.mock_os_cpu.start() + self._allowed_max_workers: int = self._over_max_workers + + def tearDown(self): + self.mock_os_cpu.stop() + super(TestThreadPoolSettingsPython310, self).tearDown() + + +@unittest.skipIf(sys.version_info.minor != 11, + "Run the tests only for Python 3.11. In other platforms, " + "as the default passed is None, the cpu_count determines the " + "number of max_workers and we cannot mock the os.cpu_count() " + "in the concurrent.futures.ThreadPoolExecutor") +class TestThreadPoolSettingsPython311(TestThreadPoolSettingsPython37): + def setUp(self, version=SysVersionInfo(3, 11, 0, 'final', 0)): + super(TestThreadPoolSettingsPython311, self).setUp(version) + self._allowed_max_workers: int = self._over_max_workers + self.mock_os_cpu = patch( + 'os.cpu_count', return_value=2) + # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 + self._default_workers: Optional[int] = 6 + self.mock_os_cpu.start() + self._allowed_max_workers: int = self._over_max_workers + + def tearDown(self): + self.mock_os_cpu.stop() + super(TestThreadPoolSettingsPython311, self).tearDown() + + +@unittest.skipIf(sys.version_info.minor != 12, + "Run the tests only for Python 3.12. In other platforms, " + "as the default passed is None, the cpu_count determines the " + "number of max_workers and we cannot mock the os.cpu_count() " + "in the concurrent.futures.ThreadPoolExecutor") +class TestThreadPoolSettingsPython312(TestThreadPoolSettingsPython37): + def setUp(self, version=SysVersionInfo(3, 12, 0, 'final', 0)): + super(TestThreadPoolSettingsPython312, self).setUp(version) + self._allowed_max_workers: int = self._over_max_workers + self.mock_os_cpu = patch( + 'os.cpu_count', return_value=2) + # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 + self._default_workers: Optional[int] = 6 + self.mock_os_cpu.start() + self._allowed_max_workers: int = self._over_max_workers + + def tearDown(self): + self.mock_os_cpu.stop() + super(TestThreadPoolSettingsPython312, self).tearDown() class TestDispatcherStein(testutils.AsyncTestCase):