From 60b35636a53cf3715c239c680a4eca4917e9e4bf Mon Sep 17 00:00:00 2001
From: Chris Hopkins <102232401+chopkinsmade@users.noreply.github.com>
Date: Thu, 12 Dec 2024 11:10:42 +0000
Subject: [PATCH] remove logic to delete a file on task completion (#5855)

---
 datahub/company/tasks/contact.py              | 36 +++++------
 .../company/test/tasks/test_contact_task.py   | 60 +++++--------------
 2 files changed, 31 insertions(+), 65 deletions(-)

diff --git a/datahub/company/tasks/contact.py b/datahub/company/tasks/contact.py
index f8a60d24b..61a7f19ce 100644
--- a/datahub/company/tasks/contact.py
+++ b/datahub/company/tasks/contact.py
@@ -173,15 +173,18 @@ def _list_objects(self, client, bucket_name, prefix):
             Bucket=bucket_name,
             Prefix=prefix,
         )
-        # Get the list of files, oldest first. Process in that order, so any changes in newer
-        # files take precedence
+        # Get the list of files, ordered by LastModified descending.
         sorted_files = sorted(
             [object for object in response.get('Contents', {})],
             key=lambda x: x['LastModified'],
-            reverse=False,
+            reverse=True,
         )
         return [file['Key'] for file in sorted_files]
 
+    def _get_most_recent_object(self, client, bucket_name, prefix):
+        files_in_bucket = self._list_objects(client, bucket_name, prefix)
+        return files_in_bucket[0] if len(files_in_bucket) > 0 else None
+
     def _log_at_interval(self, index: int, message: str):
         """
         Log in a way that is suitable for both small and large datasets. Initially
@@ -194,8 +197,8 @@ def _log_at_interval(self, index: int, message: str):
     def ingest(self):
         logger.info('Checking for new contact consent data files')
         s3_client = get_s3_client(REGION)
-        file_keys = self._list_objects(s3_client, BUCKET, CONSENT_PREFIX)
-        if len(file_keys) == 0:
+        file_key = self._get_most_recent_object(s3_client, BUCKET, CONSENT_PREFIX)
+        if not file_key:
             logger.info(
                 'No contact consent files found in bucket %s matching prefix %s',
                 BUCKET,
@@ -203,16 +206,14 @@ def ingest(self):
             )
             return
 
-        for file_key in file_keys:
-            try:
-                self.sync_file_with_database(s3_client, file_key)
-                self.delete_file(s3_client, file_key)
-            except Exception as exc:
-                logger.exception(
-                    f'Error ingesting contact consent file {file_key}',
-                    stack_info=True,
-                )
-                raise exc
+        try:
+            self.sync_file_with_database(s3_client, file_key)
+        except Exception as exc:
+            logger.exception(
+                f'Error ingesting contact consent file {file_key}',
+                stack_info=True,
+            )
+            raise exc
 
     def get_grouped_contacts(self) -> dict[str, List[Contact]]:
         contacts_qs = Contact.objects.all()
@@ -319,8 +320,3 @@ def sync_file_with_database(self, client, file_key):
                 i,
                 path,
             )
-
-    def delete_file(self, client, file_key):
-        logger.info('Deleting contact consent file %s', file_key)
-        client.delete_object(Bucket=BUCKET, Key=file_key)
-        logger.info('Successfully deleted contact consent file %s', file_key)
diff --git a/datahub/company/test/tasks/test_contact_task.py b/datahub/company/test/tasks/test_contact_task.py
index 086fbeee2..df086e638 100644
--- a/datahub/company/test/tasks/test_contact_task.py
+++ b/datahub/company/test/tasks/test_contact_task.py
@@ -428,8 +428,17 @@ def test_files():
 @mock_aws
 def setup_s3_bucket(bucket_name, test_files):
     mock_s3_client = _create_bucket(bucket_name)
+
+    last_modified = datetime.datetime.now()
     for file in test_files:
-        mock_s3_client.put_object(Bucket=bucket_name, Key=file, Body=json.dumps('Test contents'))
+        # use freeze_time to allow uploaded files to have a different LastModified date
+        with freeze_time(last_modified):
+            mock_s3_client.put_object(
+                Bucket=bucket_name,
+                Key=file,
+                Body=json.dumps('Test contents'),
+            )
+        last_modified = last_modified + datetime.timedelta(seconds=3)
 
 
 def _create_bucket(bucket_name):
@@ -503,7 +512,7 @@ def test_ingest_with_exception_logs_error_and_reraises_original_exception(self,
             task.ingest()
 
     @mock_aws
-    def test_ingest_with_empty_s3_bucket_does_not_call_sync_or_delete(self):
+    def test_ingest_with_empty_s3_bucket_does_not_call_sync(self):
         """
         Test that the task can handle an empty S3 bucket
         """
@@ -512,15 +521,13 @@ def test_ingest_with_empty_s3_bucket_does_not_call_sync_or_delete(self):
         with mock.patch.multiple(
             task,
             sync_file_with_database=mock.DEFAULT,
-            delete_file=mock.DEFAULT,
         ):
             task.ingest()
             task.sync_file_with_database.assert_not_called()
-            task.delete_file.assert_not_called()
 
     @mock_aws
     @override_settings(S3_LOCAL_ENDPOINT_URL=None)
-    def test_ingest_calls_sync_with_correct_files_order(self, test_files):
+    def test_ingest_calls_sync_with_newest_file_order(self, test_files):
         """
         Test that the ingest calls the sync with the files in correct order
         """
@@ -529,32 +536,11 @@ def test_ingest_calls_sync_with_correct_files_order(self, test_files):
         with mock.patch.multiple(
             task,
             sync_file_with_database=mock.DEFAULT,
-            delete_file=mock.DEFAULT,
         ):
             task.ingest()
-            task.sync_file_with_database.assert_has_calls(
-                [mock.call(mock.ANY, file) for file in test_files],
-            )
-
-    @mock_aws
-    @override_settings(S3_LOCAL_ENDPOINT_URL=None)
-    def test_ingest_calls_delete_for_all_files(
-        self,
-        test_files,
-    ):
-        """
-        Test that the ingest calls delete with the files in correct order
-        """
-        setup_s3_bucket(BUCKET, test_files)
-        task = ContactConsentIngestionTask()
-        with mock.patch.multiple(
-            task,
-            sync_file_with_database=mock.DEFAULT,
-            delete_file=mock.DEFAULT,
-        ):
-            task.ingest()
-            task.delete_file.assert_has_calls(
-                [mock.call(mock.ANY, file) for file in test_files],
+            task.sync_file_with_database.assert_called_once_with(
+                mock.ANY,
+                test_files[-1],
             )
 
     @mock_aws
@@ -884,19 +870,3 @@ def test_should_update_contact_with_row_date_newer_than_contact_date_should_retu
             contact,
             row,
         )
-
-    @mock_aws
-    def test_delete_file_removes_file_using_boto3(self):
-        """
-        Test that the file is deleted from the bucket
-        """
-        filename = f'{CONSENT_PREFIX}file_{uuid.uuid4()}.jsonl'
-        upload_file_to_s3(BUCKET, filename, 'test')
-        client = boto3.client('s3', REGION)
-
-        ContactConsentIngestionTask().delete_file(client, filename)
-        with pytest.raises(client.exceptions.NoSuchKey):
-            client.get_object(
-                Bucket=BUCKET,
-                Key=filename,
-            )
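
Reviewer note, not part of the patch: the sketch below is a minimal, standalone illustration of the behaviour the change relies on -- list the S3 objects under a prefix, sort them by LastModified in descending order, and take only the newest key, which is what ingest() now hands to sync_file_with_database. The bucket name, prefix, region and function name here are placeholders rather than the task's real settings, and the snippet assumes only the libraries the test module already uses (boto3, moto's mock_aws and freezegun's freeze_time).

# newest_key_demo.py -- hypothetical standalone sketch, not repository code.
import datetime

import boto3
from freezegun import freeze_time
from moto import mock_aws


@mock_aws
def newest_key_demo():
    # Placeholder bucket/prefix/region, not the task's real configuration.
    client = boto3.client('s3', region_name='eu-west-2')
    client.create_bucket(
        Bucket='demo-bucket',
        CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'},
    )

    # Upload three files with distinct LastModified timestamps, mirroring the
    # freeze_time approach used in setup_s3_bucket in the test module.
    uploaded_at = datetime.datetime(2024, 12, 1, 12, 0, 0)
    for name in ('file_1.jsonl', 'file_2.jsonl', 'file_3.jsonl'):
        with freeze_time(uploaded_at):
            client.put_object(Bucket='demo-bucket', Key=f'consent/{name}', Body=b'{}')
        uploaded_at += datetime.timedelta(seconds=3)

    contents = client.list_objects_v2(Bucket='demo-bucket', Prefix='consent/')['Contents']
    # Newest first, matching _list_objects with reverse=True; element 0 is the
    # single file the reworked ingest() syncs, with no delete afterwards.
    newest = sorted(contents, key=lambda obj: obj['LastModified'], reverse=True)[0]['Key']
    assert newest == 'consent/file_3.jsonl'
    return newest


if __name__ == '__main__':
    print(newest_key_demo())

Run directly, this should print consent/file_3.jsonl, the most recently uploaded key, which matches what test_ingest_calls_sync_with_newest_file_order asserts via test_files[-1].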