Skip to content

Commit

Permalink
Merge pull request #2059 from OneSignal/improve/handle_incorrect_404
Browse files Browse the repository at this point in the history
[Fix]Handle incorrect 404 responses; add a delay after creates and retries on 404 of new ids
  • Loading branch information
jkasten2 authored Apr 22, 2024
2 parents cd9830c + a9dfc09 commit 12e8359
Show file tree
Hide file tree
Showing 31 changed files with 618 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,32 @@ class ConfigModel : Model() {
setLongProperty(::opRepoPostWakeDelay.name, value)
}

/**
* The number of milliseconds to delay after an operation completes
* that creates or changes ids.
* This is a "cold down" period to avoid a caveat with OneSignal's backend
* replication, where you may incorrectly get a 404 when attempting a GET
* or PATCH REST API call on something just after it is created.
*/
var opRepoPostCreateDelay: Long
get() = getLongProperty(::opRepoPostCreateDelay.name) { 5_000 }
set(value) {
setLongProperty(::opRepoPostCreateDelay.name, value)
}

/**
* The number of milliseconds to retry operations for new models.
* This is a fallback to opRepoPostCreateDelay, where it's delay may
* not be enough. The server may be unusually overloaded so we will
* retry these (back-off rules apply to all retries) as we only want
* to re-create records as a last resort.
*/
var opRepoPostCreateRetryUpTo: Long
get() = getLongProperty(::opRepoPostCreateRetryUpTo.name) { 60_000 }
set(value) {
setLongProperty(::opRepoPostCreateRetryUpTo.name, value)
}

/**
* The minimum number of milliseconds required to pass to allow the fetching of IAM to occur.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ abstract class Operation(name: String) : Model() {
this.name = name
}

/**
* This is a unique id that points to a record this operation will affect.
* Example: If the operation is updating tags on a User this will be the onesignalId.
*/
abstract val applyToRecordId: String

/**
* The key of this operation for when the starting operation has a [groupComparisonType]
* of [GroupComparisonType.CREATE]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.onesignal.core.internal.operations.impl

import com.onesignal.common.threading.WaiterWithValue
import com.onesignal.common.threading.suspendifyOnThread
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.ExecutionResult
import com.onesignal.core.internal.operations.GroupComparisonType
Expand All @@ -12,7 +11,11 @@ import com.onesignal.core.internal.startup.IStartableService
import com.onesignal.core.internal.time.ITime
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withTimeoutOrNull
import java.util.UUID
import kotlin.reflect.KClass
Expand All @@ -22,6 +25,7 @@ internal class OperationRepo(
private val _operationModelStore: OperationModelStore,
private val _configModelStore: ConfigModelStore,
private val _time: ITime,
private val _newRecordState: NewRecordsState,
) : IOperationRepo, IStartableService {
internal class OperationQueueItem(
val operation: Operation,
Expand All @@ -34,10 +38,16 @@ internal class OperationRepo(
}
}

internal class LoopWaiterMessage(
val force: Boolean,
val previousWaitedTime: Long = 0,
)

private val executorsMap: Map<String, IOperationExecutor>
private val queue = mutableListOf<OperationQueueItem>()
private val waiter = WaiterWithValue<Boolean>()
private val waiter = WaiterWithValue<LoopWaiterMessage>()
private var paused = false
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))

/** *** Buckets ***
* Purpose: Bucketing is a pattern we are using to help save network
Expand All @@ -56,13 +66,11 @@ internal class OperationRepo(
* network calls.
*/
private var enqueueIntoBucket = 0
private val executeBucket: Int get() {
return if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1
}
private val executeBucket get() =
if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1

