diff --git a/demo/src/main/java/com/google/android/fhir/demo/data/DownloadWorkManagerImpl.kt b/demo/src/main/java/com/google/android/fhir/demo/data/DownloadWorkManagerImpl.kt index b235951743..cbc6ab07cd 100644 --- a/demo/src/main/java/com/google/android/fhir/demo/data/DownloadWorkManagerImpl.kt +++ b/demo/src/main/java/com/google/android/fhir/demo/data/DownloadWorkManagerImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Google LLC + * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,16 +30,17 @@ import org.hl7.fhir.r4.model.ResourceType class DownloadWorkManagerImpl : DownloadWorkManager { private val resourceTypeList = ResourceType.values().map { it.name } private val urls = LinkedList(listOf("Patient?address-city=NAIROBI")) + override var nextRequestUrl: String? = null override suspend fun getNextRequestUrl(context: SyncDownloadContext): String? { - var url = urls.poll() ?: return null + nextRequestUrl = urls.poll() ?: return null val resourceTypeToDownload = - ResourceType.fromCode(url.findAnyOf(resourceTypeList, ignoreCase = true)!!.second) + ResourceType.fromCode(nextRequestUrl?.findAnyOf(resourceTypeList, ignoreCase = true)!!.second) context.getLatestTimestampFor(resourceTypeToDownload)?.let { - url = affixLastUpdatedTimestamp(url!!, it) + nextRequestUrl = affixLastUpdatedTimestamp(nextRequestUrl!!, it) } - return url + return nextRequestUrl } override suspend fun processResponse(response: Resource): Collection { diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index 3bb878c0b9..896cb41c2d 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -34,7 +34,9 @@ import com.google.android.fhir.sync.Resolved import com.google.android.fhir.toTimeZoneString import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -87,12 +89,13 @@ internal class FhirEngineImpl(private val database: Database, private val contex conflictResolver: ConflictResolver, download: suspend (SyncDownloadContext) -> Flow> ) { + download( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType) = database.lastUpdate(type) - } - ) - .collect { resources -> + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType) = database.lastUpdate(type) + } + ) + .onEach { resources -> database.withTransaction { val resolved = resolveConflictingResources( @@ -104,6 +107,10 @@ internal class FhirEngineImpl(private val database: Database, private val contex saveResolvedResourcesToDatabase(resolved) } } + .catch { throwable: Throwable -> + Timber.e(throwable, "Error saving remote resource to database") + } + .collect() } private suspend fun saveResolvedResourcesToDatabase(resolved: List?) { @@ -115,9 +122,11 @@ internal class FhirEngineImpl(private val database: Database, private val contex private suspend fun saveRemoteResourcesToDatabase(resources: List) { val timeStamps = - resources.groupBy { it.resourceType }.entries.map { - SyncedResourceEntity(it.key, it.value.maxOf { it.meta.lastUpdated }.toTimeZoneString()) - } + resources + .groupBy { it.resourceType } + .entries.map { + SyncedResourceEntity(it.key, it.value.maxOf { it.meta.lastUpdated }.toTimeZoneString()) + } database.insertSyncedResources(timeStamps, resources) } @@ -207,7 +216,8 @@ internal class FhirEngineImpl(private val database: Database, private val contex */ private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair? get() = - location?.split("/")?.takeIf { it.size > 3 }?.let { - it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) - } + location + ?.split("/") + ?.takeIf { it.size > 3 } + ?.let { it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/DownloadWorkManager.kt b/engine/src/main/java/com/google/android/fhir/sync/DownloadWorkManager.kt index 994f99fc9d..2144e4c277 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/DownloadWorkManager.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/DownloadWorkManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Google LLC + * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,9 @@ import org.hl7.fhir.r4.model.Resource * manager be created or should there be an API to restart a new download job. */ interface DownloadWorkManager { + + var nextRequestUrl: String? + /** * Returns the URL for the next download request, or `null` if there is no more download request * to be issued. diff --git a/engine/src/main/java/com/google/android/fhir/sync/download/DownloaderImpl.kt b/engine/src/main/java/com/google/android/fhir/sync/download/DownloaderImpl.kt index d66b0dc73b..d244a9d20f 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/download/DownloaderImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/download/DownloaderImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Google LLC + * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import com.google.android.fhir.sync.DownloadWorkManager import com.google.android.fhir.sync.Downloader import com.google.android.fhir.sync.ResourceSyncException import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.flow import org.hl7.fhir.r4.model.ResourceType @@ -38,25 +39,29 @@ internal class DownloaderImpl( ) : Downloader { private val resourceTypeList = ResourceType.values().map { it.name } - override suspend fun download(context: SyncDownloadContext): Flow = flow { - var resourceTypeToDownload: ResourceType = ResourceType.Bundle - emit(DownloadState.Started(resourceTypeToDownload)) - var url = downloadWorkManager.getNextRequestUrl(context) - while (url != null) { - try { - resourceTypeToDownload = - ResourceType.fromCode(url.findAnyOf(resourceTypeList, ignoreCase = true)!!.second) - - emit( - DownloadState.Success( - downloadWorkManager.processResponse(dataSource.download(url!!)).toList() + override suspend fun download(context: SyncDownloadContext): Flow = + flow { + val resourceTypeToDownload: ResourceType = ResourceType.Bundle + emit(DownloadState.Started(resourceTypeToDownload)) + var url = downloadWorkManager.getNextRequestUrl(context) + while (url != null) { + emit( + DownloadState.Success( + downloadWorkManager.processResponse(dataSource.download(url)).toList() + ) + ) + url = downloadWorkManager.getNextRequestUrl(context) + } + } + .catch { throwable: Throwable -> + val resourceTypeToDownload = + ResourceType.fromCode( + downloadWorkManager.nextRequestUrl + ?.findAnyOf(resourceTypeList, ignoreCase = true) + ?.second ) + emit( + DownloadState.Failure(ResourceSyncException(resourceTypeToDownload, Exception(throwable))) ) - } catch (exception: Exception) { - emit(DownloadState.Failure(ResourceSyncException(resourceTypeToDownload, exception))) } - - url = downloadWorkManager.getNextRequestUrl(context) - } - } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManager.kt b/engine/src/main/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManager.kt index 0ae5b55bdc..39d7a9001e 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManager.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManager.kt @@ -38,6 +38,7 @@ typealias ResourceSearchParams = Map */ class ResourceParamsBasedDownloadWorkManager(syncParams: ResourceSearchParams) : DownloadWorkManager { + override var nextRequestUrl: String? = null private val resourcesToDownloadWithSearchParams = LinkedList(syncParams.entries) private val urlOfTheNextPagesToDownloadForAResource = LinkedList() @@ -57,7 +58,8 @@ class ResourceParamsBasedDownloadWorkManager(syncParams: ResourceSearchParams) : } } - "${resourceType.name}?${newParams.concatParams()}" + nextRequestUrl = "${resourceType.name}?${newParams.concatParams()}" + nextRequestUrl } } @@ -67,9 +69,9 @@ class ResourceParamsBasedDownloadWorkManager(syncParams: ResourceSearchParams) : } return if (response is Bundle && response.type == Bundle.BundleType.SEARCHSET) { - response.link.firstOrNull { component -> component.relation == "next" }?.url?.let { next -> - urlOfTheNextPagesToDownloadForAResource.add(next) - } + response.link + .firstOrNull { component -> component.relation == "next" } + ?.url?.let { next -> urlOfTheNextPagesToDownloadForAResource.add(next) } response.entry.map { it.resource } } else { diff --git a/engine/src/test-common/java/com/google/android/fhir/resource/TestingUtils.kt b/engine/src/test-common/java/com/google/android/fhir/resource/TestingUtils.kt index 87c2880c92..d42cb05689 100644 --- a/engine/src/test-common/java/com/google/android/fhir/resource/TestingUtils.kt +++ b/engine/src/test-common/java/com/google/android/fhir/resource/TestingUtils.kt @@ -31,7 +31,6 @@ import java.time.OffsetDateTime import java.util.Date import java.util.LinkedList import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Meta import org.hl7.fhir.r4.model.Patient @@ -104,7 +103,12 @@ class TestingUtils constructor(private val iParser: IParser) { ) : DownloadWorkManager { private val urls = LinkedList(queries) - override suspend fun getNextRequestUrl(context: SyncDownloadContext): String? = urls.poll() + override var nextRequestUrl: String? = null + + override suspend fun getNextRequestUrl(context: SyncDownloadContext): String? { + nextRequestUrl = urls.poll() + return nextRequestUrl + } override suspend fun processResponse(response: Resource): Collection { val patient = Patient().setMeta(Meta().setLastUpdated(Date())) @@ -142,12 +146,12 @@ class TestingUtils constructor(private val iParser: IParser) { download: suspend (SyncDownloadContext) -> Flow> ) { download( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType): String { - return "123456788" + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType): String { + return "123456788" + } } - } - ) + ) .collect {} } override suspend fun count(search: Search): Long { diff --git a/engine/src/test/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManagerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManagerTest.kt index ff6acdc43d..f94651fbdd 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManagerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/download/ResourceParamsBasedDownloadWorkManagerTest.kt @@ -115,87 +115,91 @@ class ResourceParamsBasedDownloadWorkManagerTest { @Test fun getNextRequestUrl_withLastUpdatedTimeProvidedInContext_ShouldAppendGtPrefixToLastUpdatedSearchParam() = - runBlockingTest { - val downloadManager = - ResourceParamsBasedDownloadWorkManager(mapOf(ResourceType.Patient to emptyMap())) - val url = - downloadManager.getNextRequestUrl( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType) = "2022-06-28" - } - ) - assertThat(url).isEqualTo("Patient?_sort=_lastUpdated&_lastUpdated=gt2022-06-28") - } + runBlockingTest { + val downloadManager = + ResourceParamsBasedDownloadWorkManager(mapOf(ResourceType.Patient to emptyMap())) + val url = + downloadManager.getNextRequestUrl( + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType) = "2022-06-28" + } + ) + assertThat(url).isEqualTo("Patient?_sort=_lastUpdated&_lastUpdated=gt2022-06-28") + } @Test fun getNextRequestUrl_withLastUpdatedSyncParamProvided_shouldReturnUrlWithExactProvidedLastUpdatedSyncParam() = - runBlockingTest { - val downloadManager = - ResourceParamsBasedDownloadWorkManager( - mapOf( - ResourceType.Patient to - mapOf( - SyncDataParams.LAST_UPDATED_KEY to "2022-06-28", - SyncDataParams.SORT_KEY to "status" - ) + runBlockingTest { + val downloadManager = + ResourceParamsBasedDownloadWorkManager( + mapOf( + ResourceType.Patient to + mapOf( + SyncDataParams.LAST_UPDATED_KEY to "2022-06-28", + SyncDataParams.SORT_KEY to "status" + ) + ) ) - ) - val url = - downloadManager.getNextRequestUrl( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType) = "2022-07-07" - } - ) - assertThat(url).isEqualTo("Patient?_lastUpdated=2022-06-28&_sort=status") - } + val url = + downloadManager.getNextRequestUrl( + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType) = "2022-07-07" + } + ) + assertThat(url).isEqualTo("Patient?_lastUpdated=2022-06-28&_sort=status") + } @Test fun getNextRequestUrl_withLastUpdatedSyncParamHavingGtPrefix_shouldReturnUrlWithExactProvidedLastUpdatedSyncParam() = - runBlockingTest { - val downloadManager = - ResourceParamsBasedDownloadWorkManager( - mapOf(ResourceType.Patient to mapOf(SyncDataParams.LAST_UPDATED_KEY to "gt2022-06-28")) - ) - val url = - downloadManager.getNextRequestUrl( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType) = "2022-07-07" - } - ) - assertThat(url).isEqualTo("Patient?_lastUpdated=gt2022-06-28&_sort=_lastUpdated") - } + runBlockingTest { + val downloadManager = + ResourceParamsBasedDownloadWorkManager( + mapOf(ResourceType.Patient to mapOf(SyncDataParams.LAST_UPDATED_KEY to "gt2022-06-28")) + ) + val url = + downloadManager.getNextRequestUrl( + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType) = "2022-07-07" + } + ) + assertThat(url).isEqualTo("Patient?_lastUpdated=gt2022-06-28&_sort=_lastUpdated") + } @Test fun getNextRequestUrl_withNullUpdatedTimeStamp_shouldReturnUrlWithoutLastUpdatedQueryParam() = - runBlockingTest { - val downloadManager = - ResourceParamsBasedDownloadWorkManager( - mapOf(ResourceType.Patient to mapOf(Patient.ADDRESS_CITY.paramName to "NAIROBI")) - ) - val actual = - downloadManager.getNextRequestUrl( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType) = null - } - ) - assertThat(actual).isEqualTo("Patient?address-city=NAIROBI&_sort=_lastUpdated") - } + runBlockingTest { + val downloadManager = + ResourceParamsBasedDownloadWorkManager( + mapOf(ResourceType.Patient to mapOf(Patient.ADDRESS_CITY.paramName to "NAIROBI")) + ) + val actual = + downloadManager.getNextRequestUrl( + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType) = null + } + ) + val expectedUrl = "Patient?address-city=NAIROBI&_sort=_lastUpdated" + assertThat(downloadManager.nextRequestUrl).isEqualTo(expectedUrl) + assertThat(actual).isEqualTo(expectedUrl) + } @Test fun getNextRequestUrl_withEmptyUpdatedTimeStamp_shouldReturnUrlWithoutLastUpdatedQueryParam() = - runBlockingTest { - val downloadManager = - ResourceParamsBasedDownloadWorkManager( - mapOf(ResourceType.Patient to mapOf(Patient.ADDRESS_CITY.paramName to "NAIROBI")) - ) - val actual = - downloadManager.getNextRequestUrl( - object : SyncDownloadContext { - override suspend fun getLatestTimestampFor(type: ResourceType) = "" - } - ) - assertThat(actual).isEqualTo("Patient?address-city=NAIROBI&_sort=_lastUpdated") - } + runBlockingTest { + val downloadManager = + ResourceParamsBasedDownloadWorkManager( + mapOf(ResourceType.Patient to mapOf(Patient.ADDRESS_CITY.paramName to "NAIROBI")) + ) + val actual = + downloadManager.getNextRequestUrl( + object : SyncDownloadContext { + override suspend fun getLatestTimestampFor(type: ResourceType) = "" + } + ) + val expectedUrl = "Patient?address-city=NAIROBI&_sort=_lastUpdated" + assertThat(downloadManager.nextRequestUrl).isEqualTo(expectedUrl) + assertThat(actual).isEqualTo(expectedUrl) + } @Test fun processResponse_withBundleTypeSearchSet_shouldReturnPatient() = runBlockingTest {