Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sync failure state exception handling #1656

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,16 +30,17 @@ import org.hl7.fhir.r4.model.ResourceType
class DownloadWorkManagerImpl : DownloadWorkManager {
private val resourceTypeList = ResourceType.values().map { it.name }
private val urls = LinkedList(listOf("Patient?address-city=NAIROBI"))
override var nextRequestUrl: String? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make it private. ideally this should not be modifiable outside this class

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. This was exposed to be used in com.google.android.fhir.sync.download.DownloaderImpl#download to get the resource type that is currently being synced. The catch operator can only access the flow instance and not the internal states within the flow { } closure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the resource type in ResourceSyncException data class? Most instance creation of that class have ResourceType.Bundle provided as the resource type.

Copy link
Collaborator

@maimoonak maimoonak Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it does emit the failed resource type. the test DownloaderImplTest.downloader with patient and observations should continue to download observations if patient download fail has an assertion for this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigating the failing tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured the cause of the filing tests. So apparently the catch operator completes the flow thus the remaining states are not emitted.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test was verifying whether emissions still happens even when there is a failure. Which I think should be the correct behavior. I will revert and remove the emission in the catch section of the try-catch block.


override suspend fun getNextRequestUrl(context: SyncDownloadContext): String? {
var url = urls.poll() ?: return null
nextRequestUrl = urls.poll() ?: return null

val resourceTypeToDownload =
ResourceType.fromCode(url.findAnyOf(resourceTypeList, ignoreCase = true)!!.second)
ResourceType.fromCode(nextRequestUrl?.findAnyOf(resourceTypeList, ignoreCase = true)!!.second)
context.getLatestTimestampFor(resourceTypeToDownload)?.let {
url = affixLastUpdatedTimestamp(url!!, it)
nextRequestUrl = affixLastUpdatedTimestamp(nextRequestUrl!!, it)
}
return url
return nextRequestUrl
}

override suspend fun processResponse(response: Resource): Collection<Resource> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import com.google.android.fhir.sync.Resolved
import com.google.android.fhir.toTimeZoneString
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
Expand Down Expand Up @@ -87,12 +89,13 @@ internal class FhirEngineImpl(private val database: Database, private val contex
conflictResolver: ConflictResolver,
download: suspend (SyncDownloadContext) -> Flow<List<Resource>>
) {

download(
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType) = database.lastUpdate(type)
}
)
.collect { resources ->
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType) = database.lastUpdate(type)
}
)
.onEach { resources ->
database.withTransaction {
val resolved =
resolveConflictingResources(
Expand All @@ -104,6 +107,10 @@ internal class FhirEngineImpl(private val database: Database, private val contex
saveResolvedResourcesToDatabase(resolved)
}
}
.catch { throwable: Throwable ->
Timber.e(throwable, "Error saving remote resource to database")
}
.collect()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are using on each and doing the processing above. do we need collect now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes collect operator is required to process all emitted values and handle exceptions that might occur in the upstream flow or during processing.

}

private suspend fun saveResolvedResourcesToDatabase(resolved: List<Resource>?) {
Expand All @@ -115,9 +122,11 @@ internal class FhirEngineImpl(private val database: Database, private val contex

private suspend fun saveRemoteResourcesToDatabase(resources: List<Resource>) {
val timeStamps =
resources.groupBy { it.resourceType }.entries.map {
SyncedResourceEntity(it.key, it.value.maxOf { it.meta.lastUpdated }.toTimeZoneString())
}
resources
.groupBy { it.resourceType }
.entries.map {
SyncedResourceEntity(it.key, it.value.maxOf { it.meta.lastUpdated }.toTimeZoneString())
}
database.insertSyncedResources(timeStamps, resources)
}

Expand Down Expand Up @@ -207,7 +216,8 @@ internal class FhirEngineImpl(private val database: Database, private val contex
*/
private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair<String, ResourceType>?
get() =
location?.split("/")?.takeIf { it.size > 3 }?.let {
it[it.size - 3] to ResourceType.fromCode(it[it.size - 4])
}
location
?.split("/")
?.takeIf { it.size > 3 }
?.let { it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,9 @@ import org.hl7.fhir.r4.model.Resource
* manager be created or should there be an API to restart a new download job.
*/
interface DownloadWorkManager {

var nextRequestUrl: String?

/**
* Returns the URL for the next download request, or `null` if there is no more download request
* to be issued.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@ import com.google.android.fhir.sync.DownloadWorkManager
import com.google.android.fhir.sync.Downloader
import com.google.android.fhir.sync.ResourceSyncException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import org.hl7.fhir.r4.model.ResourceType

Expand All @@ -38,25 +39,29 @@ internal class DownloaderImpl(
) : Downloader {
private val resourceTypeList = ResourceType.values().map { it.name }

override suspend fun download(context: SyncDownloadContext): Flow<DownloadState> = flow {
var resourceTypeToDownload: ResourceType = ResourceType.Bundle
emit(DownloadState.Started(resourceTypeToDownload))
var url = downloadWorkManager.getNextRequestUrl(context)
while (url != null) {
try {
resourceTypeToDownload =
ResourceType.fromCode(url.findAnyOf(resourceTypeList, ignoreCase = true)!!.second)

emit(
DownloadState.Success(
downloadWorkManager.processResponse(dataSource.download(url!!)).toList()
override suspend fun download(context: SyncDownloadContext): Flow<DownloadState> =
flow {
val resourceTypeToDownload: ResourceType = ResourceType.Bundle
emit(DownloadState.Started(resourceTypeToDownload))
var url = downloadWorkManager.getNextRequestUrl(context)
while (url != null) {
emit(
DownloadState.Success(
downloadWorkManager.processResponse(dataSource.download(url)).toList()
)
)
url = downloadWorkManager.getNextRequestUrl(context)
}
}
.catch { throwable: Throwable ->
val resourceTypeToDownload =
ResourceType.fromCode(
downloadWorkManager.nextRequestUrl
?.findAnyOf(resourceTypeList, ignoreCase = true)
?.second
)
emit(
DownloadState.Failure(ResourceSyncException(resourceTypeToDownload, Exception(throwable)))
)
} catch (exception: Exception) {
emit(DownloadState.Failure(ResourceSyncException(resourceTypeToDownload, exception)))
}

url = downloadWorkManager.getNextRequestUrl(context)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typealias ResourceSearchParams = Map<ResourceType, ParamMap>
*/
class ResourceParamsBasedDownloadWorkManager(syncParams: ResourceSearchParams) :
DownloadWorkManager {
override var nextRequestUrl: String? = null
private val resourcesToDownloadWithSearchParams = LinkedList(syncParams.entries)
private val urlOfTheNextPagesToDownloadForAResource = LinkedList<String>()

Expand All @@ -57,7 +58,8 @@ class ResourceParamsBasedDownloadWorkManager(syncParams: ResourceSearchParams) :
}
}

"${resourceType.name}?${newParams.concatParams()}"
nextRequestUrl = "${resourceType.name}?${newParams.concatParams()}"
nextRequestUrl
}
}

Expand All @@ -67,9 +69,9 @@ class ResourceParamsBasedDownloadWorkManager(syncParams: ResourceSearchParams) :
}

return if (response is Bundle && response.type == Bundle.BundleType.SEARCHSET) {
response.link.firstOrNull { component -> component.relation == "next" }?.url?.let { next ->
urlOfTheNextPagesToDownloadForAResource.add(next)
}
response.link
.firstOrNull { component -> component.relation == "next" }
?.url?.let { next -> urlOfTheNextPagesToDownloadForAResource.add(next) }

response.entry.map { it.resource }
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import java.time.OffsetDateTime
import java.util.Date
import java.util.LinkedList
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Meta
import org.hl7.fhir.r4.model.Patient
Expand Down Expand Up @@ -104,7 +103,12 @@ class TestingUtils constructor(private val iParser: IParser) {
) : DownloadWorkManager {
private val urls = LinkedList(queries)

override suspend fun getNextRequestUrl(context: SyncDownloadContext): String? = urls.poll()
override var nextRequestUrl: String? = null

override suspend fun getNextRequestUrl(context: SyncDownloadContext): String? {
nextRequestUrl = urls.poll()
return nextRequestUrl
}

override suspend fun processResponse(response: Resource): Collection<Resource> {
val patient = Patient().setMeta(Meta().setLastUpdated(Date()))
Expand Down Expand Up @@ -142,12 +146,12 @@ class TestingUtils constructor(private val iParser: IParser) {
download: suspend (SyncDownloadContext) -> Flow<List<Resource>>
) {
download(
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType): String {
return "123456788"
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType): String {
return "123456788"
}
}
}
)
)
.collect {}
}
override suspend fun count(search: Search): Long {
Expand Down
Loading