Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async processing for patient bulk uploads #4722

Merged
merged 50 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
f7400ec
batch save for patients and phone numbers
georgehager Nov 10, 2022
f3286fe
fix backend linting 1
georgehager Nov 10, 2022
4fbccb8
fix backend linting 2
georgehager Nov 10, 2022
de603aa
remove unnecessary comments
georgehager Nov 14, 2022
5ae43fd
move adding patient to list
georgehager Nov 14, 2022
1d307c3
cleanup
georgehager Nov 14, 2022
47f5315
try request context
emmastephenson Nov 14, 2022
4eb912a
extract async service
emmastephenson Nov 14, 2022
0a49bda
fix java linting
georgehager Nov 14, 2022
3a40e12
try/catch scope
emmastephenson Nov 14, 2022
772af8c
add new person constructor and test for phone numbers assignment
georgehager Nov 15, 2022
4686e3e
clean up code reuse
georgehager Nov 15, 2022
89f6f5c
refactor person constructor and deduplicate persons
georgehager Nov 15, 2022
739d159
cleanup
georgehager Nov 15, 2022
d1ffe32
add test for no phone type
georgehager Nov 15, 2022
dcf5189
try request context
emmastephenson Nov 14, 2022
c5cbaf0
extract async service
emmastephenson Nov 14, 2022
91e10a5
try/catch scope
emmastephenson Nov 14, 2022
a907285
remove unnecessary executor bean and friends
emmastephenson Nov 15, 2022
16a5842
merge with remote?
emmastephenson Nov 15, 2022
7e04b7a
sync with latest changes from george/4662-batch-save-patients
emmastephenson Nov 15, 2022
958c90d
have I mentioned how much I dislike rebasing
emmastephenson Nov 16, 2022
ab2364f
merge with main
emmastephenson Nov 17, 2022
3769a8d
update async service with latest changes
emmastephenson Nov 17, 2022
acd9c5c
testing wip
emmastephenson Nov 18, 2022
7abf48e
integration test wip
emmastephenson Nov 22, 2022
5c95280
tests still failing, but different fails
emmastephenson Nov 23, 2022
a8f6ce5
tests wip
emmastephenson Nov 23, 2022
3e78e5f
Merge branch 'main' of https://github.com/CDCgov/prime-simplereport i…
emmastephenson Nov 23, 2022
8a83001
some service tests working
emmastephenson Nov 23, 2022
c93fa69
patientBulkUploadServiceTest updated
emmastephenson Nov 24, 2022
88c98a5
wip, tests use BaseFullStackTest
emmastephenson Nov 30, 2022
71162cd
ugly but WORKING
emmastephenson Dec 2, 2022
a8fa5d5
tests passing; comments cleaned
emmastephenson Dec 3, 2022
9c4aa47
merge with main
emmastephenson Dec 3, 2022
7228cc2
update imports for bulkuploadasync
emmastephenson Dec 3, 2022
e4073fb
hopefully fix the frontend linter?
emmastephenson Dec 5, 2022
eaf4a4d
Merge branch 'main' of https://github.com/CDCgov/prime-simplereport i…
emmastephenson Dec 5, 2022
0c843ed
update hikari pool size
emmastephenson Dec 6, 2022
4a1ef26
increase total database connections available
emmastephenson Dec 12, 2022
e550312
Modify db setup scripts as central location for increasing number of …
rin-skylight Dec 13, 2022
11c15c1
Add the ALTER SYSTEM call.
rin-skylight Dec 13, 2022
3e0bc7e
turn on jdbc batching
georgehager Dec 14, 2022
35fe937
add naive batching for saves
emmastephenson Dec 16, 2022
c174101
fix code smells
emmastephenson Dec 16, 2022
cc0d1ae
take off order_inserts
emmastephenson Dec 16, 2022
0d47551
fix duplicate checkiing
emmastephenson Dec 16, 2022
03a120e
remove comment
emmastephenson Dec 16, 2022
df2ed6d
Merge branch 'main' into emma/async-3
georgehager Dec 20, 2022
79eebd9
optimize batching
georgehager Dec 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 5
--name sr_test_db
ports:
- 5432:5432
steps:
Expand All @@ -60,6 +61,7 @@ jobs:
run: |
chmod 0600 $PGPASSFILE
db-setup/create-db.sh
docker restart --time 0 sr_test_db
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the point of this docker cmd? I can see it restarts the sr_test_db container, not sure why, also I thought we werent using docker for the db in ci

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A restart is necessary due to the change made in reset-db.sql, which increases the number of connections allocated to the database. That setting does not take effect until the database is forcibly restarted.

