From 9f5c8fdbfef2474972a50cfa2af850d4dc620434 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 1 Apr 2021 13:10:31 +0200 Subject: [PATCH 1/2] chore: fix streaming pull close test flakiness --- .../subscriber/test_streaming_pull_manager.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 9930e8f14..00384c617 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -550,6 +550,26 @@ def make_running_manager(**kwargs): ) +def await_manager_shutdown(timeout=None): # pragma: NO COVER + # NOTE: This method should be called after manager.close(), i.e. after the shutdown + # thread has been started. + shutdown_thread = next( + ( + thread + for thread in threading.enumerate() + if thread.name == streaming_pull_manager._REGULAR_SHUTDOWN_THREAD_NAME + ), + None, + ) + + if shutdown_thread is None: + return # Shutdown already finished. + + shutdown_thread.join(timeout=timeout) + if shutdown_thread.is_alive(): + pytest.fail("Shutdown not completed in time.") + + def test_close(): ( manager, @@ -561,6 +581,7 @@ def test_close(): ) = make_running_manager() manager.close() + await_manager_shutdown(timeout=3) consumer.stop.assert_called_once() leaser.stop.assert_called_once() @@ -583,6 +604,7 @@ def test_close_inactive_consumer(): consumer.is_active = False manager.close() + await_manager_shutdown(timeout=3) consumer.stop.assert_not_called() leaser.stop.assert_called_once() @@ -596,6 +618,7 @@ def test_close_idempotent(): manager.close() manager.close() + await_manager_shutdown(timeout=3) assert scheduler.shutdown.call_count == 1 @@ -640,6 +663,7 @@ def test_close_no_dispatcher_error(): dispatcher.start() manager.close() + await_manager_shutdown(timeout=3) error_callback.assert_not_called() @@ -651,6 +675,7 @@ def test_close_callbacks(): manager.add_close_callback(callback) manager.close(reason="meep") + await_manager_shutdown(timeout=3) callback.assert_called_once_with(manager, "meep") @@ -660,6 +685,7 @@ def test_close_blocking_scheduler_shutdown(): scheduler = manager._scheduler manager.close() + await_manager_shutdown(timeout=3) scheduler.shutdown.assert_called_once_with(await_msg_callbacks=True) @@ -669,6 +695,7 @@ def test_close_nonblocking_scheduler_shutdown(): scheduler = manager._scheduler manager.close() + await_manager_shutdown(timeout=3) scheduler.shutdown.assert_called_once_with(await_msg_callbacks=False) @@ -690,6 +717,7 @@ def fake_nack(self): manager._messages_on_hold._messages_on_hold.append(messages[2]) manager.close() + await_manager_shutdown(timeout=3) assert sorted(nacked_messages) == [b"msg1", b"msg2", b"msg3"] From 1e30875a77587c80160bc27e826783f516d238a8 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 2 Apr 2021 10:05:40 +0200 Subject: [PATCH 2/2] Store shutdown thread on the manager instance --- .../_protocol/streaming_pull_manager.py | 5 +-- .../subscriber/test_streaming_pull_manager.py | 35 ++++++++----------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index ac940de26..e244e871d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -148,6 +148,7 @@ def __init__( self._closing = threading.Lock() self._closed = False self._close_callbacks = [] + self._regular_shutdown_thread = None # Created on intentional shutdown. # Generate a random client id tied to this object. All streaming pull # connections (initial and re-connects) will then use the same client @@ -539,13 +540,13 @@ def close(self, reason=None): an "intentional" shutdown. This is passed to the callbacks specified via :meth:`add_close_callback`. """ - thread = threading.Thread( + self._regular_shutdown_thread = threading.Thread( name=_REGULAR_SHUTDOWN_THREAD_NAME, daemon=True, target=self._shutdown, kwargs={"reason": reason}, ) - thread.start() + self._regular_shutdown_thread.start() def _shutdown(self, reason=None): """Run the actual shutdown sequence (stop the stream and all helper threads). diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 00384c617..25ab4f0ae 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -550,23 +550,16 @@ def make_running_manager(**kwargs): ) -def await_manager_shutdown(timeout=None): # pragma: NO COVER +def await_manager_shutdown(manager, timeout=None): # NOTE: This method should be called after manager.close(), i.e. after the shutdown - # thread has been started. - shutdown_thread = next( - ( - thread - for thread in threading.enumerate() - if thread.name == streaming_pull_manager._REGULAR_SHUTDOWN_THREAD_NAME - ), - None, - ) + # thread has been created and started. + shutdown_thread = manager._regular_shutdown_thread - if shutdown_thread is None: - return # Shutdown already finished. + if shutdown_thread is None: # pragma: NO COVER + raise Exception("Shutdown thread does not exist on the manager instance.") shutdown_thread.join(timeout=timeout) - if shutdown_thread.is_alive(): + if shutdown_thread.is_alive(): # pragma: NO COVER pytest.fail("Shutdown not completed in time.") @@ -581,7 +574,7 @@ def test_close(): ) = make_running_manager() manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) consumer.stop.assert_called_once() leaser.stop.assert_called_once() @@ -604,7 +597,7 @@ def test_close_inactive_consumer(): consumer.is_active = False manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) consumer.stop.assert_not_called() leaser.stop.assert_called_once() @@ -618,7 +611,7 @@ def test_close_idempotent(): manager.close() manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) assert scheduler.shutdown.call_count == 1 @@ -663,7 +656,7 @@ def test_close_no_dispatcher_error(): dispatcher.start() manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) error_callback.assert_not_called() @@ -675,7 +668,7 @@ def test_close_callbacks(): manager.add_close_callback(callback) manager.close(reason="meep") - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) callback.assert_called_once_with(manager, "meep") @@ -685,7 +678,7 @@ def test_close_blocking_scheduler_shutdown(): scheduler = manager._scheduler manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) scheduler.shutdown.assert_called_once_with(await_msg_callbacks=True) @@ -695,7 +688,7 @@ def test_close_nonblocking_scheduler_shutdown(): scheduler = manager._scheduler manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) scheduler.shutdown.assert_called_once_with(await_msg_callbacks=False) @@ -717,7 +710,7 @@ def fake_nack(self): manager._messages_on_hold._messages_on_hold.append(messages[2]) manager.close() - await_manager_shutdown(timeout=3) + await_manager_shutdown(manager, timeout=3) assert sorted(nacked_messages) == [b"msg1", b"msg2", b"msg3"]