init {
val executorsMap: MutableMap<String, IOperationExecutor> = mutableMapOf()

for (executor in executors) {
for (operation in executor.operations) {
executorsMap[operation] = executor
Expand All @@ -83,9 +91,7 @@ internal class OperationRepo(

override fun start() {
paused = false
suspendifyOnThread(name = "OpRepo") {
processQueueForever()
}
coroutineScope.launch { processQueueForever() }
}

override fun enqueue(
Expand Down Expand Up @@ -123,7 +129,7 @@ internal class OperationRepo(
}
}

waiter.wake(flush)
waiter.wake(LoopWaiterMessage(flush, 0))
}

/**
Expand Down Expand Up @@ -160,16 +166,16 @@ internal class OperationRepo(
*/
private suspend fun waitForNewOperationAndExecutionInterval() {
// 1. Wait for an operation to be enqueued
var force = waiter.waitForWake()
var wakeMessage = waiter.waitForWake()

// 2. Wait at least the time defined in opRepoExecutionInterval
// so operations can be grouped, unless one of them used
// flush=true (AKA force)
var lastTime = _time.currentTimeMillis
var remainingTime = _configModelStore.model.opRepoExecutionInterval
while (!force && remainingTime > 0) {
var remainingTime = _configModelStore.model.opRepoExecutionInterval - wakeMessage.previousWaitedTime
while (!wakeMessage.force && remainingTime > 0) {
withTimeoutOrNull(remainingTime) {
force = waiter.waitForWake()
wakeMessage = waiter.waitForWake()
}
remainingTime -= _time.currentTimeMillis - lastTime
lastTime = _time.currentTimeMillis
Expand All @@ -195,6 +201,14 @@ internal class OperationRepo(
synchronized(queue) {
queue.forEach { it.operation.translateIds(response.idTranslations) }
}
response.idTranslations.values.forEach { _newRecordState.add(it) }
coroutineScope.launch {
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
}
}

when (response.result) {
Expand Down Expand Up @@ -278,7 +292,9 @@ internal class OperationRepo(
return synchronized(queue) {
val startingOp =
queue.firstOrNull {
it.operation.canStartExecute && it.bucket <= bucketFilter
it.operation.canStartExecute &&
_newRecordState.canAccess(it.operation.applyToRecordId) &&
it.bucket <= bucketFilter
}

if (startingOp != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.onesignal.user.internal.operations.impl.executors.UpdateUserOperation
import com.onesignal.user.internal.operations.impl.listeners.IdentityModelStoreListener
import com.onesignal.user.internal.operations.impl.listeners.PropertiesModelStoreListener
import com.onesignal.user.internal.operations.impl.listeners.SubscriptionModelStoreListener
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
import com.onesignal.user.internal.properties.PropertiesModelStore
import com.onesignal.user.internal.service.UserRefreshService
import com.onesignal.user.internal.subscriptions.ISubscriptionManager
Expand Down Expand Up @@ -68,5 +69,8 @@ internal class UserModule : IModule {
builder.register<UserRefreshService>().provides<IStartableService>()

builder.register<RecoverFromDroppedLoginBug>().provides<IStartableService>()

// Shared state between Executors
builder.register<NewRecordsState>().provides<NewRecordsState>()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class CreateSubscriptionOperation() : Operation(SubscriptionOperationExecutor.CR
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId.Subscription.$subscriptionId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, subscriptionId: String, type: SubscriptionType, enabled: Boolean, address: String, status: SubscriptionStatus) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class DeleteAliasOperation() : Operation(IdentityOperationExecutor.DELETE_ALIAS)
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId.Alias.$label"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.NONE
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, label: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class DeleteSubscriptionOperation() : Operation(SubscriptionOperationExecutor.DE
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId.Subscription.$subscriptionId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.NONE
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId) && !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = subscriptionId

constructor(appId: String, onesignalId: String, subscriptionId: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class DeleteTagOperation() : Operation(UpdateUserOperationExecutor.DELETE_TAG) {
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, key: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class LoginUserFromSubscriptionOperation() : Operation(LoginUserFromSubscription
override val modifyComparisonKey: String get() = "$appId.Subscription.$subscriptionId.Login"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.NONE
override val canStartExecute: Boolean = true
override val applyToRecordId: String get() = subscriptionId

constructor(appId: String, onesignalId: String, subscriptionId: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class LoginUserOperation() : Operation(LoginUserOperationExecutor.LOGIN_USER) {
override val modifyComparisonKey: String = ""
override val groupComparisonType: GroupComparisonType = GroupComparisonType.CREATE
override val canStartExecute: Boolean get() = existingOnesignalId == null || !IDManager.isLocalId(existingOnesignalId!!)
override val applyToRecordId: String get() = existingOnesignalId ?: onesignalId

constructor(appId: String, onesignalId: String, externalId: String?, existingOneSignalId: String? = null) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class RefreshUserOperation() : Operation(RefreshUserOperationExecutor.REFRESH_US
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId.Refresh"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.CREATE
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SetAliasOperation() : Operation(IdentityOperationExecutor.SET_ALIAS) {
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId.Identity.$label"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, label: String, value: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class SetPropertyOperation() : Operation(UpdateUserOperationExecutor.SET_PROPERT
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, property: String, value: Any?) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SetTagOperation() : Operation(UpdateUserOperationExecutor.SET_TAG) {
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, key: String, value: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TrackPurchaseOperation() : Operation(UpdateUserOperationExecutor.TRACK_PUR
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, treatNewAsExisting: Boolean, amountSpent: BigDecimal, purchases: List<PurchaseInfo>) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TrackSessionEndOperation() : Operation(UpdateUserOperationExecutor.TRACK_S
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String, sessionTime: Long) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TrackSessionStartOperation() : Operation(UpdateUserOperationExecutor.TRACK
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = onesignalId

constructor(appId: String, onesignalId: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class TransferSubscriptionOperation() : Operation(SubscriptionOperationExecutor.
override val modifyComparisonKey: String get() = "$appId.Subscription.$subscriptionId.Transfer"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.NONE
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId) && !IDManager.isLocalId(subscriptionId)
override val applyToRecordId: String get() = subscriptionId

constructor(appId: String, subscriptionId: String, onesignalId: String) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class UpdateSubscriptionOperation() : Operation(SubscriptionOperationExecutor.UP
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId.Subscription.$subscriptionId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId) && !IDManager.isLocalId(onesignalId)
override val applyToRecordId: String get() = subscriptionId

constructor(appId: String, onesignalId: String, subscriptionId: String, type: SubscriptionType, enabled: Boolean, address: String, status: SubscriptionStatus) : this() {
this.appId = appId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import com.onesignal.user.internal.builduser.IRebuildUserService
import com.onesignal.user.internal.identity.IdentityModelStore
import com.onesignal.user.internal.operations.DeleteAliasOperation
import com.onesignal.user.internal.operations.SetAliasOperation
import com.onesignal.user.internal.operations.impl.states.NewRecordsState

internal class IdentityOperationExecutor(
private val _identityBackend: IIdentityBackendService,
private val _identityModelStore: IdentityModelStore,
private val _buildUserService: IRebuildUserService,
private val _newRecordState: NewRecordsState,
) : IOperationExecutor {
override val operations: List<String>
get() = listOf(SET_ALIAS, DELETE_ALIAS)
Expand Down Expand Up @@ -67,11 +69,15 @@ internal class IdentityOperationExecutor(
NetworkUtils.ResponseStatusType.UNAUTHORIZED ->
ExecutionResponse(ExecutionResult.FAIL_UNAUTHORIZED)
NetworkUtils.ResponseStatusType.MISSING -> {
val operations = _buildUserService.getRebuildOperationsIfCurrentUser(lastOperation.appId, lastOperation.onesignalId)
if (operations == null) {
if (ex.statusCode == 404 && _newRecordState.isInMissingRetryWindow(lastOperation.onesignalId)) {
return ExecutionResponse(ExecutionResult.FAIL_RETRY)
}

val rebuildOps = _buildUserService.getRebuildOperationsIfCurrentUser(lastOperation.appId, lastOperation.onesignalId)
if (rebuildOps == null) {
return ExecutionResponse(ExecutionResult.FAIL_NORETRY)
} else {
return ExecutionResponse(ExecutionResult.FAIL_RETRY, operations = operations)
return ExecutionResponse(ExecutionResult.FAIL_RETRY, operations = rebuildOps)
}
}
}
Expand Down Expand Up @@ -103,10 +109,14 @@ internal class IdentityOperationExecutor(
NetworkUtils.ResponseStatusType.UNAUTHORIZED ->
ExecutionResponse(ExecutionResult.FAIL_UNAUTHORIZED)
NetworkUtils.ResponseStatusType.MISSING -> {
// This means either the User or the Alias was already
// deleted, either way the end state is the same, the
// alias no longer exists on that User.
ExecutionResponse(ExecutionResult.SUCCESS)
return if (ex.statusCode == 404 && _newRecordState.isInMissingRetryWindow(lastOperation.onesignalId)) {
ExecutionResponse(ExecutionResult.FAIL_RETRY)
} else {
// This means either the User or the Alias was already
// deleted, either way the end state is the same, the
// alias no longer exists on that User.
ExecutionResponse(ExecutionResult.SUCCESS)
}
}
}
}
Expand Down
Loading

0 comments on commit 12e8359

Please sign in to comment.