
Progress bar failures when handling async, concurrent updates #204

Closed
aSemy opened this issue Aug 17, 2024 · 5 comments · Fixed by #205

Comments

@aSemy
Contributor

aSemy commented Aug 17, 2024

I would like to use a progress bar to track the status of many asynchronous tasks.

However, when I try, I get a ConcurrentModificationException.

Looking at the current code, I see some code that I think could cause issues. For example, updating `completed` inside `update {}` doesn't use any of the recommended protections for shared mutable state.

    progress.update {
        context = state
        completed += 1
    }

Could the progress bar be made thread-safe?

ConcurrentModificationException Example

```kotlin
import com.github.ajalt.mordant.animation.coroutines.animateInCoroutine
import com.github.ajalt.mordant.animation.progress.advance
import com.github.ajalt.mordant.rendering.TextColors.magenta
import com.github.ajalt.mordant.terminal.Terminal
import com.github.ajalt.mordant.widgets.progress.*
import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.random.nextInt
import kotlin.time.Duration.Companion.milliseconds

fun main(): Unit = runBlocking {

    val terminal = Terminal()

    val size = 10_000

    val progress = progressBarLayout {
        percentage()
        progressBar()
        completed(style = terminal.theme.success)
        speed("op/s", style = terminal.theme.info)
        timeRemaining(style = magenta)
    }.animateInCoroutine(
        terminal,
        total = size.toLong(),
    )

    launch { progress.execute() }

    withContext(Dispatchers.IO) {
        repeat(size) {
            launch {
                fooTask()
                progress.advance(1)
//                progress.update { completed++ }
            }
        }
    }
}


suspend fun fooTask() {
    delay(Random.nextInt(5..100).milliseconds)
}
```
```
  7%  ━━━━━━━━━━ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━     0.7/10.0K    2.6Kop/s  eta 0:00:03
Exception in thread "main" java.util.ConcurrentModificationException: Failed to update state due to concurrent updates
	at com.github.ajalt.mordant.animation.progress.ProgressTaskImpl.update(MultiProgressBarAnimation.kt:276)
	at com.github.ajalt.mordant.animation.coroutines.CoroutineProgressTaskAnimatorImpl.update(CoroutineAnimator.kt)
	at com.github.ajalt.mordant.animation.progress.ProgressBarAnimationKt.advance(ProgressBarAnimation.kt:84)
	at com.github.ajalt.mordant.samples.MainKt$main$1$2$1$1.invokeSuspend(main.kt:43)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:111)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:99)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:811)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:715)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:702)
	Suppressed: java.util.ConcurrentModificationException: Failed to update state due to concurrent updates
		... 12 more
	Suppressed: java.util.ConcurrentModificationException: Failed to update state due to concurrent updates
		... 12 more
	Suppressed: java.util.ConcurrentModificationException: Failed to update state due to concurrent updates
		... 12 more

  7%  ━━━━━━━━━━ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━     0.7/10.0K    2.6Kop/s  eta 0:00:03
```
@ajalt
Owner

ajalt commented Aug 17, 2024

Progress bars are thread-safe: they have no mutable state other than a single atomic reference. Calling `update {}` does an atomic compare-and-swap. Under high contention, the swap might have to be retried.
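To illustrate the compare-and-swap pattern, here is a minimal sketch; it is not Mordant's actual code. `ProgressState` and `AtomicProgress` are hypothetical names. The only mutable field is one `java.util.concurrent.atomic.AtomicReference` holding an immutable state object, and `update` keeps retrying until its swap wins.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical immutable state, loosely modeled on what a progress bar tracks.
data class ProgressState(val completed: Long, val total: Long)

// Hypothetical holder: the only mutable field is a single atomic reference.
class AtomicProgress(initial: ProgressState) {
    private val ref = AtomicReference(initial)

    val state: ProgressState get() = ref.get()

    // Apply `transform` to a snapshot and publish the result with compareAndSet.
    // If another thread swapped in a new state first, the CAS fails and the loop
    // retries with a fresh snapshot, so no increment is lost.
    fun update(transform: (ProgressState) -> ProgressState) {
        while (true) {
            val current = ref.get()
            val next = transform(current)
            if (ref.compareAndSet(current, next)) return
        }
    }
}
```

Because `transform` can run more than once under contention, it should be free of side effects. The JDK's `AtomicReference.updateAndGet` implements the same retry loop; the explicit loop here just makes the retries visible.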

Mordant throws that exception to be conservative and avoid excessive spinning. I didn't expect anyone to update a progress bar from 10,000 concurrent tasks, but that limit is safe to remove.
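The conservative behavior can be sketched as a compare-and-swap loop with a retry cap that throws once the cap is reached, producing the same error message as the report. `updateWithRetryLimit` and `maxAttempts` are hypothetical names, and Mordant's real retry count is not stated in this thread.

```kotlin
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicReference

// Hypothetical: retry the compare-and-swap at most `maxAttempts` times, then give up.
fun <T> AtomicReference<T>.updateWithRetryLimit(maxAttempts: Int, transform: (T) -> T): T {
    require(maxAttempts > 0) { "maxAttempts must be positive" }
    repeat(maxAttempts) {
        val current = get()
        val next = transform(current)
        // Success: return from the enclosing function (repeat is inline).
        if (compareAndSet(current, next)) return next
    }
    // Every attempt lost the race to another writer.
    throw ConcurrentModificationException("Failed to update state due to concurrent updates")
}
```

Removing the cap turns this back into an unbounded loop. That loop cannot deadlock, because `compareAndSet` only fails when another thread's swap has already succeeded, so some thread always makes progress.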

@aSemy
Contributor Author

aSemy commented Aug 17, 2024

Thanks for the quick response!

Will the `while (true) {}` loop cause performance problems?

Perhaps I'm asking a lot of Mordant, and 10k tasks is too many, but I'm noticing slower performance with fewer tasks too. Is performance something you've looked at?

@aSemy
Contributor Author

aSemy commented Aug 17, 2024

I added your `while (true)` fix and ran the IntelliJ Profiler on my example above. While `update()` did take a lot of time, interestingly `samples.dropWhile {}` seems to take longer:

```kotlin
val samples = samples.dropWhile { it.time < oldestSampleTime } + entry
```


[Flamegraph screenshot from the IntelliJ Profiler]

ajalt added a commit that referenced this issue Aug 18, 2024
@ajalt
Owner

ajalt commented Aug 18, 2024

Thanks for the profiling info; that's good to know. The `while (true)` loop is safe in that it won't deadlock, but I'll think about a more performant solution.

@aSemy
Contributor Author

aSemy commented Aug 18, 2024

I experimented a bit, and these changes help a lot with performance, though they might be harder to decipher and maintain:

  • Convert to a mutable list, so samples can be removed and appended in place instead of building new lists with `dropWhile` and `+`.
  • Because the samples are sorted by time, use a binary search to find the index of the first sample to keep.
  • Use `subList().clear()` to drop the older samples by index, without making a new copy.
```kotlin
// Remove samples older than the speed estimate duration
val now = timeSource.markNow()
val oldestSampleTime = now - speedEstimateDuration
val entry = HistoryEntry(now, scope.completed)

// convert to a mutable list to improve performance
val samples = samples.toMutableList()

// samples are sorted oldest-first; binarySearchBy returns -(insertionPoint) - 1 on a miss
val searchResult = samples.binarySearchBy(oldestSampleTime) { it.time }
val firstKept = if (searchResult >= 0) searchResult else -(searchResult + 1)
// drop every sample before the first one to keep, in place
samples.subList(0, firstKept).clear()

samples.add(entry)
```

There's probably a better way, but I thought I'd share this anyway.
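The two pruning strategies can be compared directly in a simplified, self-contained sketch. It assumes a hypothetical `Sample` type with plain millisecond timestamps instead of Mordant's `HistoryEntry` and time marks. It uses a hand-written lower-bound search rather than `binarySearchBy`, because when several samples share a timestamp, `binarySearchBy` makes no guarantee about which matching index it returns.

```kotlin
// Hypothetical, simplified sample: time is a plain millisecond timestamp.
data class Sample(val timeMillis: Long, val completed: Long)

// Original approach: dropWhile builds a new list of survivors, then `+` builds another.
fun pruneWithDropWhile(samples: List<Sample>, oldestMillis: Long, entry: Sample): List<Sample> =
    samples.dropWhile { it.timeMillis < oldestMillis } + entry

// Suggested approach, in place. Requires `samples` to be sorted by timeMillis.
fun pruneWithBinarySearch(samples: MutableList<Sample>, oldestMillis: Long, entry: Sample) {
    // Lower-bound search: find the first index whose time is >= oldestMillis.
    var lo = 0
    var hi = samples.size
    while (lo < hi) {
        val mid = (lo + hi) ushr 1
        if (samples[mid].timeMillis < oldestMillis) lo = mid + 1 else hi = mid
    }
    // Remove everything before that index; the rest is shifted, but no new list is built.
    samples.subList(0, lo).clear()
    samples.add(entry)
}
```

Both functions keep exactly the same samples; the in-place version just avoids the two intermediate lists on every update.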

Something else that caught my eye: `TimeSource.WithComparableMarks` is used a lot. I suspect that using `TimeMark`s to mark the start of the progress bar, each task, and the sample history would be better suited, since the times don't appear to need to be absolute; they only need to be compared relative to the start of the progress bar. But that's only a weak suspicion.
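A rough sketch of the relative-time idea from this comment: take one `TimeMark` when the bar starts and store each sample as a `Duration` since then. `RelativeSample`, `RelativeHistory`, and `demoRelativeHistory` are hypothetical names, and the sketch assumes Kotlin 1.9 or newer, where these `kotlin.time` APIs are stable. `TestTimeSource` stands in for a real clock so the recorded offsets are deterministic.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TestTimeSource
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Hypothetical sample: time is stored as an offset from the start of the bar.
data class RelativeSample(val sinceStart: Duration, val completed: Long)

// Hypothetical recorder: one TimeMark taken at start; every sample is relative to it.
class RelativeHistory(timeSource: TimeSource) {
    private val start: TimeMark = timeSource.markNow()
    val samples = mutableListOf<RelativeSample>()

    fun record(completed: Long) {
        samples += RelativeSample(start.elapsedNow(), completed)
    }
}

// Drive the recorder with a fake clock and return the recorded offsets.
fun demoRelativeHistory(): List<Duration> {
    val clock = TestTimeSource()
    val history = RelativeHistory(clock)
    history.record(completed = 0)
    clock += 250.milliseconds
    history.record(completed = 5)
    return history.samples.map { it.sinceStart }
}
```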

ajalt added a commit that referenced this issue Aug 18, 2024
Keeping track of progress history was taking a lot of CPU load if
updates were happening very frequently.

@aSemy suggested an improvement in
#204 (comment), and
this PR goes further:

Instead of keeping a list with an entry for each call to `update`, we
now keep a fixed-size list and overwrite the last entry if it was very
recent. We only look at the first and last entries to calculate speed,
so there's no need to keep every entry.

```
Benchmark                        Mode  Cnt      Score      Error  Units
JmhBenchmark.benchmarkCurrent    avgt    3  34931.106 ± 1299.242  ns/op
JmhBenchmark.benchmarkAsemy      avgt    3   5474.108 ±  634.242  ns/op
JmhBenchmark.benchmarkFixedList  avgt    3    176.047 ±   53.293  ns/op
```

This change is roughly a 200x speedup over the current implementation
on the benchmarks I ran (34,931 ns/op down to 176 ns/op), which seems
fast enough.
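A minimal sketch of the fixed-size history described in the commit message above; it is not the code that was merged. `SpeedSample`, `BoundedHistory`, `capacity`, and `minSpacing` are hypothetical names, and times are passed in as `Duration`s to keep it deterministic. One design choice is an assumption: a new time is compared with the entry *before* the last one, so the last slot acts as a live slot. Comparing with the last entry itself would let a steady stream of 1 ms updates overwrite it forever, and the history would never grow.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.DurationUnit

// Hypothetical sample: elapsed time since the bar started, plus the completed count.
data class SpeedSample(val time: Duration, val completed: Long)

// Hypothetical fixed-size history. The 100 ms default for minSpacing is illustrative.
class BoundedHistory(
    private val capacity: Int,
    private val minSpacing: Duration = 100.milliseconds,
) {
    init {
        require(capacity >= 2) { "capacity must be at least 2" }
    }

    private val entries = ArrayDeque<SpeedSample>(capacity)

    val size: Int get() = entries.size

    fun record(time: Duration, completed: Long) {
        val sample = SpeedSample(time, completed)
        // Compare with the entry before the last one: the last slot keeps being
        // overwritten until minSpacing has passed since that earlier entry.
        val anchor = if (entries.size >= 2) entries[entries.size - 2] else null
        if (anchor != null && time - anchor.time < minSpacing) {
            entries[entries.lastIndex] = sample
        } else {
            // Open a new slot, evicting the oldest entry once the buffer is full.
            if (entries.size == capacity) entries.removeFirst()
            entries.addLast(sample)
        }
    }

    // Average speed over the window: only the first and last entries are needed.
    fun speedPerSecond(): Double? {
        if (entries.size < 2) return null
        val first = entries.first()
        val last = entries.last()
        val seconds = (last.time - first.time).toDouble(DurationUnit.SECONDS)
        if (seconds <= 0.0) return null
        return (last.completed - first.completed) / seconds
    }
}
```

Each `record` call is O(1) and memory stays bounded by `capacity`, no matter how many updates arrive.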