We are using docker for the database in the testing flow.

Copy link
Contributor

@zdeveloper zdeveloper Dec 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, it makes more sense to me to just start the container with the correct max-connections set,

If the github actions magical syntax doesn't support the passing of arguments, can we just add a separate step at the beginning for running the docker command directly at the beginning of the steps?

docker run -d -p 5432:5432 --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 -e POSTGRES_PASSWORD=this_is_a_super_secure_admin_password postgres:13-alpine3.16 -c max_connections=500

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benefit of doing it this way is that we could set the max value in the reset-db script (which create-db.sh uses) and that max value is used both here and in the docker test setup (used for local testing). If we change the docker run command directly here, we'd still need to change the value for local docker-compose. There was a prior commit that used that approach (back when we were testing locally and hadn't yet figured out that CI is different), 4a1ef269, but I prefer this way because it keeps the local test setup and the CI setup more in alignment.

The real solution would be to use the full docker-compose script in CI instead of this custom action, but that's out of scope for today 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree about the docker-compose solution, can you please write a ticket to document and implement that solution, the current solution is certainly a half measure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #4856

- name: Run tests
env:
OKTA_TESTING_DISABLEHTTPS: true
Expand Down
2 changes: 2 additions & 0 deletions backend/db-setup/reset-db.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
ALTER SYSTEM SET max_connections = 500;

DROP SCHEMA IF EXISTS simple_report CASCADE;

CREATE SCHEMA IF NOT EXISTS simple_report;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import gov.cdc.usds.simplereport.service.OrganizationInitializingService;
import gov.cdc.usds.simplereport.service.ScheduledTasksService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -24,7 +25,9 @@
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.security.core.context.SecurityContextHolder;

