-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async processing for patient bulk uploads #4722
Changes from 48 commits
f7400ec
f3286fe
4fbccb8
de603aa
5ae43fd
1d307c3
47f5315
4eb912a
0a49bda
3a40e12
772af8c
4686e3e
89f6f5c
739d159
d1ffe32
dcf5189
c5cbaf0
91e10a5
a907285
16a5842
7e04b7a
958c90d
ab2364f
3769a8d
acd9c5c
7abf48e
5c95280
a8f6ce5
3e78e5f
8a83001
c93fa69
88c98a5
71162cd
a8fa5d5
9c4aa47
7228cc2
e4073fb
eaf4a4d
0c843ed
4a1ef26
e550312
11c15c1
3e0bc7e
35fe937
c174101
cc0d1ae
0d47551
03a120e
df2ed6d
79eebd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,39 @@ | ||
package gov.cdc.usds.simplereport.service; | ||
|
||
import static gov.cdc.usds.simplereport.api.Translators.parsePersonRole; | ||
import static gov.cdc.usds.simplereport.api.Translators.parsePhoneType; | ||
import static gov.cdc.usds.simplereport.api.Translators.parseUserShortDate; | ||
import static gov.cdc.usds.simplereport.api.Translators.parseYesNo; | ||
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.convertEthnicityToDatabaseValue; | ||
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.convertRaceToDatabaseValue; | ||
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.convertSexToDatabaseValue; | ||
|
||
import com.fasterxml.jackson.databind.MappingIterator; | ||
import gov.cdc.usds.simplereport.api.model.errors.CsvProcessingException; | ||
import gov.cdc.usds.simplereport.api.model.filerow.PatientUploadRow; | ||
import gov.cdc.usds.simplereport.api.uploads.PatientBulkUploadResponse; | ||
import gov.cdc.usds.simplereport.config.AuthorizationConfiguration; | ||
import gov.cdc.usds.simplereport.db.model.Facility; | ||
import gov.cdc.usds.simplereport.db.model.Organization; | ||
import gov.cdc.usds.simplereport.db.model.Person; | ||
import gov.cdc.usds.simplereport.db.model.PhoneNumber; | ||
import gov.cdc.usds.simplereport.db.model.auxiliary.StreetAddress; | ||
import gov.cdc.usds.simplereport.db.model.auxiliary.UploadStatus; | ||
import gov.cdc.usds.simplereport.service.model.reportstream.FeedbackMessage; | ||
import gov.cdc.usds.simplereport.validators.CsvValidatorUtils; | ||
import gov.cdc.usds.simplereport.validators.FileValidator; | ||
import java.io.ByteArrayInputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.stereotype.Service; | ||
import org.springframework.transaction.annotation.Transactional; | ||
|
||
/** | ||
* Service to upload a roster of patient data given a CSV input. Formerly restricted to superusers | ||
* but now (almost) available to end users. | ||
* | ||
* <p>Updated by emmastephenson on 10/24/2022 | ||
* but now available to end users. | ||
*/ | ||
@Service | ||
@Transactional | ||
@RequiredArgsConstructor | ||
@Slf4j | ||
public class PatientBulkUploadService { | ||
|
||
private final PersonService _personService; | ||
private final AddressValidationService _addressValidationService; | ||
private final OrganizationService _organizationService; | ||
private final FileValidator<PatientUploadRow> _patientBulkUploadFileValidator; | ||
private final FileValidator<PatientUploadRow> patientUploadRowFileValidator; | ||
private final PatientBulkUploadServiceAsync patientBulkUploadServiceAsync; | ||
|
||
// This authorization will change once we open the feature to end users | ||
@AuthorizationConfiguration.RequireGlobalAdminUser | ||
@AuthorizationConfiguration.RequirePermissionCreatePatientAtFacility | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this authorization change premature here? my understanding is the feature will be available to org or facility managers, which this change would turn on, right? Do we want to wait to update this or go ahead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch - it is a little premature, and this will indeed open up the endpoint to org/facility managers. The reason I made the change is because having the existing permissions made testing nearly impossible - you needed a user that had global permissions to add the patients, but then to verify the patients were added you needed a user with permission to view the patients at a facility. Rather than try to work that permissioning out I just made this change, since it was next up in the work stream regardless. There's still no UI available for it (that little checkbox on the frontend does check for superuser permissions before displaying), so org/facility users shouldn't notice a change. They would be able to hit the endpoint directly with a CURL request, but I still think that's a low-risk enough activity to allow for now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I thought I remembered this was necessary for tests. I'm fine with it, we may just be able to close the other ticket out at the same time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ticket in question is #4371 |
||
public PatientBulkUploadResponse processPersonCSV(InputStream csvStream, UUID facilityId) | ||
throws IllegalArgumentException { | ||
|
||
PatientBulkUploadResponse result = new PatientBulkUploadResponse(); | ||
|
||
Organization currentOrganization = _organizationService.getCurrentOrganization(); | ||
|
||
// Patients do not need to be assigned to a facility, but if an id is given it must be valid | ||
Optional<Facility> assignedFacility = | ||
Optional.ofNullable(facilityId).map(_organizationService::getFacilityInCurrentOrg); | ||
|
||
Set<Person> patientsList = new HashSet<>(); | ||
List<PhoneNumber> phoneNumbersList = new ArrayList<>(); | ||
|
||
byte[] content; | ||
|
||
try { | ||
|
@@ -80,110 +44,16 @@ public PatientBulkUploadResponse processPersonCSV(InputStream csvStream, UUID fa | |
} | ||
|
||
List<FeedbackMessage> errors = | ||
_patientBulkUploadFileValidator.validate(new ByteArrayInputStream(content)); | ||
patientUploadRowFileValidator.validate(new ByteArrayInputStream(content)); | ||
|
||
if (!errors.isEmpty()) { | ||
result.setStatus(UploadStatus.FAILURE); | ||
result.setErrors(errors.toArray(FeedbackMessage[]::new)); | ||
return result; | ||
} | ||
|
||
// This is the point where we need to figure out multithreading | ||
// because what needs to happen is that we return a success message to the end user | ||
// but continue to process the csv (create person records) in the background. | ||
// Putting a pin in it for now. | ||
|
||
final MappingIterator<Map<String, String>> valueIterator = | ||
CsvValidatorUtils.getIteratorForCsv(new ByteArrayInputStream(content)); | ||
|
||
while (valueIterator.hasNext()) { | ||
final Map<String, String> row = CsvValidatorUtils.getNextRow(valueIterator); | ||
|
||
try { | ||
|
||
PatientUploadRow extractedData = new PatientUploadRow(row); | ||
|
||
// Fetch address information | ||
StreetAddress address = | ||
_addressValidationService.getValidatedAddress( | ||
extractedData.getStreet().getValue(), | ||
extractedData.getStreet2().getValue(), | ||
extractedData.getCity().getValue(), | ||
extractedData.getState().getValue(), | ||
extractedData.getZipCode().getValue(), | ||
null); | ||
|
||
String country = | ||
extractedData.getCountry().getValue() == null | ||
? "USA" | ||
: extractedData.getCountry().getValue(); | ||
|
||
if (_personService.isDuplicatePatient( | ||
extractedData.getFirstName().getValue(), | ||
extractedData.getLastName().getValue(), | ||
parseUserShortDate(extractedData.getDateOfBirth().getValue()), | ||
currentOrganization, | ||
assignedFacility)) { | ||
continue; | ||
} | ||
|
||
// create new person with current organization, then add to new patients list | ||
Person newPatient = | ||
new Person( | ||
currentOrganization, | ||
assignedFacility.orElse(null), | ||
null, // lookupid | ||
extractedData.getFirstName().getValue(), | ||
extractedData.getMiddleName().getValue(), | ||
extractedData.getLastName().getValue(), | ||
extractedData.getSuffix().getValue(), | ||
parseUserShortDate(extractedData.getDateOfBirth().getValue()), | ||
address, | ||
country, | ||
parsePersonRole(extractedData.getRole().getValue(), false), | ||
List.of(extractedData.getEmail().getValue()), | ||
convertRaceToDatabaseValue(extractedData.getRace().getValue()), | ||
convertEthnicityToDatabaseValue(extractedData.getEthnicity().getValue()), | ||
null, // tribalAffiliation | ||
convertSexToDatabaseValue(extractedData.getBiologicalSex().getValue()), | ||
parseYesNo(extractedData.getResidentCongregateSetting().getValue()), | ||
parseYesNo(extractedData.getEmployedInHealthcare().getValue()), | ||
null, // preferredLanguage | ||
null // testResultDeliveryPreference | ||
); | ||
|
||
if (!patientsList.contains(newPatient)) { | ||
// collect phone numbers and associate them with the patient | ||
// then add to phone numbers list and set primary phone, if exists | ||
List<PhoneNumber> newPhoneNumbers = | ||
_personService.assignPhoneNumbersToPatient( | ||
newPatient, | ||
List.of( | ||
new PhoneNumber( | ||
parsePhoneType(extractedData.getPhoneNumberType().getValue()), | ||
extractedData.getPhoneNumber().getValue()))); | ||
phoneNumbersList.addAll(newPhoneNumbers); | ||
newPhoneNumbers.stream().findFirst().ifPresent(newPatient::setPrimaryPhone); | ||
|
||
patientsList.add(newPatient); | ||
} | ||
} catch (IllegalArgumentException e) { | ||
String errorMessage = "Error uploading patient roster"; | ||
log.error( | ||
errorMessage | ||
+ " for organization " | ||
+ currentOrganization.getExternalId() | ||
+ " and facility " | ||
+ facilityId); | ||
throw new IllegalArgumentException(errorMessage); | ||
} | ||
} | ||
|
||
_personService.addPatientsAndPhoneNumbers(patientsList, phoneNumbersList); | ||
|
||
log.info("CSV patient upload completed for {}", currentOrganization.getOrganizationName()); | ||
patientBulkUploadServiceAsync.savePatients(content, facilityId); | ||
result.setStatus(UploadStatus.SUCCESS); | ||
// eventually want to send an email here instead of return success | ||
return result; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whats the point of this docker cmd? I can see it restarts the sr_test_db container, not sure why, also I thought we werent using docker for the db in ci
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A restart is necessary due to the change made in
reset-db.sql
, which increases the number of connections allocated to the database. That setting does not take effect until the database is forcibly restarted.We are using docker for the database in the testing flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, it makes more sense to me to just start the container with the correct max-connections set,
If the github actions magical syntax doesn't support the passing of arguments, can we just add a separate step at the beginning for running the docker command directly at the beginning of the steps?
docker run -d -p 5432:5432 --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 -e POSTGRES_PASSWORD=this_is_a_super_secure_admin_password postgres:13-alpine3.16 -c max_connections=500
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The benefit of doing it this way is that we could set the max value in the
reset-db
script (whichcreate-db.sh
uses) and that max value is used both here and in the docker test setup (used for local testing). If we change the docker run command directly here, we'd still need to change the value for local docker-compose. There was a prior commit that used that approach (back when we were testing locally and hadn't yet figured out that CI is different), 4a1ef269, but I prefer this way because it keeps the local test setup and the CI setup more in alignment.The real solution would be to use the full docker-compose script in CI instead of this custom action, but that's out of scope for today 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree about the docker-compose solution, can you please write a ticket to document and implement that solution, the current solution is certainly a half measure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done: #4856