
MPP-3815: Handle broken email processing #4702

Merged (9 commits, May 13, 2024)

Conversation

jwhitlock (Member)

This PR makes several changes to ./manage.py process_emails_from_sqs, now that it is being used to process the dead letter queue (DLQ) as well as the incoming emails. This PR replaces PR #4689. The changes to processing are:

  • Messages are now processed in a subprocess, which allows updating the healthcheck file and aborting processing when it appears stuck. A new setting, PROCESS_EMAIL_MAX_SECONDS_PER_MESSAGE (default 120), sets the per-message time limit. This will stop the email tasks and the DLQ task from getting killed by Kubernetes after 120 seconds.
  • The processor now captures exceptions and sends them to Sentry. This will stop the email tasks and the DLQ task from exiting on an unhandled exception.
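The subprocess-with-timeout flow could be sketched roughly as below. This is an illustration, not the PR's actual code: `run_with_timeout` is a hypothetical helper, and the `"fork"` context is an illustrative choice (it keeps module-level functions picklable by reference on Linux).

```python
import multiprocessing
import time

def run_with_timeout(func, args, max_seconds):
    """Run func(*args) in a subprocess; abort it if it exceeds max_seconds."""
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=1) as pool:
        async_result = pool.apply_async(func, args)
        try:
            return True, async_result.get(timeout=max_seconds)
        except multiprocessing.TimeoutError:
            # Abort the stuck worker ourselves, instead of letting
            # Kubernetes kill the whole pod after its own deadline.
            pool.terminate()
            return False, f"Timed out after {max_seconds} seconds."
        except Exception as exc:
            # In the real command, the exception would be reported to Sentry here.
            return False, exc

ok, result = run_with_timeout(pow, (2, 10), 5.0)         # fast task completes
stuck, error = run_with_timeout(time.sleep, (30,), 0.2)  # slow task is aborted
```

The key property is that a hung message handler can be terminated from the parent, which a thread cannot be.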

Additional changes:

  • Remove process_delayed_emails_from_sqs.py, which is now unused and had no tests
  • Add dependencies mypy-boto3-sns, mypy-boto3-sqs, and mypy-boto3-s3 for more boto3 type hints
  • Add type hints to all code in process_emails_from_sqs.py and its tests
  • Add tests for above functionality

How to test

If you have your local environment set up for email processing from a queue, enable your AWS credentials, run ./manage.py process_emails_from_sqs, and send some emails. You can also push the branch to Heroku and test it there.

jwhitlock added 9 commits May 13, 2024 09:34
This was replaced by process_emails_from_sqs with different parameters,
such as using the DLQ queue and deleting failed messages.
If this retry logic had run, it would have emitted counter metrics and error logs.
Since there are none, we can assume this path never happens, or is rare enough
that we can remove the logic and wait until it happens again.
Few changes are needed in the code. One change: the type hints say that
queue metrics, like ApproximateNumberOfMessages, are strings, so this
converts them to integers.
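The string-to-int conversion could look like the sketch below. `queue_backlog` and the fake queue object are hypothetical; the `TYPE_CHECKING` guard reflects that mypy-boto3-sqs provides stubs for type checking only and need not be installed at runtime.

```python
from types import SimpleNamespace
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Stub-only import from the mypy-boto3-sqs dev dependency.
    from mypy_boto3_sqs.service_resource import Queue

def queue_backlog(queue: "Queue") -> int:
    # Per the stubs, SQS attribute values are strings, so metrics like
    # ApproximateNumberOfMessages need an int() conversion before arithmetic.
    return int(queue.attributes["ApproximateNumberOfMessages"])

# Illustrative stand-in for a boto3 Queue resource:
fake_queue = SimpleNamespace(attributes={"ApproximateNumberOfMessages": "42"})
backlog = queue_backlog(fake_queue)
```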
Because of a reused variable, the queue data and the cycle number were
not present in the log.
Avoid .update() because 1) it requires more lines of code and 2) it is
not very compatible with TypedDict, and I hope to use that more in the
future.
multiprocessing runs the target function in a subprocess rather than a
thread. This is slower to start up than a thread, and requires
django.setup() to initialize the application. However, it does allow us
to terminate a stuck process, which happens sometimes in email
processing and frequently in DLQ processing.
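The initializer requirement could be sketched as below. The real command would pass django.setup as (or inside) the pool initializer; here a stand-in flag keeps the sketch runnable without Django, and the `"fork"` context is an illustrative choice.

```python
import multiprocessing

_READY = False

def init_worker() -> None:
    # Stand-in for django.setup(): each fresh subprocess must initialize
    # the application before any task touches models or settings.
    global _READY
    _READY = True

def check_ready() -> bool:
    return _READY

ctx = multiprocessing.get_context("fork")
with ctx.Pool(processes=1, initializer=init_worker) as pool:
    ready = pool.apply_async(check_ready).get(timeout=10)
# ready is True in the worker, while the parent's _READY stays False:
# the initializer ran in the subprocess before its first task.
```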
@jwhitlock jwhitlock requested a review from groovecoder May 13, 2024 14:54

@groovecoder groovecoder left a comment


Nice code; looks good; spot-check works well; tests pass. Just a couple clarifying questions.

Comment on lines +373 to +374
The retry logic was removed in May 2024 when no throttling or pause errors were
registered in the previous 6 months.
groovecoder (Member) commented:

praise: nice doc.

assert rec2_extra["success"] is True
assert rec2_extra["message_process_time_s"] < 120.0
assert rec2_extra["subprocess_setup_time_s"] == 1.0
assert mock_process_pool_future._timeouts == [1.0]
groovecoder (Member) commented:

question (non-blocking): I don't understand this _timeouts property: why is there a 1.0 value in the _timeouts when the message succeeded? That seems like there should be no timeout?

jwhitlock (Member, Author) replied:

Good question! I got the mock backwards. This line:

mock_process_pool_future._is_stalled.side_effect = [False, False, True]

should be:

mock_process_pool_future._is_stalled.side_effect = [True, True, False]

_timeouts is [1.0] because wait() was called once. When you call future.wait(1.0) on the mock future, it runs:

    def call_wait(timeout: float) -> None:
        mocked_clocks(timeout)
        mock_future._timeouts.append(timeout)
        if not mock_future._is_stalled():
            mock_future._ready = True
            try:
                ret = func(*args)
            except BaseException as e:
                if error_callback:
                    error_callback(e)
            else:
                if callback:
                    callback(ret)

So, _timeouts always has a value when future.wait() is called.
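A simplified, self-contained version of that mock mechanism, using the corrected [True, True, False] order from the reply (stalled on the first two polls, done on the third), could look like this. The names mirror the test helper but the sketch is illustrative:

```python
from unittest import mock

mock_future = mock.Mock()
mock_future._timeouts = []
mock_future._ready = False
mock_future._is_stalled = mock.Mock(side_effect=[True, True, False])

def call_wait(timeout: float) -> None:
    mock_future._timeouts.append(timeout)  # recorded on every wait() call
    if not mock_future._is_stalled():
        mock_future._ready = True

mock_future.wait = mock.Mock(side_effect=call_wait)

# The polling loop waits in 1-second slices until the future is ready.
while not mock_future._ready:
    mock_future.wait(1.0)
# _timeouts is [1.0, 1.0, 1.0]: three polls, the last of which succeeded.
```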

assert rec2_extra["error"] == "Timed out after 120.0 seconds."
assert rec2_extra["message_process_time_s"] >= 120.0
assert rec2_extra["subprocess_setup_time_s"] == 1.0
assert mock_process_pool_future._timeouts == [1.0] * 60
groovecoder (Member) commented:

question (non-blocking): Again I don't understand this _timeouts property: why is it a list of 60 1.0 values when the message timed out after 120 seconds? That seems like there should be 120 1.0 values?

jwhitlock (Member, Author) replied:

Good question! The loop includes a call to time.monotonic, which also increments the mocked clock, so each loop is 2 fake seconds. However, it makes more sense if future.wait does not increment the clock. I'll make that change.
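The 60-versus-120 discrepancy can be reproduced with a toy mocked clock (illustrative, not the test's actual helper): when both the wait and the monotonic check advance fake time, each iteration costs 2 fake seconds, so a 120-second deadline allows only 60 waits.

```python
class MockClock:
    """Fake monotonic clock that advances only when explicitly ticked."""
    def __init__(self) -> None:
        self.now = 0.0
    def tick(self, seconds: float = 1.0) -> float:
        self.now += seconds
        return self.now

clock = MockClock()
waits = 0
while clock.now < 120.0:
    clock.tick(1.0)  # mocked future.wait(1.0)
    waits += 1
    clock.tick(1.0)  # mocked time.monotonic() also advances the fake clock
# waits ends at 60, not 120: the monotonic() check consumed half the fake time.
```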

@groovecoder groovecoder added this pull request to the merge queue May 13, 2024
Merged via the queue into main with commit 7a76734 May 13, 2024
28 checks passed
@groovecoder groovecoder deleted the handle-broken-email-processing-mpp-3815 branch May 13, 2024 15:59

sentry-io bot commented May 19, 2024

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ ParamValidationError: Parameter validation failed: emails.utils in ses_send_raw_email