@Slf4j
@SpringBootApplication
Expand All @@ -40,13 +43,20 @@
CorsProperties.class,
AzureStorageQueueReportingProperties.class
})
@EnableAsync
@EnableScheduling
@EnableFeignClients
public class SimpleReportApplication {
public static void main(String[] args) {
SpringApplication.run(SimpleReportApplication.class, args);
}

@Bean
public InitializingBean initializingBean() {
return () ->
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
}

@Bean
public CommandLineRunner initDiseasesOnStartup(DiseaseService initService) {
return args -> initService.initDiseases();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.ScopeNotActiveException;
import org.springframework.context.annotation.Profile;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;

/** Handles all user/organization management in Okta */
Expand Down Expand Up @@ -309,13 +312,20 @@ private Optional<OrganizationRoleClaims> getOrganizationRoleClaimsFromTenantData

public Optional<OrganizationRoleClaims> getOrganizationRoleClaimsForUser(String username) {
// when accessing tenant data, bypass okta and get org from the altered authorities
if (tenantDataContextHolder.hasBeenPopulated()
&& username.equals(tenantDataContextHolder.getUsername())) {
return getOrganizationRoleClaimsFromTenantDataAccess(
tenantDataContextHolder.getAuthorities());
try {
if (tenantDataContextHolder.hasBeenPopulated()
&& username.equals(tenantDataContextHolder.getUsername())) {
return getOrganizationRoleClaimsFromTenantDataAccess(
tenantDataContextHolder.getAuthorities());
}
return Optional.ofNullable(usernameOrgRolesMap.get(username));
} catch (ScopeNotActiveException e) {
Set<String> authorities =
SecurityContextHolder.getContext().getAuthentication().getAuthorities().stream()
.map(GrantedAuthority::getAuthority)
.collect(Collectors.toSet());
return getOrganizationRoleClaimsFromTenantDataAccess(authorities);
}

return Optional.ofNullable(usernameOrgRolesMap.get(username));
}

public void reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.ScopeNotActiveException;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
Expand Down Expand Up @@ -505,13 +506,17 @@ private ApiUser getCurrentApiUser() {
return getCurrentApiUserNoCache();
}

if (_apiUserContextHolder.hasBeenPopulated()) {
log.debug("Retrieving user from request context");
return _apiUserContextHolder.getCurrentApiUser();
try {
if (_apiUserContextHolder.hasBeenPopulated()) {
log.debug("Retrieving user from request context");
return _apiUserContextHolder.getCurrentApiUser();
}
ApiUser user = getCurrentApiUserNoCache();
_apiUserContextHolder.setCurrentApiUser(user);
return user;
} catch (ScopeNotActiveException e) {
return getCurrentApiUserNoCache();
georgehager marked this conversation as resolved.
Show resolved Hide resolved
}
ApiUser user = getCurrentApiUserNoCache();
_apiUserContextHolder.setCurrentApiUser(user);
return user;
}

private ApiUser getCurrentApiUserNoCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.ScopeNotActiveException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -51,12 +52,16 @@ public void resetOrganizationRolesContext() {
}

public Optional<OrganizationRoles> getCurrentOrganizationRoles() {
if (organizationRolesContext.hasBeenPopulated()) {
return organizationRolesContext.getOrganizationRoles();
try {
if (organizationRolesContext.hasBeenPopulated()) {
return organizationRolesContext.getOrganizationRoles();
}
var result = fetchCurrentOrganizationRoles();
organizationRolesContext.setOrganizationRoles(result);
return result;
} catch (ScopeNotActiveException e) {
return fetchCurrentOrganizationRoles();
georgehager marked this conversation as resolved.
Show resolved Hide resolved
}
var result = fetchCurrentOrganizationRoles();
organizationRolesContext.setOrganizationRoles(result);
return result;
}

private Optional<OrganizationRoles> fetchCurrentOrganizationRoles() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,75 +1,39 @@
package gov.cdc.usds.simplereport.service;

import static gov.cdc.usds.simplereport.api.Translators.parsePersonRole;
import static gov.cdc.usds.simplereport.api.Translators.parsePhoneType;
import static gov.cdc.usds.simplereport.api.Translators.parseUserShortDate;
import static gov.cdc.usds.simplereport.api.Translators.parseYesNo;
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.convertEthnicityToDatabaseValue;
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.convertRaceToDatabaseValue;
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.convertSexToDatabaseValue;

import com.fasterxml.jackson.databind.MappingIterator;
import gov.cdc.usds.simplereport.api.model.errors.CsvProcessingException;
import gov.cdc.usds.simplereport.api.model.filerow.PatientUploadRow;
import gov.cdc.usds.simplereport.api.uploads.PatientBulkUploadResponse;
import gov.cdc.usds.simplereport.config.AuthorizationConfiguration;
import gov.cdc.usds.simplereport.db.model.Facility;
import gov.cdc.usds.simplereport.db.model.Organization;
import gov.cdc.usds.simplereport.db.model.Person;
import gov.cdc.usds.simplereport.db.model.PhoneNumber;
import gov.cdc.usds.simplereport.db.model.auxiliary.StreetAddress;
import gov.cdc.usds.simplereport.db.model.auxiliary.UploadStatus;
import gov.cdc.usds.simplereport.service.model.reportstream.FeedbackMessage;
import gov.cdc.usds.simplereport.validators.CsvValidatorUtils;
import gov.cdc.usds.simplereport.validators.FileValidator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
* Service to upload a roster of patient data given a CSV input. Formerly restricted to superusers
* but now (almost) available to end users.
*
* <p>Updated by emmastephenson on 10/24/2022
* but now available to end users.
*/
@Service
@Transactional
@RequiredArgsConstructor
@Slf4j
public class PatientBulkUploadService {

private final PersonService _personService;
private final AddressValidationService _addressValidationService;
private final OrganizationService _organizationService;
private final FileValidator<PatientUploadRow> _patientBulkUploadFileValidator;
private final FileValidator<PatientUploadRow> patientUploadRowFileValidator;
private final PatientBulkUploadServiceAsync patientBulkUploadServiceAsync;

// This authorization will change once we open the feature to end users
@AuthorizationConfiguration.RequireGlobalAdminUser
@AuthorizationConfiguration.RequirePermissionCreatePatientAtFacility
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this authorization change premature here? my understanding is the feature will be available to org or facility managers, which this change would turn on, right? Do we want to wait to update this or go ahead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - it is a little premature, and this will indeed open up the endpoint to org/facility managers. The reason I made the change is because having the existing permissions made testing nearly impossible - you needed a user that had global permissions to add the patients, but then to verify the patients were added you needed a user with permission to view the patients at a facility. Rather than try to work that permissioning out I just made this change, since it was next up in the work stream regardless. There's still no UI available for it (that little checkbox on the frontend does check for superuser permissions before displaying), so org/facility users shouldn't notice a change. They would be able to hit the endpoint directly with a CURL request, but I still think that's a low-risk enough activity to allow for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I thought I remembered this was necessary for tests. I'm fine with it, we may just be able to close the other ticket out at the same time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ticket in question is #4371

public PatientBulkUploadResponse processPersonCSV(InputStream csvStream, UUID facilityId)
throws IllegalArgumentException {

PatientBulkUploadResponse result = new PatientBulkUploadResponse();

Organization currentOrganization = _organizationService.getCurrentOrganization();

// Patients do not need to be assigned to a facility, but if an id is given it must be valid
Optional<Facility> assignedFacility =
Optional.ofNullable(facilityId).map(_organizationService::getFacilityInCurrentOrg);

Set<Person> patientsList = new HashSet<>();
List<PhoneNumber> phoneNumbersList = new ArrayList<>();

byte[] content;

try {
Expand All @@ -80,110 +44,16 @@ public PatientBulkUploadResponse processPersonCSV(InputStream csvStream, UUID fa
}

List<FeedbackMessage> errors =
_patientBulkUploadFileValidator.validate(new ByteArrayInputStream(content));
patientUploadRowFileValidator.validate(new ByteArrayInputStream(content));

if (!errors.isEmpty()) {
result.setStatus(UploadStatus.FAILURE);
result.setErrors(errors.toArray(FeedbackMessage[]::new));
return result;
}

// This is the point where we need to figure out multithreading
// because what needs to happen is that we return a success message to the end user
// but continue to process the csv (create person records) in the background.
// Putting a pin in it for now.

final MappingIterator<Map<String, String>> valueIterator =
CsvValidatorUtils.getIteratorForCsv(new ByteArrayInputStream(content));

while (valueIterator.hasNext()) {
final Map<String, String> row = CsvValidatorUtils.getNextRow(valueIterator);

try {

PatientUploadRow extractedData = new PatientUploadRow(row);

// Fetch address information
StreetAddress address =
_addressValidationService.getValidatedAddress(
extractedData.getStreet().getValue(),
extractedData.getStreet2().getValue(),
extractedData.getCity().getValue(),
extractedData.getState().getValue(),
extractedData.getZipCode().getValue(),
null);

String country =
extractedData.getCountry().getValue() == null
? "USA"
: extractedData.getCountry().getValue();

if (_personService.isDuplicatePatient(
extractedData.getFirstName().getValue(),
extractedData.getLastName().getValue(),
parseUserShortDate(extractedData.getDateOfBirth().getValue()),
currentOrganization,
assignedFacility)) {
continue;
}

// create new person with current organization, then add to new patients list
Person newPatient =
new Person(
currentOrganization,
assignedFacility.orElse(null),
null, // lookupid
extractedData.getFirstName().getValue(),
extractedData.getMiddleName().getValue(),
extractedData.getLastName().getValue(),
extractedData.getSuffix().getValue(),
parseUserShortDate(extractedData.getDateOfBirth().getValue()),
address,
country,
parsePersonRole(extractedData.getRole().getValue(), false),
List.of(extractedData.getEmail().getValue()),
convertRaceToDatabaseValue(extractedData.getRace().getValue()),
convertEthnicityToDatabaseValue(extractedData.getEthnicity().getValue()),
null, // tribalAffiliation
convertSexToDatabaseValue(extractedData.getBiologicalSex().getValue()),
parseYesNo(extractedData.getResidentCongregateSetting().getValue()),
parseYesNo(extractedData.getEmployedInHealthcare().getValue()),
null, // preferredLanguage
null // testResultDeliveryPreference
);

if (!patientsList.contains(newPatient)) {
// collect phone numbers and associate them with the patient
// then add to phone numbers list and set primary phone, if exists
List<PhoneNumber> newPhoneNumbers =
_personService.assignPhoneNumbersToPatient(
newPatient,
List.of(
new PhoneNumber(
parsePhoneType(extractedData.getPhoneNumberType().getValue()),
extractedData.getPhoneNumber().getValue())));
phoneNumbersList.addAll(newPhoneNumbers);
newPhoneNumbers.stream().findFirst().ifPresent(newPatient::setPrimaryPhone);

patientsList.add(newPatient);
}
} catch (IllegalArgumentException e) {
String errorMessage = "Error uploading patient roster";
log.error(
errorMessage
+ " for organization "
+ currentOrganization.getExternalId()
+ " and facility "
+ facilityId);
throw new IllegalArgumentException(errorMessage);
}
}

_personService.addPatientsAndPhoneNumbers(patientsList, phoneNumbersList);

log.info("CSV patient upload completed for {}", currentOrganization.getOrganizationName());
patientBulkUploadServiceAsync.savePatients(content, facilityId);
result.setStatus(UploadStatus.SUCCESS);
// eventually want to send an email here instead of return success
return result;
}
}
Loading