
Add ./manage.py process_emails_from_sqs to continuously process messages from SQS #1768

Merged
merged 21 commits into from
Apr 15, 2022

Conversation

@jwhitlock (Member) commented Apr 11, 2022

This PR addresses Jira task MPP-1802, "Refactor email processing". It adds a management command that polls a queue (provided by AWS Simple Queue Service, or SQS) and processes the messages. The queue is populated by the Simple Notification Service (SNS), and is an alternate processing path to the POSTs to /emails/sns_inbound. Our hope is that processing emails this way will keep the website responsive during email backlogs, such as ones around production pushes, and be a better fit for larger emails and higher email volumes.

This adds two management commands: process_emails_from_sqs and check_health.

./manage.py process_emails_from_sqs is heavily based on ./manage.py process_delayed_emails_from_sqs, but has been refactored for testability, monitoring, logging, and to run without stopping. The messages themselves appear identical to those placed on the dead-letter queue (DLQ), so this command could be used in place of process_delayed_emails_from_sqs.

The command can periodically write JSON, including a timestamp, to a file. The file can be checked with ./manage.py check_health [path_to_file], which returns a non-zero exit code if the timestamp is too old. This can be used as a liveness probe in Kubernetes, to restart the process if it gets stuck.
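As a sketch of that write/check handshake (function names here are illustrative, not the PR's actual code; the 120-second default comes from the check_health command described below):

```python
import json
import time
from pathlib import Path


def write_healthcheck(path: Path) -> None:
    """Record the current time so a separate liveness probe can verify progress."""
    path.write_text(json.dumps({"timestamp": time.time()}))


def check_health(path: Path, max_age_seconds: float = 120.0) -> int:
    """Exit-code style check: 0 if the file is fresh, 1 if missing or stale."""
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        return 1  # Missing or unreadable file counts as unhealthy.
    age = time.time() - data["timestamp"]
    return 0 if age <= max_age_seconds else 1
```

Kubernetes would run the check command as an exec liveness probe and restart the pod when it exits non-zero.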

This includes new settings:

  • AWS_SQS_EMAIL_QUEUE_URL - The URL of the SQS queue with the email messages
  • AWS_SQS_EMAIL_DLQ_URL - The "dead-letter queue" (DLQ) for the SQS queue, which receives messages that are retrieved but not deleted too many times. This is similar to AWS_SQS_QUEUE_URL, the DLQ for the SNS POST process, and the same queue can be used.

This includes new gauges:

  • email_queue_count - the approximate number of messages in the queue
  • email_queue_delayed - the approximate number of delayed messages in the queue, newly added but not available to receive
  • email_queue_count_not_visible - the approximate number of non-visible messages in the queue, recently sent to a receiver

These are tagged with queue:name, where name is taken from the last component of the SQS URL. This will allow us to use the same metric for the primary and the DLQ queues.
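Deriving that tag could look like this (hypothetical helper name; the example URL is the one used in this PR's tests):

```python
def queue_tag(sqs_url: str) -> str:
    """Build the metrics tag from the last path component of an SQS queue URL."""
    name = sqs_url.rstrip("/").rsplit("/", 1)[-1]
    return f"queue:{name}"
```

With this, the primary queue and the DLQ emit the same gauge names but are distinguishable by tag.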

This includes two requirements changes.

This includes new documentation for setting up the SQS polling in the local development environment.

This PR includes tests for the new management commands, but mocks out any processing (_sns_inbound_logic) which is tested elsewhere.

How to test:

  • Follow the existing end-to-end local development documentation to get email delivery working in your development environment. It may be necessary to get S3 storage of emails working as well (same docs).
  • Follow the new steps in this PR to convert to SES → SNS → SQS, which includes updating settings and running ./manage.py process_emails_from_sqs

netlify bot commented Apr 11, 2022

Deploy Preview for fx-relay-demo canceled.

Latest commit: 90e92c1
Latest deploy log: https://app.netlify.com/sites/fx-relay-demo/deploys/6259d4091f27ae0009cf7433

@jwhitlock (Member, Author)

After writing the PR message, I realized I should tag the new gauge metrics, and added a commit to do that. I also added a section about the requirements changes.

This is a large PR! Maybe too large! If I need to break it up, I could potentially refactor process_delayed_emails_from_sqs over several PRs, to get to the same functionality. Other ideas are welcome.

@groovecoder (Member) left a comment:

Whew, looking good!

Something I'm still not super clear about from the code or comments is the "cycle" vs. "loop" concept. It seems like the "cycle" is the cycle of long polling, and the "loop" is the loop over queue messages?

Review threads (now resolved) on:

  • .env-dist
  • emails/management/commands/check_health.py
  • emails/tests/mgmt_check_health_tests.py
  • emails/management/commands/process_emails_from_sqs.py
  • emails/tests/mgmt_process_emails_from_sqs_tests.py
Comment on lines 200 to 208
```python
temp_error = ClientError(
    {
        "Error": {
            "Message": "Maximum sending rate exceeded.",
            "Code": "ThrottlingException",
        }
    },
    "",
)
```
Member:

question (non-blocking): in pytest, could this be made into its own fixture that is requested in each of these 2 tests that needs it? to eliminate the code duplication?

jwhitlock (Member, Author):

I created a function make_client_error to help construct the three ClientErrors tested, and switched the second one to test AccountSendingPausedException, which I'm guessing is the other temporary exception that has the phrase 'pause'.

And while I'm writing this, I see one more ClientError that could use make_client_error...

Comment on lines 258 to 257
```python
def test_command_golden_path(mock_boto3_queue_constructor):
    """The command runs successfully until completion."""
    mock_boto3_queue_constructor.return_value = fake_queue()
    call_command("process_emails_from_sqs", "--max-seconds=4")
    mock_boto3_queue_constructor.assert_called_once_with(
        "https://sqs.us-east-2.amazonaws.example.com/111222333/queue-name"
    )
    assert mock_boto3_queue_constructor._mock_region == "us-east-2"
```
Member:

suggestion (non-blocking): How about a super_golden_path with lots of messages that all succeed, and a dumpster_fire_path with lots of messages that all fail for different reasons?

jwhitlock (Member, Author):

In general, I don't like "a list of a lot of things" tests. It is unclear what code each individual message is testing, so you get code coverage without a clear example of how to trigger the code path. I find them to be harder to refactor, and often get uglier as time goes by. When they break, I often have to manually break up the list and run several times to determine which one broke, and how to fix them.

When a test really calls for several similar but slightly different inputs, I prefer pytest.mark.parametrize, so that each input is in a different test run. In many cases, this results in if/then clauses in the tests, which are another kind of test smell that I eliminated in the check_health tests.
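As an illustrative sketch of that preference (the is_temporary_error predicate and the set of retryable codes are hypothetical, named after the exceptions discussed in this thread):

```python
import pytest

# Hypothetical stand-in for the command's retry logic.
TEMPORARY_CODES = {"ThrottlingException", "AccountSendingPausedException"}


def is_temporary_error(code: str) -> bool:
    return code in TEMPORARY_CODES


@pytest.mark.parametrize(
    "code,expected",
    [
        ("ThrottlingException", True),
        ("AccountSendingPausedException", True),
        ("MessageRejected", False),
    ],
)
def test_is_temporary_error(code, expected):
    # Each parameter set runs as a separate test case with its own ID,
    # so a failure points at exactly one input.
    assert is_temporary_error(code) is expected
```

When one input breaks, pytest reports it individually (e.g. `test_is_temporary_error[MessageRejected]`) rather than failing a single test over a long list.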

The purpose of this test and the next is to get coverage of the argument parsing and the handle code. Most of the tests bypass these by directly calling the main_loop. Email processing is completely mocked out, and the tests for that are elsewhere, or need to be written.

Staring at this test, I feel that it is poorly named. It runs for 4 (fake) seconds and processes no messages at all. I was expecting more cases when I named it, but it turned out there were just two branches to test. A rename is in order, something like test_command_setup_success.

@jwhitlock (Member, Author)

Thanks for the feedback @groovecoder! I've fixed some small things; I'll pick up the larger ones, or the ones I need to think about, in the morning.

There are two loops:

```
until cancelled:
    request N messages
    for each message:
        process message, and retry for some errors
```

There's a lot in the details (timing, try/except blocks, etc), but maybe there's a structure and names that could make more sense. It also gets easier if we only request one message at a time - I'm not sure if there is a benefit to getting multiple at a time, like the existing DLQ does.
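Concretely, a stripped-down sketch of that two-loop structure (the message handler is a stand-in, since _sns_inbound_logic is mocked in the tests, and the real command adds timing, logging, and try/except around each step):

```python
import time


def process_message(message):
    """Stand-in for the real email handling logic."""


def process_queue(queue, max_seconds: float) -> None:
    """Outer loop: long-poll until the time budget runs out (or, in
    production, until cancelled). Inner loop: handle each message in
    the returned batch."""
    start = time.monotonic()
    while time.monotonic() - start < max_seconds:
        # Long polling returns up to a batch of messages, or an empty list.
        messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=2)
        for message in messages:
            process_message(message)
            # Messages retrieved but not deleted too many times land on the DLQ.
            message.delete()
```

The `queue` here follows the boto3 SQS Queue resource interface (`receive_messages`, per-message `delete`), which is what makes requesting one message vs. a batch a single-parameter change.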

@jwhitlock (Member, Author)

Several changes from the previous review round:

In docs/end-to-end-local-dev.md:

  • Documentation fix, "was are switching" to "we are switching"

In emails/management/commands/check_health.py:

  • Switch healthcheck_path parameter from string to argparse.FileType. There is no change for callers, but a few code changes to expect an open file instead of a string.
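For reference, a minimal argparse sketch of what that switch implies (in a Django management command, the same type= argument would go in add_arguments(self, parser)):

```python
import argparse

parser = argparse.ArgumentParser()
# FileType opens the file while arguments are parsed, so a bad path is
# reported before any handler code runs; the parsed value is an open
# text-mode file object rather than a string.
parser.add_argument("healthcheck_path", type=argparse.FileType("r"))
```

Callers still pass a path on the command line; only the code that previously opened the string path changes.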

In emails/management/commands/process_emails_from_sqs.py:

  • Switch --healthcheck_path from string to argparse.FileType as well
  • Clarify that --sqs-url gets default from settings.AWS_SQS_EMAIL_QUEUE_URL
  • Rename method main_loop() to process_queue()
  • Move queue backlog metrics to method refresh_and_emit_queue_count_metrics
  • Split method cycle_loop() into poll_queue_for_messages() and process_message_batch()
  • Remove json_body from logs when SNS verification fails
  • Remove bad metric increment

In emails/tests/mgmt_check_health_tests.py:

  • Migrate from custom fixture mock_logger to the pytest fixture caplog
  • Move common healthcheck creation code to write_healthcheck
  • Split 2 tests with two variants into 4 simple tests

In emails/tests/mgmt_process_emails_from_sqs_tests.py:

  • Removed unused import
  • Add helper function make_client_error to help make botocore.exceptions.ClientError instances
  • Rename tests and adjust docstrings for rename of main_loop to process_queue
  • Add assertions that mocked methods are called when processing a message
  • Rename test_command_golden_path to test_command_successful_setup

Open discussions:

  • Should the log include items when they are zero (for example, "pause_count": 0 when message processing did not pause for a retryable exception)?
  • Should timers be float seconds (sqs_poll_s=5.001) or integer milliseconds (sqs_poll_ms=5001)?
  • Should I add typing and mypy now or later?

There's some other (non-blocking) discussions, let me know if you want to continue them.

@groovecoder (Member) left a comment:

Looking good! I got it working with my local Relay app by following the new steps in the doc. Woohoo!

Just a couple more things:

suggestion: Let's add the S3 and SQS steps as optional in the "Overview" section?

6. (Optional) Convert to store in S3
7. (Optional) Convert to back-end processing

Comment on lines 364 to 365
```
AWS<<-Local app: SQS fetch
Local app->>AWS: POST /SendRawEmail
```
Member:

Got a rendering error on GitHub with this?

```
Error rendering embedded code

Parse error on line 6:
... AWS<<-Local app: SQS fetch Loca
----------------------^
Expecting 'SOLID_OPEN_ARROW', 'DOTTED_OPEN_ARROW', 'SOLID_ARROW', 'DOTTED_ARROW', 'SOLID_CROSS', 'DOTTED_CROSS', 'SOLID_POINT', 'DOTTED_POINT', got 'TXT'
```

jwhitlock (Member, Author):

Ha, mermaid can't read an arrow that goes left.

jwhitlock and others added 20 commits April 15, 2022 14:58
This command is based on process_delayed_emails_from_sqs, but is
designed to run until interrupted (or for a maximum duration)
rather than until the queue is empty.

The command logs to eventsinfo.process_emails_from_sqs:
* At the start and end of execution (INFO)
* For each polling loop (INFO if messages, DEBUG if none)
* For each processed message (INFO if failed, DEBUG if success)
* For AWS errors (ERROR)

Adds two new environment variables:
* AWS_SQS_EMAIL_QUEUE_URL - The SQS queue for incoming emails
* AWS_SQS_EMAIL_DLQ_URL - The dead letter queue for that queue

Adds three new stats gauges:
* email_queue_count - Estimated messages in SQS
* email_queue_count_delayed - ...that are not yet available
* email_queue_count_not_visible - ...that are reserved by a receiver

The current SNS workflow uses AWS_SQS_QUEUE_URL as its dead-letter
queue, and AWS_SQS_EMAIL_DLQ_URL could have the same value.
AWS_SQS_QUEUE_URL can be deprecated and dropped after the transition
from SNS POSTs to SQS pulls.

Adds docs to test these changes in a local developer environment, and
tests to cover the new commands.

"./manage.py check_health <path_to_healthcheck.json>" reads the
(optional) file written by process_emails_from_sqs, checks if the
timestamp is recent (120 seconds by default), and exits with a code of 1
if too old. This can be used as a Kubernetes liveness check of this and
other processes without a webserver.

This argument type opens the files as part of command-line processing, so errors
can be detected before the handle code runs. The code refers to it as
self.healthcheck_file after it has been converted to a TextIOWrapper object.

* Use the pytest caplog fixture instead of a custom fixture
* Move common setup to write_healthcheck function
* Un-parametrize tests, making it clearer what happens with the default
  verbosity levels.

Adjust the test names and docstrings, and add some additional checks.

Split cycle loop into logical parts:
* poll_queue_for_messages - requests a batch of messages, or times out
* process_message_batch - processes the returned messages, if any
@jwhitlock jwhitlock force-pushed the process_email_task_mpp_1802 branch from f13a1df to 330c24e Compare April 15, 2022 20:06
@jwhitlock (Member, Author)

Force push to pick up requirements merges on main, and then:

  • Mark sections to setup S3 and SQS Push as optional
  • Fix SQS push sequence diagram

@jwhitlock jwhitlock requested a review from groovecoder April 15, 2022 20:09