Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPP-3932: Add flag 'developer_mode', use to simulate complaint and log notifications #5090

Merged
merged 19 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 166 additions & 46 deletions emails/tests/views_tests.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import base64
import glob
import json
import logging
import os
import re
import zlib
from copy import deepcopy
from datetime import UTC, datetime, timedelta
from email import message_from_string
Expand Down Expand Up @@ -176,11 +178,16 @@ def create_notification_from_email(email_text: str) -> AWS_SNSMessageJSON:
message_id = None
# This function cannot handle malformed To: addresses
assert not getattr(email["To"], "defects")
email_date = (
getattr(email["Date"], "datetime")
if "Date" in email
else (datetime.now() - timedelta(minutes=5))
)

sns_message = {
"notificationType": "Received",
"mail": {
"timestamp": getattr(email["Date"], "datetime").isoformat(),
"timestamp": email_date.isoformat(),
# To handle invalid From address, find 'first' address with what looks like
# an email portion and use that email, or fallback to [email protected]
"source": next(
Expand Down Expand Up @@ -209,9 +216,7 @@ def create_notification_from_email(email_text: str) -> AWS_SNSMessageJSON:
},
},
"receipt": {
"timestamp": (
getattr(email["Date"], "datetime") + timedelta(seconds=1)
).isoformat(),
"timestamp": (email_date + timedelta(seconds=1)).isoformat(),
"processingTimeMillis": 1001,
"recipients": [
addr.addr_spec for addr in getattr(email["To"], "addresses")
Expand All @@ -232,9 +237,7 @@ def create_notification_from_email(email_text: str) -> AWS_SNSMessageJSON:
"TopicArn": topic_arn,
"Subject": str(getattr(email["Subject"], "as_unstructured")),
"Message": json.dumps(sns_message),
"Timestamp": (
getattr(email["Date"], "datetime") + timedelta(seconds=2)
).isoformat(),
"Timestamp": (email_date + timedelta(seconds=2)).isoformat(),
"SignatureVersion": "1",
"Signature": "invalid-signature",
"SigningCertURL": f"{base_url}/SimpleNotificationService-abcd1234.pem",
Expand All @@ -246,7 +249,10 @@ def create_notification_from_email(email_text: str) -> AWS_SNSMessageJSON:


def assert_email_equals_fixture(
output_email: str, fixture_name: str, replace_mime_boundaries: bool = False
output_email: str,
fixture_name: str,
replace_mime_boundaries: bool = False,
fixture_replace: tuple[str, str] | None = None,
) -> None:
"""
Assert the output equals the expected email, after optional replacements.
Expand All @@ -267,6 +273,12 @@ def assert_email_equals_fixture(
else:
test_output_email = output_email

# If requested, replace a string
if fixture_replace:
orig_str, new_str = fixture_replace
expected = expected.replace(orig_str, new_str)
fixture_name += "_MODIFIED"

if test_output_email != expected: # pragma: no cover
# Write the actual output as an aid for debugging or fixture updates
path = os.path.join(real_abs_cwd, "fixtures", fixture_name + "_actual.email")
Expand Down Expand Up @@ -361,6 +373,7 @@ def setUp(self) -> None:
def check_sent_email_matches_fixture(
self,
fixture_name: str,
fixture_replace: tuple[str, str] | None = None,
replace_mime_boundaries: bool = False,
expected_source: str | None = None,
expected_destination: str | None = None,
Expand All @@ -384,7 +397,9 @@ def check_sent_email_matches_fixture(
assert source == expected_source
if expected_destination is not None:
assert destinations[0] == expected_destination
assert_email_equals_fixture(raw_message, fixture_name, replace_mime_boundaries)
assert_email_equals_fixture(
raw_message, fixture_name, replace_mime_boundaries, fixture_replace
)


class SNSNotificationIncomingTest(SNSNotificationTestBase):
Expand Down Expand Up @@ -764,6 +779,60 @@ def test_header_with_encoded_trailing_newline_is_forwarded(
extra={"issues": {"headers": expected_header_errors}},
)

@override_flag("developer_mode", active=True)
@patch("emails.views.info_logger")
def test_developer_mode_no_label(self, mock_logger: Mock) -> None:
"""Developer mode does nothing special without mask label"""
_sns_notification(EMAIL_SNS_BODIES["single_recipient"])
self.check_sent_email_matches_fixture(
"single_recipient",
expected_source="[email protected]",
expected_destination="[email protected]",
)
self.ra.refresh_from_db()
assert self.ra.num_forwarded == 1
assert self.ra.last_used_at is not None
mock_logger.info.assert_not_called()

@override_flag("developer_mode", active=True)
@patch("emails.views.info_logger")
def test_developer_mode_simulate_complaint(self, mock_logger: Mock) -> None:
"""Developer mode with 'DEV:simulate_complaint' label sends to simulator"""
self.ra.description = "test123 DEV:simulate_complaint"
self.ra.save()

_sns_notification(EMAIL_SNS_BODIES["single_recipient"])
self.check_sent_email_matches_fixture(
"single_recipient",
expected_source="[email protected]",
expected_destination="[email protected]",
fixture_replace=(
"To: [email protected]",
"To: [email protected]",
),
)
self.ra.refresh_from_db()
assert self.ra.num_forwarded == 1
assert self.ra.last_used_at is not None
parts = ["", "", "", ""]
for callnum, call in enumerate(mock_logger.info.mock_calls):
assert call.args == ("_handle_received: developer_mode",)
extra = call.kwargs["extra"]
assert extra["mask_id"] == self.ra.metrics_id
assert extra["dev_action"] == "simulate_complaint"
assert extra["part"] == callnum
assert extra["parts"] == 4
parts[extra["part"]] = extra["notification_gza85"]
log_notification = json.loads(
zlib.decompress(
base64.a85decode(("\n".join(parts)).encode("ascii"))
).decode()
)
expected_log_notification = json.loads(
EMAIL_SNS_BODIES["single_recipient"]["Message"]
)
assert log_notification == expected_log_notification


class SNSNotificationRepliesTest(SNSNotificationTestBase):
"""Tests for _sns_notification for replies from Relay users"""
Expand Down Expand Up @@ -1068,6 +1137,15 @@ def setUp(self):
self.sa: SocialAccount = baker.make(
SocialAccount, user=self.user, provider="fxa", uid=str(uuid4())
)
self.ra = baker.make(
RelayAddress, user=self.user, address="ebsbdsan7", domain=2
)

russian_spam_notification = create_notification_from_email(
EMAIL_EXPECTED["russian_spam"]
)
spam_mail_content = json.loads(russian_spam_notification["Message"])["mail"]

complaint = {
"notificationType": "Complaint",
"complaint": {
Expand All @@ -1080,6 +1158,7 @@ def setUp(self):
"000001378603177f-18c07c78-fa81-4a58-9dd1-fedc3cb8f49a-000000"
),
},
"mail": spam_mail_content,
}
self.complaint_body = {"Message": json.dumps(complaint)}
ses_client_patcher = patch(
Expand All @@ -1106,6 +1185,10 @@ def test_notification_type_complaint(self):
self.user.profile.refresh_from_db()
assert self.user.profile.auto_block_spam is True

self.ra.refresh_from_db()
assert self.ra.enabled
self.mock_ses_client.send_raw_email.assert_not_called()

mm.assert_incr_once(
"fx.private.relay.email_complaint",
tags=[
Expand Down Expand Up @@ -1136,6 +1219,13 @@ def test_complaint_log_with_optout(self) -> None:
with self.assertLogs(INFO_LOG) as logs:
_sns_notification(self.complaint_body)

self.user.profile.refresh_from_db()
assert self.user.profile.auto_block_spam is True

self.ra.refresh_from_db()
assert self.ra.enabled
self.mock_ses_client.send_raw_email.assert_not_called()

log_data = log_extra(logs.records[0])
assert log_data["user_match"] == "found"
assert not log_data["fxa_id"]
Expand All @@ -1147,49 +1237,79 @@ def test_complaint_disables_mask(self):
1. sets enabled=False on the mask, and
2. returns 200.
"""
self.ra = baker.make(
RelayAddress, user=self.user, address="ebsbdsan7", domain=2
)

# The top-level JSON object for complaints includes a "mail" field
# which contains information about the original mail to which the notification
# pertains. So, add a "mail" field with content from our russian_spam fixture
russian_spam_notification = create_notification_from_email(
EMAIL_INCOMING["russian_spam"]
)
spam_mail_content = json.loads(
russian_spam_notification.get("Message", "")
).get("mail", {})
spam_mail_content["source"] = (
f"[email protected] [via Relay] <{self.ra.full_address}>"
)
complaint_body_message = json.loads(self.complaint_body["Message"])
complaint_body_message["mail"] = spam_mail_content
complaint_body_with_spam_mail = {"Message": json.dumps(complaint_body_message)}
assert self.ra.enabled is True

response = _sns_notification(complaint_body_with_spam_mail)
with self.assertLogs(INFO_LOG) as logs, MetricsMock() as mm:
response = _sns_notification(self.complaint_body)
assert response.status_code == 200

self.ra.refresh_from_db()
source = self.mock_ses_client.send_raw_email.call_args.kwargs["Source"]
destinations = self.mock_ses_client.send_raw_email.call_args.kwargs[
"Destinations"
]
raw_message = self.mock_ses_client.send_raw_email.call_args.kwargs["RawMessage"]
data_without_newlines = raw_message["Data"].replace("\n", "")
self.user.profile.refresh_from_db()
assert self.user.profile.auto_block_spam is True

self.ra.refresh_from_db()
assert self.ra.enabled is False

self.mock_ses_client.send_raw_email.assert_called_once()
assert source == settings.RELAY_FROM_ADDRESS
assert destinations == [self.ra.user.email]
assert "To prevent further spam" in data_without_newlines
assert self.ra.full_address in data_without_newlines
call = self.mock_ses_client.send_raw_email.call_args
assert call.kwargs["Source"] == settings.RELAY_FROM_ADDRESS
assert call.kwargs["Destinations"] == [self.user.email]
msg_without_newlines = call.kwargs["RawMessage"]["Data"].replace("\n", "")
assert "To prevent further spam" in msg_without_newlines
assert self.ra.full_address in msg_without_newlines

# re-enable the mask for other tests
self.ra.enabled = True
self.ra.save()
self.ra.refresh_from_db()
mm.assert_incr_once(
"fx.private.relay.email_complaint",
tags=[
"complaint_subtype:none",
"complaint_feedback:abuse",
"user_match:found",
"relay_action:auto_block_spam",
],
)
assert len(logs.records) == 1
record = logs.records[0]
assert record.msg == "complaint_notification"
log_data = log_extra(record)
assert log_data == {
"complaint_feedback": "abuse",
"complaint_subtype": None,
"complaint_user_agent": "ExampleCorp Feedback Loop (V0.01)",
"domain": "test.com",
"relay_action": "auto_block_spam",
"user_match": "found",
"fxa_id": self.sa.uid,
}

@override_flag("developer_mode", active=True)
def test_complaint_mpp_3932(self):
"""MPP-3932: Log notification for all complaints for developer_mode users."""
with self.assertLogs(INFO_LOG) as logs:
response = _sns_notification(self.complaint_body)
assert response.status_code == 200

self.user.profile.refresh_from_db()
assert self.user.profile.auto_block_spam is True
self.mock_ses_client.send_raw_email.assert_not_called()

assert len(logs.records) == 2
record_mpp_3932 = logs.records[0]
assert record_mpp_3932.msg == "_handle_complaint: MPP-3932"
assert getattr(record_mpp_3932, "mask_id") == "unknown"
assert getattr(record_mpp_3932, "dev_action") == "log"
assert getattr(record_mpp_3932, "parts") == 1
assert getattr(record_mpp_3932, "part") == 0
log_complaint = json.loads(
zlib.decompress(
base64.a85decode(
getattr(record_mpp_3932, "notification_gza85").encode("ascii")
)
).decode()
)
expected_log_complaint = json.loads(self.complaint_body["Message"])
assert log_complaint == expected_log_complaint

record = logs.records[1]
assert record.msg == "complaint_notification"

def test_build_disabled_mask_for_spam_email(self):
free_user = make_free_test_user("[email protected]")
Expand Down Expand Up @@ -1877,7 +1997,7 @@ def test_unknown_domain_address_is_created(self) -> None:
An unknown but valid domain address is created.

This supports creating domain addresses on third-party sites, when
emailing a checkout reciept, or other situations when the email
emailing a checkout receipt, or other situations when the email
cannot be pre-created.
"""
assert DomainAddress.objects.filter(user=self.user).count() == 1
Expand Down Expand Up @@ -1910,7 +2030,7 @@ def test_uppercase_local_part_of_unknown_domain_address(self) -> None:
This creates a new domain address with lower-cased letters. It supports
creating domain addresses by third-parties that would not be allowed
on the relay dashboard due to the upper-case characters, but are still
consistent with dashboard-created domain adddresses.
consistent with dashboard-created domain addresses.
"""
assert DomainAddress.objects.filter(user=self.user).count() == 1
with self.assertLogs(GLEAN_LOG, "INFO") as caplog:
Expand Down
Loading