Add ./manage.py process_emails_from_sqs to continuously process messages from SQS #1768
Conversation
✅ Deploy Preview for fx-relay-demo canceled.
After writing the PR message, I realized I should tag the new gauge metrics, and added a commit to do that. I also added a section about the requirements changes. This is a large PR! Maybe too large! If I need to break it up, I could potentially refactor
Whew, looking good!

Something I'm still not super clear on from the code or comments is the "cycle" vs. "loop" concept. It seems like the "cycle" is one round of long polling, and the "loop" is the loop over queue messages?
```python
temp_error = ClientError(
    {
        "Error": {
            "Message": "Maximum sending rate exceeded.",
            "Code": "ThrottlingException",
        }
    },
    "",
)
```
question (non-blocking): in pytest, could this be made into its own fixture that is requested in each of the two tests that need it, to eliminate the code duplication?
I created a function `make_client_error` to help construct the three `ClientError`s tested, and switched the second one to test `AccountSendingPausedException`, which I'm guessing is the other temporary exception that has the phrase 'pause'.

And while I'm writing this, I see one more `ClientError` that could use `make_client_error`...
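The helper itself isn't quoted in this thread; a minimal sketch of what it might look like, assuming it just wraps the `ClientError` construction shown above:

```python
from botocore.exceptions import ClientError


def make_client_error(message: str = "", code: str = "") -> ClientError:
    """Build a botocore ClientError with the given error message and code."""
    return ClientError({"Error": {"Message": message, "Code": code}}, "")


# For example, the ThrottlingException above becomes:
temp_error = make_client_error("Maximum sending rate exceeded.", "ThrottlingException")
```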
```python
def test_command_golden_path(mock_boto3_queue_constructor):
    """The command runs successfully until completion."""
    mock_boto3_queue_constructor.return_value = fake_queue()
    call_command("process_emails_from_sqs", "--max-seconds=4")
    mock_boto3_queue_constructor.assert_called_once_with(
        "https://sqs.us-east-2.amazonaws.example.com/111222333/queue-name"
    )
    assert mock_boto3_queue_constructor._mock_region == "us-east-2"
```
suggestion (non-blocking): How about a `super_golden_path` with lots of messages that all succeed, and a `dumpster_fire_path` with lots of messages that all fail for different reasons?
In general, I don't like "a list of a lot of things" tests. It is unclear what code each individual message is testing, so you get code coverage without a clear example of how to trigger the code path. I find them harder to refactor, and they often get uglier as time goes by. When they break, I often have to manually break up the list and run it several times to determine which item broke, and how to fix it.

When a test really calls for several similar but slightly different inputs, I prefer `pytest.mark.parametrize`, so that each input is a separate test run. In many cases, this results in `if`/`then` clauses in the tests, which are another kind of test smell that I eliminated in the `check_health` tests.
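For illustration, the parametrized style looks like this (hypothetical test and error classification, not code from this PR):

```python
import pytest


@pytest.mark.parametrize("code,is_temporary", [
    ("ThrottlingException", True),
    ("AccountSendingPausedException", True),
    ("MessageRejected", False),
])
def test_error_classification(code, is_temporary):
    # Each tuple runs as a separate test, so a failure points at the
    # exact input rather than a position inside one long list.
    temporary_codes = {"ThrottlingException", "AccountSendingPausedException"}
    assert (code in temporary_codes) == is_temporary
```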
The purpose of this test and the next is to get coverage of the argument parsing and the `handle` code. Most of the tests bypass these by directly calling `main_loop`. Email processing is completely mocked out, and the tests for that are elsewhere, or need to be written.

Staring at this test, I feel that it is poorly named. It runs for 4 (fake) seconds and processes no messages at all. I was expecting more cases when I named it, but it turned out there were just two branches to test. A rename is in order, something like `test_command_setup_success`.
Thanks for the feedback @groovecoder! I've fixed some small things; I'll pick up the larger ones, or the ones I need to think about, in the morning. There are two loops:

* The outer "cycle" of long-polling SQS for a batch of messages
* The inner loop over the messages in each batch
There's a lot in the details (timing, try/except blocks, etc.), but maybe there's a structure and names that could make more sense. It also gets easier if we only request one message at a time; I'm not sure if there is a benefit to getting multiple at a time, like the existing DLQ command does.
Several changes from the previous review round. There are also some other (non-blocking) open discussions; let me know if you want to continue them.
Looking good! I got it working with my local Relay app by following the new steps in the doc. Woohoo!
Just a couple more things:
suggestion: Let's add the S3 and SQS steps as optional in the "Overview" section?
6. (Optional) Convert to store in S3
7. (Optional) Convert to back-end processing
docs/end-to-end-local-dev.md
```
AWS<<-Local app: SQS fetch
Local app->>AWS: POST /SendRawEmail
```
Got a rendering error on GitHub with this?

```
Error rendering embedded code
Parse error on line 6:
... AWS<<-Local app: SQS fetch Loca
----------------------^
Expecting 'SOLID_OPEN_ARROW', 'DOTTED_OPEN_ARROW', 'SOLID_ARROW', 'DOTTED_ARROW', 'SOLID_CROSS', 'DOTTED_CROSS', 'SOLID_POINT', 'DOTTED_POINT', got 'TXT'
```
ha, mermaid can't read an arrow that goes left
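A likely fix (a sketch, assuming the arrow was meant to show the local app fetching from AWS) is to reverse the operands rather than the arrowhead, since mermaid sequence diagrams only define arrows running from the left-hand participant to the right-hand one:

```mermaid
sequenceDiagram
    Local app->>AWS: SQS fetch
    Local app->>AWS: POST /SendRawEmail
```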
This command is based on `process_delayed_emails_from_sqs`, but is designed to run until interrupted (or for a maximum duration) rather than until the queue is empty. The command logs to `eventsinfo.process_emails_from_sqs`:

* At the start and end of execution (INFO)
* For each polling loop (INFO if messages, DEBUG if none)
* For each processed message (INFO if failed, DEBUG if success)
* For AWS errors (ERROR)

Adds two new environment variables:

* `AWS_SQS_EMAIL_QUEUE_URL` - The SQS queue for incoming emails
* `AWS_SQS_EMAIL_DLQ_URL` - The dead-letter queue for that queue

Adds three new stats gauges:

* `email_queue_count` - Estimated messages in SQS
* `email_queue_count_delayed` - ...that are not yet available
* `email_queue_count_not_visible` - ...that are reserved by a receiver

The current SNS workflow uses `AWS_SQS_QUEUE_URL` as its dead-letter queue, and `AWS_SQS_EMAIL_DLQ_URL` could have the same value. `AWS_SQS_QUEUE_URL` can be deprecated and dropped after the transition from SNS POSTs to SQS pulls.

Adds docs to test these changes in a local developer environment, and tests to cover the new commands.
"./manage.py check_health <path_to_healthcheck.json>" reads the (optional) file written by process_emails_from_sqs, checks if the timestamp is recent (120 seconds by default), and exits with a code of 1 if too old. This can be used as a Kubernetes liveness check of this and other processes without a webserver.
Co-authored-by: luke crouch <[email protected]>
This argument type opens the file as part of command-line processing, so errors can be detected before the handle code runs. The code refers to it as `self.healthcheck_file` after it has been converted to a `TextIOWrapper` object.
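This sounds like `argparse.FileType`; a sketch under that assumption (hypothetical argument name and mode):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "healthcheck_path",
    type=argparse.FileType("w", encoding="utf-8"),
    help="File to write healthcheck JSON to",
)

# argparse opens the file while parsing, so a bad path fails here,
# before any command logic runs; the parsed value is a TextIOWrapper.
args = parser.parse_args(["healthcheck.json"])
print(type(args.healthcheck_path))  # <class '_io.TextIOWrapper'>
```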
* Use the pytest `caplog` fixture instead of a custom fixture
* Move common setup to a `write_healthcheck` function
* Un-parametrize tests, making it clearer what happens with the default verbosity levels
Adjust the test names and docstrings, and add some additional checks.
Split cycle loop into logical parts:

* `poll_queue_for_messages` - requests a batch of messages, or times out
* `process_message_batch` - processes the returned messages, if any
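A rough sketch of that split, assuming a boto3 SQS `Queue` resource and a hypothetical per-message handler:

```python
def process_message(message):
    """Hypothetical per-message handler; the real one runs the SNS logic."""
    print(message.body)
    return True


def poll_queue_for_messages(queue, batch_size=10, wait_seconds=5):
    """Long-poll SQS for up to wait_seconds; may return an empty batch."""
    return queue.receive_messages(
        MaxNumberOfMessages=batch_size, WaitTimeSeconds=wait_seconds
    )


def process_message_batch(messages):
    """Process each returned message, deleting it from the queue on success."""
    for message in messages:
        if process_message(message):
            message.delete()
```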
Force-pushed from f13a1df to 330c24e, to pick up requirements merges on main, and then:
This PR addresses Jira task MPP-1802, "Refactor email processing". It adds a management task that polls a queue (provided by AWS Simple Queue Service, or SQS) and processes the messages. The queue is populated by the Simple Notification Service (SNS), and is an alternate processing path to the `POST`s to `/emails/sns_inbound`. Our hope is that processing emails this way will keep the website responsive during email backlogs, such as those around production pushes, and be a better fit for larger emails and higher email volume.

This adds two management commands, `process_emails_from_sqs` and `check_health`.
`./manage.py process_emails_from_sqs` is heavily based on `./manage.py process_delayed_emails_from_sqs`, but has been refactored for testability, monitoring, logging, and to run without stopping. The messages themselves appear identical to those placed on the dead-letter queue (DLQ), so this command could be used in place of `process_delayed_emails_from_sqs`.
The command can periodically write JSON, including a timestamp, to a file. This can be checked with `./manage.py check_health [path_to_file]`, which returns a non-zero exit code if the file is too old. This can be used as a liveness probe in Kubernetes, to restart the process if it gets stuck.

This includes new settings:
* `AWS_SQS_EMAIL_QUEUE_URL` - The URL of the SQS queue with the email messages
* `AWS_SQS_EMAIL_DLQ_URL` - The "dead-letter queue" (DLQ) for the SQS queue, used when a message is retrieved but not deleted too many times. This is similar to `AWS_SQS_QUEUE_URL`, the DLQ for the SNS `POST` process, and the same queue can be used.

This includes new gauges:
* `email_queue_count` - the approximate number of messages in the queue
* `email_queue_delayed` - the approximate number of delayed messages in the queue, newly added but not yet available to receive
* `email_queue_count_not_visible` - the approximate number of non-visible messages in the queue, recently sent to a receiver

These are tagged with `queue:name`, where `name` is taken from the last component of the SQS URL. This will allow us to use the same metric for the primary queue and the DLQ.
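A sketch of that tagging, assuming markus-style gauges and the example queue URL from the tests above (the metrics namespace and gauge value are illustrative):

```python
import markus

metrics = markus.get_metrics("eventsinfo")

queue_url = "https://sqs.us-east-2.amazonaws.example.com/111222333/queue-name"
queue_name = queue_url.rsplit("/", 1)[-1]  # -> "queue-name"

# markus gauges accept a tags list, so the primary queue and the DLQ
# can share one metric name and differ only by the queue: tag.
metrics.gauge("email_queue_count", value=17, tags=[f"queue:{queue_name}"])
```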
This includes two requirements changes:

* `codetiming`, which is the package from the tutorial Python Timer Functions: Three Ways to Monitor Your Code
* `markus`, upgraded to 4.0.0, to get the `assert_gauge` method and support for Python 3.9 and 3.10
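For reference, `codetiming`'s `Timer` can be used as a context manager (illustrative usage, not necessarily how this PR wires it in):

```python
import time
from codetiming import Timer

with Timer(name="poll_queue", logger=None) as timer:
    time.sleep(0.1)  # stand-in for the work being measured
print(f"poll_queue took {timer.last:.3f} seconds")
```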
This includes new documentation for setting up SQS polling in the local development environment.
This PR includes tests for the new management commands, but mocks out any processing (`_sns_inbound_logic`), which is tested elsewhere.

How to test:

`./manage.py process_emails_from_sqs`