Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/tet 812 ingest stova data #5866

Merged
merged 12 commits into from
Dec 23, 2024
Merged
6 changes: 6 additions & 0 deletions cron-scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from datahub.company.tasks.export_potential import update_company_export_potential_from_csv
from datahub.company_activity.tasks.ingest_company_activity import ingest_activity_data
from datahub.company_activity.tasks.ingest_stova_events import ingest_stova_event_data
from datahub.core.queues.constants import (
EVERY_EIGHT_AM,
EVERY_EIGHT_THIRTY_AM_ON_FIRST_EACH_MONTH,
Expand Down Expand Up @@ -139,6 +140,11 @@ def schedule_jobs():
cron=EVERY_HOUR,
description='Check S3 for new EYB triage data files and schedule ingestion',
)
job_scheduler(
function=ingest_stova_event_data,
cron=EVERY_HOUR,
description='Check S3 for new Stova Event files and schedule ingestion',
)

if settings.ENABLE_ESTIMATED_LAND_DATE_REMINDERS:
job_scheduler(
Expand Down
16 changes: 14 additions & 2 deletions datahub/company_activity/models/stova_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def save(self, *args, **kwargs) -> None:
self.create_or_update_datahub_event()

def create_or_update_datahub_event(self) -> Event:
# Dates are converted from string to datetime after saving, so refresh self otherwise
# dates will still be string.
# Dates are converted from string to datetime after saving, so refresh otherwise dates will
# still be string.
# https://docs.djangoproject.com/en/4.2/ref/models/instances/#what-happens-when-you-save
self.refresh_from_db()
event, _ = Event.objects.update_or_create(
Expand All @@ -82,11 +82,23 @@ def create_or_update_datahub_event(self) -> Event:
'address_county': self.location_state,
'address_postcode': self.location_postcode,
'address_country': get_country_by_country_name(self.country, 'GB'),
'service_id': self.get_stova_event_service_id(),
'notes': self.description,
},
)
return event

@staticmethod
def get_stova_event_service_id() -> str:
"""
The frontend expects a service which we don't get from Stova. This service is created for
ingested stova events called "Stova Event Service". These service are metadata created in
migrations files so we can guarantee it exists before the application is run.

:returns: A id of a `Service` with the name "Stova Event Service".
"""
return 'f6671176-6493-43ba-a92d-899281efcb55'

