Refactor queue resume/stop/start #786
Conversation
Whoops, meant for this to be a WIP PR. Submitting for early review; please focus on the docstrings and lemme know if they make sense. In the meantime I'll work on tests and updating architecture docs.
Force-pushed from e86b13e to a18b202.
This is now ready for review and can be merged after #787 is done.
Force-pushed from a18b202 to 1acb649.
Review comment on securedrop_client/queue.py (outdated):
```python
super().__init__()
self.api_client = api_client
self.session_maker = session_maker
# If the maxsize provided to PriorityQueue is 0, there will be no upper bound
self.queue = PriorityQueue(maxsize=size)  # type: PriorityQueue[Tuple[int, ApiJob]]
```
since we were passing maxsize only for the metadata queue (which had maxsize=1), we can also remove this maxsize functionality if we want
(can be done in a followup or at a later date) Also, handling the Full exception in the job-adding methods is only relevant when the queue can be full, so that can also be removed.
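For reference, this is plain stdlib queue.PriorityQueue behavior, independent of this PR: maxsize=0 (the default) means unbounded, and only a bounded queue can raise Full, which is why the Full handling becomes dead code once maxsize is dropped. A quick standalone illustration:

```python
from queue import Full, PriorityQueue

bounded = PriorityQueue(maxsize=1)
bounded.put((1, 'job-a'))
try:
    # A bounded queue can fill up, so job-adding code must handle Full.
    bounded.put((2, 'job-b'), block=False)
except Full:
    print('queue is full')

unbounded = PriorityQueue()  # maxsize defaults to 0: no upper bound
unbounded.put((1, 'job-a'))
unbounded.put((2, 'job-b'))  # put() will never raise Full
```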
i removed support for maxsize. Now when a timeout occurs we see … instead of …, and now when we resume we see … instead of …
The reason we see the …

See a CI error, which I'll have to investigate tomorrow unless someone gets to it first: …
Force-pushed from 58c8aa6 to ee424f5.
(rebased after CI fix)

rebased and retested, everything still works!
```python
Emit the paused signal if the main queue has been paused.
'''
logger.debug('Paused main queue')
self.paused.emit()
```
this is a little hard to follow because both ApiJobQueue and RunnableQueue have signals called paused on them which do slightly different things: the first (defined on ApiJobQueue) signals the Controller object "hey, I paused the queues, update the GUI"; the other (defined on RunnableQueue) is from a queue to signal ApiJobQueue "hey, I hit a consistent request timeout, let's stop the queues and let the GUI know".

However it looks like these queues are pausing independently: one can be stopped while the other is running. Meanwhile the GUI shows that the server is unreachable. Is this intentional? Or is the idea that if one queue is stopped due to network failure the other will eventually stop too?

lmk if you think I'm missing something here
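For readers skimming the thread, here is a rough sketch of the two-layer arrangement described above. The class, signal, and slot names come from this PR, but the exact wiring is one reading of it, not the source:

```python
from PyQt5.QtCore import QObject, pyqtSignal

class RunnableQueue(QObject):
    # Emitted by one queue when it hits a consistent request timeout.
    paused = pyqtSignal()

class ApiJobQueue(QObject):
    # Emitted toward the Controller: "the queues are paused, update the GUI".
    paused = pyqtSignal()

    def __init__(self) -> None:
        super().__init__()
        self.main_queue = RunnableQueue()
        self.download_file_queue = RunnableQueue()
        # Each inner queue reports its own pause; the other queue is not told.
        self.main_queue.paused.connect(self.on_main_queue_paused)
        self.download_file_queue.paused.connect(self.on_file_download_queue_paused)

    def on_main_queue_paused(self) -> None:
        self.paused.emit()  # forward to the GUI layer

    def on_file_download_queue_paused(self) -> None:
        self.paused.emit()
```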
> Or is the idea that if one queue is stopped due to network failure the other will eventually stop too?

yes, this was the idea

the way it has been implemented is that there is a paused signal in RunnableQueue, which maintains a priority queue and will pause when it encounters a request timeout error. meanwhile, other queues do not know about this, and cannot know unless ApiJobQueue (which really should be renamed to ApiJobQueueManager) tells them that another queue was paused. To keep things simple, the queue manager just sends a signal back to the GUI so the user can retry. It could also tell the other queue to pause by enqueuing a PauseQueueJob, but I think this is risky because the other queue might at the same time be adding a PauseQueueJob because it too has encountered a timeout error.

i think changing the behavior of how this works should be discussed further, and if we decide to change it I would advocate for doing so outside of this PR in order to leave it a refactor-only PR
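To make the mechanism above concrete, here is the sentinel-job pattern reduced to a sketch. This is simplified; the real worker loop also executes jobs and handles request timeouts:

```python
from queue import PriorityQueue

class PauseQueueJob:
    '''Sentinel job: dequeuing it stops the worker until it is resumed.'''

def process_events(queue: PriorityQueue, on_pause) -> None:
    while True:
        priority, job = queue.get(block=True)
        if isinstance(job, PauseQueueJob):
            on_pause()  # notify the manager (via signal or callback)
            return      # leave the loop; a resume restarts processing
        # In the real code the job runs here; on a request timeout a
        # PauseQueueJob is enqueued, so the loop exits on a later get().
```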
one thing we could do as a small additional refactor in this PR is to replace the signal from RunnableQueue with a callback that ApiJobQueue passes to it, so this:
```diff
--- a/securedrop_client/queue.py
+++ b/securedrop_client/queue.py
@@ -53,16 +53,15 @@ class RunnableQueue(QObject):
         ReplyDownloadJob: 17,
     }
 
-    # Signal that is emitted when processing stops
-    paused = pyqtSignal()
-
     # Signal that is emitted to resume processing jobs
     resume = pyqtSignal()
 
-    def __init__(self, api_client: API, session_maker: scoped_session) -> None:
+    def __init__(self, api_client: API, session_maker: scoped_session, on_pause_callback) -> None:
         super().__init__()
         self.api_client = api_client
         self.session_maker = session_maker
+        self.on_pause_callback = on_pause_callback
+
         self.queue = PriorityQueue()  # type: PriorityQueue[Tuple[int, ApiJob]]
         # `order_number` ensures jobs with equal priority are retrieved in FIFO order. This is needed
         # because PriorityQueue is implemented using heapq which does not have sort stability. For
@@ -113,7 +112,7 @@
             priority, job = self.queue.get(block=True)
 
             if isinstance(job, PauseQueueJob):
-                self.paused.emit()
+                self.on_pause_callback()
                 return
 
             try:
@@ -154,8 +153,9 @@ class ApiJobQueue(QObject):
         self.main_thread = QThread()
         self.download_file_thread = QThread()
 
-        self.main_queue = RunnableQueue(api_client, session_maker)
-        self.download_file_queue = RunnableQueue(api_client, session_maker)
+        self.main_queue = RunnableQueue(api_client, session_maker, self.on_main_queue_paused)
+        self.download_file_queue = RunnableQueue(
+            api_client, session_maker, self.on_file_download_queue_paused)
 
         self.main_queue.moveToThread(self.main_thread)
         self.download_file_queue.moveToThread(self.download_file_thread)
```
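Two side notes on the sketch above, one reading rather than anything stated in the PR. First, the swap changes thread semantics: a cross-thread pyqtSignal is delivered via a queued connection and the slot runs in the receiver's thread, while a plain callback runs directly in the worker thread that invokes it, so on_main_queue_paused would have to be safe to call from that thread. Second, the order_number comment in the context lines refers to heapq lacking sort stability; the usual tiebreaker looks like this, standalone:

```python
import heapq
from itertools import count

order_number = count()
heap = []

# Equal-priority entries: the monotonically increasing counter breaks the
# tie, so retrieval is FIFO and the payloads are never compared.
heapq.heappush(heap, (1, next(order_number), 'first'))
heapq.heappush(heap, (1, next(order_number), 'second'))

assert heapq.heappop(heap)[2] == 'first'
assert heapq.heappop(heap)[2] == 'second'
```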
thanks for your patience, I'm gonna approve this, we can continue to test the pause/unpause behavior more on master and decide what to do regarding the queues pausing independently (I agree that it's been this way since the beginning, I'm only just realizing it now)
Description

This is purely a refactor.

- Removed resume_queues in on_authentication_success, since we were already starting the queues in that method
- Removed clear_error_status from all the job success handlers and added it to the Controller's logout and resume_queues methods (see the sketch after this list)
- Split on_queue_paused into on_file_download_queue_paused and on_main_queue_paused so we could have clear debug logs on which queue we're pausing
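As context for the second bullet, a minimal sketch of where clear_error_status might now be called; Controller, gui, and api_job_queue here are simplified stand-ins for the client's real objects, not the exact source:

```python
class Controller:
    '''Simplified, hypothetical stand-in for the client's Controller.'''

    def __init__(self, gui, api_job_queue) -> None:
        self.gui = gui
        self.api_job_queue = api_job_queue

    def resume_queues(self) -> None:
        self.api_job_queue.resume_queues()
        # Clear any "server unreachable" banner in one place, instead of
        # repeating the call in every individual job success handler.
        self.gui.clear_error_status()

    def logout(self) -> None:
        # ... session teardown elided ...
        # A queue error banner is stale once the user logs out.
        self.gui.clear_error_status()
```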