Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/dont-delete-consent-files-after-processing #5855

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions datahub/company/tasks/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,18 @@ def _list_objects(self, client, bucket_name, prefix):
Bucket=bucket_name,
Prefix=prefix,
)
# Get the list of files, oldest first. Process in that order, so any changes in newer
# files take precedence
# Get the list of files, ordered by LastModified descending.
sorted_files = sorted(
[object for object in response.get('Contents', {})],
key=lambda x: x['LastModified'],
reverse=False,
reverse=True,
)
return [file['Key'] for file in sorted_files]

def _get_most_recent_object(self, client, bucket_name, prefix):
files_in_bucket = self._list_objects(client, bucket_name, prefix)
return files_in_bucket[0] if len(files_in_bucket) > 0 else None

def _log_at_interval(self, index: int, message: str):
"""
Log in a way that is suitable for both small and large datasets. Initially
Expand All @@ -194,25 +197,23 @@ def _log_at_interval(self, index: int, message: str):
def ingest(self):
logger.info('Checking for new contact consent data files')
s3_client = get_s3_client(REGION)
file_keys = self._list_objects(s3_client, BUCKET, CONSENT_PREFIX)
if len(file_keys) == 0:
file_key = self._get_most_recent_object(s3_client, BUCKET, CONSENT_PREFIX)
if not file_key:
logger.info(
'No contact consent files found in bucket %s matching prefix %s',
BUCKET,
CONSENT_PREFIX,
)
return

for file_key in file_keys:
try:
self.sync_file_with_database(s3_client, file_key)
self.delete_file(s3_client, file_key)
except Exception as exc:
logger.exception(
f'Error ingesting contact consent file {file_key}',
stack_info=True,
)
raise exc
try:
self.sync_file_with_database(s3_client, file_key)
except Exception as exc:
logger.exception(
f'Error ingesting contact consent file {file_key}',
stack_info=True,
)
raise exc

def get_grouped_contacts(self) -> dict[str, List[Contact]]:
contacts_qs = Contact.objects.all()
Expand Down Expand Up @@ -319,8 +320,3 @@ def sync_file_with_database(self, client, file_key):
i,
path,
)

def delete_file(self, client, file_key):
logger.info('Deleting contact consent file %s', file_key)
client.delete_object(Bucket=BUCKET, Key=file_key)
logger.info('Successfully deleted contact consent file %s', file_key)
60 changes: 15 additions & 45 deletions datahub/company/test/tasks/test_contact_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,17 @@ def test_files():
@mock_aws
def setup_s3_bucket(bucket_name, test_files):
mock_s3_client = _create_bucket(bucket_name)

last_modfied = datetime.datetime.now()
for file in test_files:
mock_s3_client.put_object(Bucket=bucket_name, Key=file, Body=json.dumps('Test contents'))
# use freeze_time to allow uploaded files to have a different LastModified date
with freeze_time(last_modfied):
mock_s3_client.put_object(
Bucket=bucket_name,
Key=file,
Body=json.dumps('Test contents'),
)
last_modfied = last_modfied + datetime.timedelta(seconds=3)


def _create_bucket(bucket_name):
Expand Down Expand Up @@ -503,7 +512,7 @@ def test_ingest_with_exception_logs_error_and_reraises_original_exception(self,
task.ingest()

@mock_aws
def test_ingest_with_empty_s3_bucket_does_not_call_sync_or_delete(self):
def test_ingest_with_empty_s3_bucket_does_not_call_sync(self):
"""
Test that the task can handle an empty S3 bucket
"""
Expand All @@ -512,15 +521,13 @@ def test_ingest_with_empty_s3_bucket_does_not_call_sync_or_delete(self):
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
delete_file=mock.DEFAULT,
):
task.ingest()
task.sync_file_with_database.assert_not_called()
task.delete_file.assert_not_called()

@mock_aws
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
def test_ingest_calls_sync_with_correct_files_order(self, test_files):
def test_ingest_calls_sync_with_newest_file_order(self, test_files):
"""
Test that the ingest calls the sync with the files in correct order
"""
Expand All @@ -529,32 +536,11 @@ def test_ingest_calls_sync_with_correct_files_order(self, test_files):
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
delete_file=mock.DEFAULT,
):
task.ingest()
task.sync_file_with_database.assert_has_calls(
[mock.call(mock.ANY, file) for file in test_files],
)

@mock_aws
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
def test_ingest_calls_delete_for_all_files(
self,
test_files,
):
"""
Test that the ingest calls delete with the files in correct order
"""
setup_s3_bucket(BUCKET, test_files)
task = ContactConsentIngestionTask()
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
delete_file=mock.DEFAULT,
):
task.ingest()
task.delete_file.assert_has_calls(
[mock.call(mock.ANY, file) for file in test_files],
task.sync_file_with_database.assert_called_once_with(
mock.ANY,
test_files[-1],
)

@mock_aws
Expand Down Expand Up @@ -884,19 +870,3 @@ def test_should_update_contact_with_row_date_newer_than_contact_date_should_retu
contact,
row,
)

@mock_aws
def test_delete_file_removes_file_using_boto3(self):
"""
Test that the file is deleted from the bucket
"""
filename = f'{CONSENT_PREFIX}file_{uuid.uuid4()}.jsonl'
upload_file_to_s3(BUCKET, filename, 'test')
client = boto3.client('s3', REGION)

ContactConsentIngestionTask().delete_file(client, filename)
with pytest.raises(client.exceptions.NoSuchKey):
client.get_object(
Bucket=BUCKET,
Key=filename,
)
Loading