MPP-3815: Handle broken email processing #4702
Conversation
This was replaced by process_emails_from_sqs with different parameters, such as using the DLQ queue and deleting failed messages.
If this retry logic ran, it would emit counter metrics and error logs. Since there are none, we can assume it never happens, or is rare enough that we can remove the logic and wait until it happens again.
A few code changes were needed. One is that the type hints say that queue metrics, like `ApproximateNumberOfMessages`, are strings, so this converts them to integers.
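For illustration (the helper name here is hypothetical, not from the PR): boto3 returns all SQS queue attribute values as strings, so the approximate-count metrics need an explicit `int()` conversion before any arithmetic or comparisons.

```python
def queue_counts(attributes: dict) -> dict:
    """Convert SQS count attributes (boto3 returns them as strings) to ints.

    Hypothetical helper for illustration, not the PR's actual function.
    """
    count_keys = (
        "ApproximateNumberOfMessages",
        "ApproximateNumberOfMessagesDelayed",
        "ApproximateNumberOfMessagesNotVisible",
    )
    return {key: int(attributes[key]) for key in count_keys if key in attributes}


# boto3's SQS get_queue_attributes / Queue.attributes return string values:
raw = {"ApproximateNumberOfMessages": "42", "ApproximateNumberOfMessagesDelayed": "0"}
print(queue_counts(raw))  # {'ApproximateNumberOfMessages': 42, 'ApproximateNumberOfMessagesDelayed': 0}
```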
Because of a reused variable, the queue data and the cycle number were not present in the log.
Avoid `.update()` because 1) it requires more lines of code and 2) it is not very compatible with `TypedDict`, which I hope to use more in the future.
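To illustrate the `.update()` point (the names below are hypothetical, not from the PR): with a `TypedDict`, per-key assignment lets a type checker verify each key name and value type, while `.update()` with a dynamically built plain `dict` bypasses that checking.

```python
from typing import TypedDict


class QueueLog(TypedDict, total=False):
    queue_name: str
    message_count: int


def build_log(queue_name: str, message_count: int) -> QueueLog:
    log: QueueLog = {}
    # Per-key assignment: a type checker verifies that "queue_name" is a
    # declared key and that the value is a str.
    log["queue_name"] = queue_name
    log["message_count"] = message_count
    # By contrast, log.update(extra) where extra is a plain dict[str, object]
    # is not checked against the TypedDict's keys and value types.
    return log


print(build_log("relay-emails", 3))  # {'queue_name': 'relay-emails', 'message_count': 3}
```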
multiprocessing runs the target function in a subprocess rather than a thread. This is slower to start up than a thread, and requires django.setup() to initialize the application. However, it does allow us to terminate a stuck process, which happens sometimes in email processing and frequently in DLQ processing.
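The terminate-a-stuck-subprocess behavior can be sketched like this (a minimal standalone example, not the PR's code, and omitting the `django.setup()` step): a `multiprocessing.Pool` worker can be killed with `terminate()` when it exceeds a timeout, which is not possible with a thread.

```python
import multiprocessing
import time
from typing import Optional


def slow_task(seconds: float) -> str:
    time.sleep(seconds)
    return "done"


def run_with_timeout(seconds: float, timeout: float) -> Optional[str]:
    """Run slow_task in a subprocess, terminating it if it exceeds timeout."""
    pool = multiprocessing.Pool(processes=1)
    result = pool.apply_async(slow_task, (seconds,))
    try:
        value = result.get(timeout=timeout)
        pool.close()
        return value
    except multiprocessing.TimeoutError:
        # terminate() kills the worker subprocess; a stuck thread could not
        # be stopped this way.
        pool.terminate()
        return None
    finally:
        pool.join()


if __name__ == "__main__":
    print(run_with_timeout(0.1, 5.0))  # done
    print(run_with_timeout(30.0, 0.5))  # None (worker terminated)
```

Starting a subprocess is slower than starting a thread, but the ability to reclaim a stuck worker is what matters for DLQ processing.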
Nice code; looks good; spot-check works well; tests pass. Just a couple clarifying questions.
> The retry logic was removed in May 2024 when no throttling or pause errors were registered in the previous 6 months.
praise: nice doc.
```python
assert rec2_extra["success"] is True
assert rec2_extra["message_process_time_s"] < 120.0
assert rec2_extra["subprocess_setup_time_s"] == 1.0
assert mock_process_pool_future._timeouts == [1.0]
```
question (non-blocking): I don't understand this `_timeouts` property: why is there a `1.0` value in `_timeouts` when the message succeeded? That seems like there should be no timeout?
Good question! I got the mock backwards. This line:

```python
mock_process_pool_future._is_stalled.side_effect = [False, False, True]
```

should be:

```python
mock_process_pool_future._is_stalled.side_effect = [True, True, False]
```

`_timeouts` is `[1.0]` because it was called once. When you call `future.wait(1.0)` on the mock future, it runs:

```python
def call_wait(timeout: float) -> None:
    mocked_clocks(timeout)
    mock_future._timeouts.append(timeout)
    if not mock_future._is_stalled():
        mock_future._ready = True
        try:
            ret = func(*args)
        except BaseException as e:
            if error_callback:
                error_callback(e)
        else:
            if callback:
                callback(ret)
```

So, `_timeouts` always has a value when `future.wait()` is called.
```python
assert rec2_extra["error"] == "Timed out after 120.0 seconds."
assert rec2_extra["message_process_time_s"] >= 120.0
assert rec2_extra["subprocess_setup_time_s"] == 1.0
assert mock_process_pool_future._timeouts == [1.0] * 60
```
question (non-blocking): Again I don't understand this `_timeouts` property: why is it a list of 60 `1.0` values when the message timed out after 120 seconds? That seems like there should be 120 `1.0` values?
Good question! The loop includes a call to `time.monotonic`, which also increments the mocked clock, so each loop is 2 fake seconds. However, it makes more sense if `future.wait` does not increment the clock. I'll make that change.
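For readers following along, the loop under test can be sketched like this (illustrative names, not the PR's exact code): each iteration reads the clock once, then does one bounded `future.wait(1.0)`. With the original mocks, both calls advanced the fake clock, so each iteration consumed 2 fake seconds and the 120-second limit was reached after 60 `wait(1.0)` calls.

```python
import time
from typing import Any, Optional

# Default mirrors the PR's 120-second setting; the function is illustrative.
MAX_SECONDS_PER_MESSAGE = 120.0


def wait_for_result(future: Any, max_seconds: float = MAX_SECONDS_PER_MESSAGE) -> Optional[Any]:
    """Poll a pool-style future in 1-second slices so a stuck worker is noticed."""
    start = time.monotonic()
    while True:
        if time.monotonic() - start >= max_seconds:  # one clock read per loop
            return None  # caller can now terminate the stuck subprocess
        future.wait(1.0)  # one bounded wait per loop
        if future.ready():
            return future.get()
```

`wait(timeout)`, `ready()`, and `get()` match the interface of `multiprocessing.pool.AsyncResult`, which is what the mock future stands in for.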
Suspect Issues

This pull request was deployed and Sentry observed the following issues:
This PR makes several changes to `./manage.py process_emails_from_sqs`, now that it is being used to process the dead letter queue (DLQ) as well as the incoming emails. This PR replaces PR #4689.

The changes to processing are:
- A new setting, `PROCESS_EMAIL_MAX_SECONDS_PER_MESSAGE` (default 120), controls how many seconds is too many. This will stop email tasks and the DLQ task from getting killed by Kubernetes after 120 seconds.

Additional changes:
- Removes `process_delayed_emails_from_sqs.py`, which is now unused and had no tests
- Adds `mypy-boto3-sns`, `mypy-boto3-sqs`, and `mypy-boto3-s3` for more boto3 type hints
- Updates `process_emails_from_sqs.py` and its tests

How to test

If you have your local environment set up for email processing from a queue, enable your AWS credentials, run `./manage.py process_emails_from_sqs`, and send some emails. You can also push the branch to Heroku and test it there.