Skip to content

Commit

Permalink
remove logic to delete a file on task completion (#5855)
Browse files Browse the repository at this point in the history
  • Loading branch information
chopkinsmade authored Dec 12, 2024
1 parent 7b5fc1a commit 60b3563
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 65 deletions.
36 changes: 16 additions & 20 deletions datahub/company/tasks/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,18 @@ def _list_objects(self, client, bucket_name, prefix):
Bucket=bucket_name,
Prefix=prefix,
)
# Get the list of files, oldest first. Process in that order, so any changes in newer
# files take precedence
# Get the list of files, ordered by LastModified descending.
sorted_files = sorted(
[object for object in response.get('Contents', {})],
key=lambda x: x['LastModified'],
reverse=False,
reverse=True,
)
return [file['Key'] for file in sorted_files]

def _get_most_recent_object(self, client, bucket_name, prefix):
files_in_bucket = self._list_objects(client, bucket_name, prefix)
return files_in_bucket[0] if len(files_in_bucket) > 0 else None

def _log_at_interval(self, index: int, message: str):
"""
Log in a way that is suitable for both small and large datasets. Initially
Expand All @@ -194,25 +197,23 @@ def _log_at_interval(self, index: int, message: str):
def ingest(self):
logger.info('Checking for new contact consent data files')
s3_client = get_s3_client(REGION)
file_keys = self._list_objects(s3_client, BUCKET, CONSENT_PREFIX)
if len(file_keys) == 0:
file_key = self._get_most_recent_object(s3_client, BUCKET, CONSENT_PREFIX)
if not file_key:
logger.info(
'No contact consent files found in bucket %s matching prefix %s',
BUCKET,
CONSENT_PREFIX,
)
return

for file_key in file_keys:
try:
self.sync_file_with_database(s3_client, file_key)
self.delete_file(s3_client, file_key)
except Exception as exc:
logger.exception(
f'Error ingesting contact consent file {file_key}',
stack_info=True,
)
raise exc
try:
self.sync_file_with_database(s3_client, file_key)
except Exception as exc:
logger.exception(
f'Error ingesting contact consent file {file_key}',
stack_info=True,
)
raise exc

def get_grouped_contacts(self) -> dict[str, List[Contact]]:
contacts_qs = Contact.objects.all()
Expand Down Expand Up @@ -319,8 +320,3 @@ def sync_file_with_database(self, client, file_key):
i,
path,
)

def delete_file(self, client, file_key):
logger.info('Deleting contact consent file %s', file_key)
client.delete_object(Bucket=BUCKET, Key=file_key)
logger.info('Successfully deleted contact consent file %s', file_key)
60 changes: 15 additions & 45 deletions datahub/company/test/tasks/test_contact_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,17 @@ def test_files():
@mock_aws
def setup_s3_bucket(bucket_name, test_files):
mock_s3_client = _create_bucket(bucket_name)

last_modfied = datetime.datetime.now()
for file in test_files:
mock_s3_client.put_object(Bucket=bucket_name, Key=file, Body=json.dumps('Test contents'))
# use freeze_time to allow uploaded files to have a different LastModified date
with freeze_time(last_modfied):
mock_s3_client.put_object(
Bucket=bucket_name,
Key=file,
Body=json.dumps('Test contents'),
)
last_modfied = last_modfied + datetime.timedelta(seconds=3)


def _create_bucket(bucket_name):
Expand Down Expand Up @@ -503,7 +512,7 @@ def test_ingest_with_exception_logs_error_and_reraises_original_exception(self,
task.ingest()

@mock_aws
def test_ingest_with_empty_s3_bucket_does_not_call_sync_or_delete(self):
def test_ingest_with_empty_s3_bucket_does_not_call_sync(self):
"""
Test that the task can handle an empty S3 bucket
"""
Expand All @@ -512,15 +521,13 @@ def test_ingest_with_empty_s3_bucket_does_not_call_sync_or_delete(self):
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
delete_file=mock.DEFAULT,
):
task.ingest()
task.sync_file_with_database.assert_not_called()
task.delete_file.assert_not_called()

@mock_aws
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
def test_ingest_calls_sync_with_correct_files_order(self, test_files):
def test_ingest_calls_sync_with_newest_file_order(self, test_files):
"""
Test that the ingest calls the sync with the files in correct order
"""
Expand All @@ -529,32 +536,11 @@ def test_ingest_calls_sync_with_correct_files_order(self, test_files):
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
delete_file=mock.DEFAULT,
):
task.ingest()
task.sync_file_with_database.assert_has_calls(
[mock.call(mock.ANY, file) for file in test_files],
)

@mock_aws
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
def test_ingest_calls_delete_for_all_files(
self,
test_files,
):
"""
Test that the ingest calls delete with the files in correct order
"""
setup_s3_bucket(BUCKET, test_files)
task = ContactConsentIngestionTask()
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
delete_file=mock.DEFAULT,
):
task.ingest()
task.delete_file.assert_has_calls(
[mock.call(mock.ANY, file) for file in test_files],
task.sync_file_with_database.assert_called_once_with(
mock.ANY,
test_files[-1],
)

@mock_aws
Expand Down Expand Up @@ -884,19 +870,3 @@ def test_should_update_contact_with_row_date_newer_than_contact_date_should_retu
contact,
row,
)

@mock_aws
def test_delete_file_removes_file_using_boto3(self):
"""
Test that the file is deleted from the bucket
"""
filename = f'{CONSENT_PREFIX}file_{uuid.uuid4()}.jsonl'
upload_file_to_s3(BUCKET, filename, 'test')
client = boto3.client('s3', REGION)

ContactConsentIngestionTask().delete_file(client, filename)
with pytest.raises(client.exceptions.NoSuchKey):
client.get_object(
Bucket=BUCKET,
Key=filename,
)

0 comments on commit 60b3563

Please sign in to comment.