Skip to content

Commit

Permalink
Refactor ResourcePool to potentially fix contention bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
saket committed Jan 7, 2025
1 parent 8a14941 commit a11265e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:Suppress("FunctionName", "NAME_SHADOWING")
@file:Suppress("FunctionName")

package me.saket.telephoto.subsamplingimage.internal

Expand All @@ -9,7 +9,10 @@ import androidx.annotation.VisibleForTesting
import androidx.compose.ui.unit.IntRect
import androidx.compose.ui.unit.IntSize
import androidx.core.content.getSystemService
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import me.saket.telephoto.subsamplingimage.SubSamplingImageSource
import me.saket.telephoto.subsamplingimage.internal.ImageRegionDecoder.DecodeResult

Expand All @@ -30,7 +33,7 @@ internal class PooledAndroidImageRegionDecoder private constructor(
}

override fun close() {
decoders.resources.forEach { it.close() }
decoders.tryClose()
}

companion object {
Expand Down Expand Up @@ -79,17 +82,23 @@ internal class PooledAndroidImageRegionDecoder private constructor(
}
}

private class ResourcePool<T>(val resources: List<T>) {
private val channel = Channel<T>(Channel.UNLIMITED).apply {
resources.forEach(::trySend)
}
internal class ResourcePool<T>(resources: List<T>) {
private val resources = ArrayDeque(resources)
private val semaphore = Semaphore(permits = resources.size)
private val mutex = Mutex()

suspend fun <R> borrow(handler: suspend (T) -> R): R {
val borrowed = channel.receive()
return try {
handler(borrowed)
} finally {
channel.send(borrowed)
return semaphore.withPermit {
val borrowed = mutex.withLock { resources.removeFirst() }
try {
handler(borrowed)
} finally {
mutex.withLock { resources.addLast(borrowed) }
}
}
}

fun tryClose() {
resources.clear()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package me.saket.telephoto.subsamplingimage.internal

import assertk.assertThat
import assertk.assertions.containsExactly
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Test
import kotlin.random.Random

class ResourcePoolTest {

@Test fun `first in last out`() = runBlocking {
val pool = ResourcePool(listOf("a", "b"))
repeat(5) {
assertThat(pool.borrow { it }).isEqualTo("a")
assertThat(pool.borrow { it }).isEqualTo("b")
}
}

@Test fun `resources should be released even if borrows are canceled`() = runBlocking {
val pool = ResourcePool(listOf("resource1", "resource2"))
val borrow1 = launch {
pool.borrow { delay(500) }
}
val borrow2 = launch {
pool.borrow { delay(500) }
}

// Cancel one of the jobs to simulate a coroutine being cancelled.
delay(100)
borrow1.cancel()
borrow2.join()

assertThat(pool.removeItems(2)).containsExactly("resource1", "resource2")
}

@Test fun `simulate high contention for a single resource`() = runBlocking {
val pool = ResourcePool(listOf("resource1"))

// Launch multiple coroutines borrowing the single resource.
val jobs = List(10) {
launch(Dispatchers.Default) {
pool.borrow {
delay(Random.nextLong(100, 500)) // Simulate work.
}
}
}

// Randomly cancel some jobs to simulate stress-induced cancellation?
delay(500)
jobs.shuffled().take(jobs.size / 2).forEach { it.cancel() }

jobs.joinAll()
assertThat(pool.removeItems(1)).hasSize(1)
}

private suspend fun <T> ResourcePool<T>.removeItems(size: Int): List<T> {
val pool = this
return List(size) { pool.borrow { it } }.also {
pool.tryClose()
}
}
}

0 comments on commit a11265e

Please sign in to comment.