@staticmethod
def get_or_create_stova_event_type() -> EventType:
"""
Expand Down
8 changes: 6 additions & 2 deletions datahub/company_activity/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from datahub.company_activity.tasks.ingest_great_data import ingest_great_data
from datahub.company_activity.tasks.ingest_stova_events import ingest_stova_data
from datahub.company_activity.tasks.ingest_stova_events import (
ingest_stova_event_data,
stova_ingestion_task,
)

# `__all__` must contain the public names as strings; listing the function objects
# themselves makes `from datahub.company_activity.tasks import *` raise a TypeError.
__all__ = (
    'ingest_great_data',
    'ingest_stova_data',
    'ingest_stova_event_data',
    'stova_ingestion_task',
)
151 changes: 87 additions & 64 deletions datahub/company_activity/tasks/ingest_stova_events.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,101 @@
import json
import logging

from smart_open import open
from django.core.exceptions import ValidationError
from django.db import IntegrityError

from datahub.company_activity.models import StovaEvent
from datahub.company_activity.tasks.constants import STOVA_EVENT_PREFIX
from datahub.ingest.boto3 import S3ObjectProcessor
from datahub.ingest.tasks import BaseObjectIdentificationTask, BaseObjectIngestionTask

from datahub.company_activity.models import IngestedFile, StovaEvent

logger = logging.getLogger(__name__)
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'


def ingest_stova_data(bucket, file):
    """Ingest one Stova event file from the given bucket, logging the start and the finish."""
    logger.info(f'Ingesting file: {file} started')
    StovaEventIngestionTask().ingest(bucket, file)
    logger.info(f'Ingesting file: {file} finished')
def ingest_stova_event_data() -> None:
    """Identify the most recent file to be ingested and schedule a task to ingest it."""
    logger.info('Stova event identification task started.')
    # `stova_ingestion_task` is handed over as the callable used to ingest identified objects.
    StovaEventIndentificationTask(prefix=STOVA_EVENT_PREFIX).identify_new_objects(
        stova_ingestion_task,
    )
    logger.info('Stova event identification task finished.')


class StovaEventIngestionTask:
def __init__(self):
    # Cache of `stova_event_id` values already stored in the database; starts empty and is
    # filled on first use by `_already_ingested`.
    self._existing_ids = []
def stova_ingestion_task(object_key: str) -> None:
    """Ingest the S3 object (file) identified by `object_key`."""
    logger.info(f'Stova event ingestion task started for file {object_key}.')
    processor = S3ObjectProcessor(prefix=STOVA_EVENT_PREFIX)
    StovaEventIngestionTask(object_key=object_key, s3_processor=processor).ingest_object()
    logger.info(f'Stova event ingestion task finished for file {object_key}.')


class StovaEventIndentificationTask(BaseObjectIdentificationTask):
    """
    Identification task for Stova event objects.

    Adds nothing to `BaseObjectIdentificationTask`; all behaviour is inherited.
    NOTE(review): the class name misspells "Identification"; it is kept as-is because other
    code refers to it by this name.
    """


class StovaEventIngestionTask(BaseObjectIngestionTask):

existing_ids = []

def _process_record(self, record: dict) -> None:
"""Saves an event from Stova from the S3 bucket into a `StovaEvent`"""
if not self.existing_ids:
self.existing_ids = set(StovaEvent.objects.values_list('stova_event_id', flat=True))

stova_event_id = record.get('id')
if stova_event_id in self.existing_ids:
logger.info(f'Record already exists for stova_event_id: {stova_event_id}')
return

def ingest(self, bucket: str, file: str) -> None:
    """
    Read a JSON-lines file from S3 and store every record that is not already ingested.

    Each non-blank line is parsed as JSON and passed to `json_to_model` unless its `id` is
    already known. After the whole file has been read, an `IngestedFile` is created for it.
    Errors are not handled here; they propagate to the caller unchanged.

    :param bucket: Name of the S3 bucket that holds the file.
    :param file: Key of the file within the bucket.
    """
    path = f's3://{bucket}/{file}'
    # `open` here is the one imported from `smart_open` at module level, not the builtin.
    with open(path) as s3_file:
        for line in s3_file:
            if not line.strip():
                # A blank line (e.g. a trailing newline) is skipped rather than being
                # allowed to fail in `json.loads` and abort the whole file.
                continue
            record = json.loads(line)
            if not self._already_ingested(record.get('id')):
                self.json_to_model(record)
    IngestedFile.objects.create(filepath=file)

def _already_ingested(self, id: int) -> bool:
    """
    Tell whether a Stova event with the given id is already stored.

    The stored ids are loaded on first use and cached on the instance as a set, so each
    lookup is a hash membership test instead of a scan over a list.

    :param id: The Stova event id taken from the record being ingested.
    :returns: True if the id is among the cached `stova_event_id` values.
    """
    # While the cache is empty (including when no events are stored yet) the query is
    # issued again on the next call.
    if not self._existing_ids:
        self._existing_ids = set(StovaEvent.objects.values_list('stova_event_id', flat=True))
    return id in self._existing_ids

@staticmethod
def json_to_model(jsn: dict) -> None:
values = {
'stova_event_id': jsn.get('id'),
'url': jsn.get('url', ''),
'city': jsn.get('city', ''),
'code': jsn.get('code', ''),
'name': jsn.get('name', ''),
'state': jsn.get('state', ''),
'country': jsn.get('country', ''),
'max_reg': jsn.get('max_reg'),
'end_date': jsn.get('end_date'),
'timezone': jsn.get('timezone', ''),
'folder_id': jsn.get('folder_id'),
'live_date': jsn.get('live_date'),
'close_date': jsn.get('close_date'),
'created_by': jsn.get('created_by', ''),
'price_type': jsn.get('price_type', ''),
'start_date': jsn.get('start_date'),
'description': jsn.get('description', ''),
'modified_by': jsn.get('modified_by', ''),
'contact_info': jsn.get('contact_info', ''),
'created_date': jsn.get('created_date'),
'location_city': jsn.get('location_city', ''),
'location_name': jsn.get('location_name', ''),
'modified_date': jsn.get('modified_date'),
'client_contact': jsn.get('client_contact', ''),
'location_state': jsn.get('location_state', ''),
'default_language': jsn.get('default_language', ''),
'location_country': jsn.get('location_country', ''),
'approval_required': jsn.get('approval_required'),
'location_address1': jsn.get('location_address1', ''),
'location_address2': jsn.get('location_address2', ''),
'location_address3': jsn.get('location_address3', ''),
'location_postcode': jsn.get('location_postcode', ''),
'standard_currency': jsn.get('standard_currency', ''),
'stova_event_id': stova_event_id,
'url': record.get('url', ''),
'city': record.get('city', ''),
'code': record.get('code', ''),
'name': record.get('name', ''),
'state': record.get('state', ''),
'country': record.get('country', ''),
'max_reg': record.get('max_reg'),
'end_date': record.get('end_date'),
'timezone': record.get('timezone', ''),
'folder_id': record.get('folder_id'),
'live_date': record.get('live_date'),
'close_date': record.get('close_date'),
'created_by': record.get('created_by'),
'price_type': record.get('price_type', ''),
'start_date': record.get('start_date'),
'description': record.get('description', ''),
'modified_by': record.get('modified_by'),
'contact_info': record.get('contact_info', ''),
'created_date': record.get('created_date'),
'location_city': record.get('location_city', ''),
'location_name': record.get('location_name', ''),
'modified_date': record.get('modified_date'),
'client_contact': record.get('client_contact'),
'location_state': record.get('location_state', ''),
'default_language': record.get('default_language', ''),
'location_country': record.get('location_country', ''),
'approval_required': record.get('approval_required'),
'location_address1': record.get('location_address1', ''),
'location_address2': record.get('location_address2', ''),
'location_address3': record.get('location_address3', ''),
'location_postcode': record.get('location_postcode', ''),
'standard_currency': record.get('standard_currency', ''),
}

StovaEvent.objects.create(**values)
try:
StovaEvent.objects.create(**values)
except IntegrityError as error:
logger.error(
f'Error processing Stova event record, stova_event_id: {stova_event_id}. '
f'Error: {error}',
)
except ValidationError as error:
logger.error(
'Got unexpected value for a field when processing Stova event record, '
f'stova_event_id: {stova_event_id}. '
f'Error: {error}',
)
2 changes: 2 additions & 0 deletions datahub/company_activity/tests/models/test_stova_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def test_save(self):

assert datahub_event.address_country.name == 'United States'

assert str(datahub_event.service_id) == 'f6671176-6493-43ba-a92d-899281efcb55'

stova_event.delete()
assert not Event.objects.all().exists()

Expand Down
Loading
Loading