`throttle` does not emit the latest element at interval occurrence when `latest` is true
#266
I think the semantics are very unclear on this one. Let's say we publish 0...99, one value every 100 ms, with a throttling window of 1 second.
That feels very strange. I agree that there should at least be an option to produce the last element if it was initially skipped in the throttling window. But is there a clear semantics for this concept?
@notcome given the Combine and Swift Concurrency interoperability and switching from
@tarbaiev-smg So I did a little searching on RxSwift, and it seems that something as simple as throttle actually has lots of subtle variations. Anyway, adding a clock task, which is required to publish the latest element after the signal stabilizes, seems very tricky. The current throttle implementation is less than a hundred lines of code, whereas debounce spans nearly 2k LOC. Stunning… I can't use Combine here because I am moving to concurrency infrastructure for testing. So I ended up focusing on my particular case (infinite demand, no error handling) and got away with a very simple actor. It's quite tricky, though: Task.init captures your actor without warning (you don't need to write an explicit self), and getting cancellation right is hard.
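For illustration, here is a minimal sketch of the kind of actor described above. It works under the same assumptions: infinite demand and no error handling. `LatestThrottle`, `send(_:)`, `output` and `finish()` are hypothetical names, not the commenter's code. The sketch relies on `AsyncStream.makeStream(of:)` (Swift 5.9+) and `Task.sleep(for:)` (macOS 13+ on Apple platforms). The first value is emitted immediately. Values that arrive while a window is open overwrite each other, and the newest one is emitted as soon as the window closes.

```swift
// Hypothetical sketch: push-style "latest" throttle, infinite demand, no error handling.
actor LatestThrottle<Element: Sendable> {
    private let interval: Duration
    private let continuation: AsyncStream<Element>.Continuation
    nonisolated let output: AsyncStream<Element>
    private var pending: Element?      // newest value received while a window is open
    private var windowOpen = false     // true while a throttling window is running

    init(interval: Duration) {
        self.interval = interval
        let (stream, continuation) = AsyncStream.makeStream(of: Element.self)
        self.output = stream
        self.continuation = continuation
    }

    func send(_ element: Element) {
        guard windowOpen else {
            // No window running: emit right away and start a new window.
            continuation.yield(element)
            startWindow()
            return
        }
        pending = element              // latest wins; an older pending value is dropped
    }

    func finish() {
        // Design choice: flush a pending value immediately instead of waiting out the window.
        if let element = pending {
            pending = nil
            continuation.yield(element)
        }
        continuation.finish()
    }

    private func startWindow() {
        windowOpen = true
        // This Task captures `self` strongly (the pitfall mentioned above), keeping the
        // actor alive until the window elapses.
        Task { [interval] in
            try? await Task.sleep(for: interval)
            await self.windowElapsed()
        }
    }

    private func windowElapsed() {
        if let element = pending {
            // Emit the latest cached value when the window closes, then open a new window.
            pending = nil
            continuation.yield(element)
            startWindow()
        } else {
            windowOpen = false
        }
    }
}
```

Because the window timer is owned by the actor rather than by the consumer's call to `next()`, the latest value is emitted when the interval elapses even if no further input arrives. This is the behavior the issue asks for.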
One would indeed expect to receive the last element when using throttle. A very common use case, for example, is throttling a sequence of progress values, where receiving the last element (i.e. 100%) is crucial. In this example:

```swift
import AsyncAlgorithms

var numbers = Array((0...100).reversed())
let stream = AsyncStream {
    try! await Task.sleep(nanoseconds: 50_000_000)
    return numbers.popLast()
}
Task {
    let sequence = stream
        .throttle(for: .seconds(1))
    for try await num in sequence {
        print(num)
    }
}
```

The output is:

```
0
```

This is unexpected, as the latest element is 100. The problem lies in the iterator's `next()`: when `base.next()` returns `nil`, the pending `reduced` value is discarded instead of being returned.

```swift
public mutating func next() async rethrows -> Reduced? {
    var reduced: Reduced?
    let start = last ?? clock.now
    repeat {
        // Return reduced if we wait longer than interval (and reduced was not returned previously)
        guard let element = try await base.next() else {
            return nil
        }
        let reduction = await reducing(reduced, element)
        let now = clock.now
        if start.duration(to: now) >= interval || last == nil {
            last = now
            return reduction
        } else {
            reduced = reduction
        }
    } while true
}
```
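To see the dropped element without a real clock, here is a small deterministic model of the quoted `next()` loop. It uses integer timestamps and a `latest: true` reduction that keeps only the newest element. `simulateNextLoop` and its parameters are hypothetical. The model assumes the first `next()` call happens at time 0. `flushPendingOnFinish: false` mirrors the quoted code. `true` illustrates returning the pending reduction when the base ends, which is roughly what a later PR in this thread is said to address. Unlike that PR, this model flushes immediately rather than waiting out the remaining interval.

```swift
/// Hypothetical model of the quoted `next()` loop, with integer timestamps.
func simulateNextLoop(arrivals: [(time: Int, value: Int)], interval: Int,
                      flushPendingOnFinish: Bool) -> [Int] {
    var emitted: [Int] = []
    var last: Int? = nil              // mirrors the iterator's `last`
    var base = arrivals.makeIterator()
    while true {                      // each pass corresponds to one call to next()
        var reduced: Int? = nil       // local to next(), as in the quoted code
        let start = last ?? 0         // assumption: the first next() call happens at time 0
        var result: Int? = nil
        while let arrival = base.next() {
            reduced = arrival.value   // `latest: true`: keep only the newest element
            if arrival.time - start >= interval || last == nil {
                last = arrival.time
                result = reduced
                break
            }
        }
        if let result {
            emitted.append(result)
            continue
        }
        // Base finished: the quoted code returns nil here, so `reduced` is lost.
        if flushPendingOnFinish, let reduced {
            emitted.append(reduced)
        }
        return emitted
    }
}
```

With one element per time unit (0 through 10) and an interval of 3, the current loop emits `[0, 3, 6, 9]` and never emits `10`. The flushing variant emits `[0, 3, 6, 9, 10]`.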
I also find throttle's implementation confusing. In addition to the behavior of `latest`, given infinite demand the loop consumes all available processing. Take, for example, an AsyncSequence that is simply a counter returning the next iteration's value immediately. Maybe I am expecting something I shouldn't, but throttling by rapidly consuming a stream's elements, only to discard them, seems incorrect. Throttling should handle both over- and under-producing streams without a severe performance penalty in either case.
Just as a use case, I attempted to use
@kdubb I think that is the expected behaviour from I think you want another algorithm for what you describe here.
The issue for me is that I was switching from a slow-producing stream to an infinitely producing stream, and you need something that can deal with both cases. I attempted to use timer sequence and Finally, I To me the name Maybe we create one called 'polling' that uses a
Also, adding a doc
@FranzBusch Thanks for the update! However, the PR does not seem to address this issue. The
Yes, the PR addresses the cases where the sequence finishes (by returning nil), but as @tarbaiev-smg said, we can still wait for the next element longer than the interval, which means the last element won't be emitted. Here is an updated example of the issue that uses a sequence which waits longer for the next element and does not finish:

```swift
import AsyncAlgorithms

var numbers = Array((0...200).reversed())
let stream = AsyncStream<Int> {
    guard let number = numbers.popLast() else { return nil }
    if number == 101 {
        // Element 101 arrives after ~1000 s, far longer than the 1 s throttle interval
        try! await Task.sleep(nanoseconds: 1_000_000_000_000)
    } else {
        try! await Task.sleep(nanoseconds: 50_000_000)
    }
    return number
}
Task {
    let sequence = stream
        .throttle(for: .seconds(1))
    for try await num in sequence {
        print(num)
    }
}
```

prints:
instead of:
@Kolos65 Thanks for the example. Could you do me a favour and try to write a validation test that captures what currently doesn't work the way you expect? Validation tests look like this
I am just trying to nail down whether we have a problem, what exactly it is, and that we are not mixing semantics of
Let me help there. I believe this validation test should pass, but it doesn't:
@FranzBusch Here is one I think matches my example:

```swift
validate {
    "-a-b-c-d-e-f-----h-i-j-k-|"
    $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
    "-a---c---e---f---h---j---|"
}
```

actual:
I'm sorry @Kolos65, but I think there is a bug in that. Throttle should reset its timer for each output it yields, not for each input. If the timer lapses without further input, the next input after that should be yielded immediately, just like the initial input. Like so:

```swift
validate {
    "-a-b-c-d-e-f-----h-i-j-k-|"
    $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
    "-a--b--d--e--f---h--i--k-|"
}
```

Notice how there's an output every 3 steps for as long as there's input at least that often. Otherwise the gap between outputs can be longer than 3 steps, but no longer than necessary.
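The semantics proposed in this comment can be checked by hand with a tiny marble-diagram simulator. `simulateThrottle` is a hypothetical helper and is not part of swift-async-algorithms or its `validate` DSL. It makes these assumptions:

- Each character is one clock step.
- `-` means no event and `|` means finish.
- An element arriving on the same step as the timer deadline is emitted on that step.
- A value still pending at finish is flushed once the interval allows, and the finish follows it.

It reproduces the expected diagram in this comment.

```swift
/// Hypothetical helper: simulates the proposed `latest: true` throttle on a marble diagram.
func simulateThrottle(_ marbles: String, interval: Int) -> String {
    let input = Array(marbles)
    var output = ""
    var pending: Character? = nil     // newest element not yet emitted
    var lastEmit: Int? = nil          // step of the previous output; the timer resets here
    var finished = false
    var step = 0
    while true {
        if step < input.count {
            switch input[step] {
            case "-": break
            case "|": finished = true
            case let element: pending = element   // latest wins
            }
        } else {
            finished = true                       // treat the end of the string as finish
        }
        if let element = pending, lastEmit.map({ step - $0 >= interval }) ?? true {
            // First element, or a full interval has passed since the previous output.
            output.append(element)
            pending = nil
            lastEmit = step
        } else if finished && pending == nil {
            output.append("|")
            return output
        } else {
            output.append("-")
        }
        step += 1
    }
}
```

For example, `simulateThrottle("-a-b-c-d-e-f-----h-i-j-k-|", interval: 3)` yields `"-a--b--d--e--f---h--i--k-|"`. Two values sent back to back (`"-ab|"`) yield `"-a--b|"`: the second value is delayed by one interval rather than dropped.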
After looking into this pretty extensively, the outcome is this: I am currently in favor of removing throttle and deferring it out of 1.0, since that is the only real remaining task at hand. To be quite honest, there are a lot of discordant expectations for what it ought to do. I agree that the last element was definitely a bug, but the other issues stated have to do with how the interval is counted and in what context it is used. One confusing factor is that all AsyncSequences are applied with a demand of 1: each call to next is what drives the production of values. This conflicts with measuring time to the next value versus time since the previous value. Quite honestly, it doesn't seem like we have a good consensus on how that should operate. The current implementation is rather straightforward but definitely exposes some behavioral aspects that might be surprising. It also easily raises questions like "is this really a variation on a lower-level thing that conjoins the debounce and throttle machinery?", "how does this affect sendability?" or "does this secretly have a buffer under the hood?". I welcome more clarity here, but the murkier it gets, the more likely it seems that this operation (albeit currently rather simple) is not ready.
I agree with this. Let's remove throttle from 1.0.0 and brainstorm again exactly what semantics we want to implement here.
A side topic, but I was somewhat surprised to see a 1.0.0 release already. There is no description or changelog for it.
We have been working on a 1.0.0 for quite some time now and are just going through the final open issues that are blocking the release. The goal for 1.0.0 is not feature parity or providing all possible algorithms, but to have a baseline of algorithms that are working and functional.
As I see it, based on Combine's throttle, the demand of 1 should be fine. We need to:
Maybe we don't even need to measure time this way 🤔. I don't think we need to be very precise here, fighting a race between the timeout and the next value. I could try to craft a possible implementation.
I too was/am expecting Combine's behavior! If all I do with the base sequence is send two values immediately, I'd expect the throttled sequence to return the first immediately and the second after one interval has elapsed. The implementation in 1.0.0 returns the first immediately and effectively drops the second.
Is there any update on throttle? I was just surprised to lose our last element as well when attempting to use it on a non-terminating stream.
+1 Any updates on this issue so far?
I think part of the problem is the overloading of terms like "throttle". I see it used to mean very different things, and the discussion above demonstrates that quite well. Given how loaded the term is, it would probably be best not to use it at all and to come up with new functions that are clearer. To me, in the real world, throttling is about changing the rate of processing. In an engine, you can increase or decrease the revs by changing the throttle. What you never do is "drop" revs (unless your car has issues). What I expected from "throttle" was that it would never drop anything in the sequence, but simply delay it in order to maintain a maximum density of iteration. Apparently this is not what it does: it does drop items. What I would like to see are three different operators... debounce: waits for a pause in the stream before emitting the latest value, dropping anything in between.
There are only two hard problems in computer science: naming and cache invalidation 😉.
I'm afraid I think this is part of the problem. Apple should have come up with their own naming instead of adopting terms from half-baked Facebook frameworks and half-century-old academic work. They had an opportunity, and they missed it. The naming is quite alienating to most app-level developers. Sure, if you are deep into functional or reactive programming, it probably makes sense, but that is not where Apple comes from. Terms like 'debounce' mean very little to someone who has been using UIKit or AppKit (i.e. most developers). The discussion above demonstrates this quite well, IMO. It is pretty clear people have different ideas about what throttle means, and the accepted terminology actually makes no sense. It was a poor choice to begin with. But even if "throttle" is the accepted term, it should be very clear that it drops items from the stream. That should at least be stated in the docs, not left to how much experience the reader has with React Native or whatever third-party framework uses it. And as far as I can tell, there is still another type of function that is missing: a true throttle that doesn't drop items. Perhaps it is there, but I can't find it. I assumed "throttle" would be it, but it seems to be missing.
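The non-dropping "true throttle" described in the last two comments is often called pacing: it delays elements so that consecutive elements are at least `interval` apart, but never discards any. Here is a minimal sketch of such an operator. `AsyncPacedSequence` is a hypothetical name, not an API of swift-async-algorithms. The sketch assumes `ContinuousClock` (macOS 13+ on Apple platforms).

```swift
// Hypothetical operator: spaces elements at least `interval` apart without dropping any.
struct AsyncPacedSequence<Base: AsyncSequence>: AsyncSequence {
    typealias Element = Base.Element
    let base: Base
    let interval: Duration

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: Base.AsyncIterator
        let interval: Duration
        let clock = ContinuousClock()
        var lastEmit: ContinuousClock.Instant?

        mutating func next() async throws -> Base.Element? {
            guard let element = try await base.next() else { return nil }
            if let lastEmit {
                // Wait out the rest of the interval; returns at once if it has already passed.
                try await clock.sleep(until: lastEmit.advanced(by: interval), tolerance: nil)
            }
            lastEmit = clock.now
            return element
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: base.makeAsyncIterator(), interval: interval)
    }
}
```

Because it only ever waits, a fast producer is slowed down by the demand-of-1 model instead of being drained and discarded. The trade-off is that a persistently fast producer falls further and further behind real time, which is exactly what a dropping throttle avoids.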
Is there any roadmap for when there'll be a solution for this? I hoped there was a 1.1 release planned in sync with Swift 6, but it seems it doesn't work like that.
It isn't a naming issue per se, but rather a question of what the folks working on it are focusing on. If you want to take this up and drive a discussion on the forums to resolve it, I'd welcome that!
I had a quick look at this issue (since I'd like to have it resolved for one of my projects). Looking at the On each loop of the
(the current implementation handles only the first case, not the second, which is the source of this issue) The complication, though, is that in the case where the sleep elapses first, we don't want to cancel the But Swift concurrency can't do this. None of the Swift structured concurrency components ( What we would need is some way to cancel I think this kind of interruption is what was requested here: https://forums.swift.org/t/why-doesnt-await-task-value-set-up-a-cancellation-handler/57740 but it seems the need for it wasn't understood, so it wasn't implemented. I can definitely see how this might be implemented with a DispatchSemaphore, but semaphores have been avoided in Swift Concurrency. I think either a semaphore (or changes to the Swift standard library around Task.value) is required to make this work. But I suspect that DispatchSemaphore is not the right kind of code to submit to this repo.
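The core requirement here is to wait for a pending task for at most some interval, and to leave that task running if the timer wins. That can be sketched without semaphores. The approach uses a checked continuation resumed by whichever of two unstructured tasks finishes first, with a lock ensuring it is resumed only once. `ResumeOnce` and `awaitValue(of:timeout:)` are hypothetical names. The sketch assumes Foundation's `NSLock` and `Task.sleep(for:)` (macOS 13+). It does not propagate cancellation of the calling task, which a real throttle iterator would also need to handle.

```swift
import Foundation

/// Hypothetical helper: resumes a continuation at most once, guarded by a lock.
final class ResumeOnce<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<Value?, Never>?

    init(_ continuation: CheckedContinuation<Value?, Never>) {
        self.continuation = continuation
    }

    func resume(returning value: Value?) {
        lock.lock()
        let pending = continuation
        continuation = nil            // whoever arrives second finds nil and does nothing
        lock.unlock()
        pending?.resume(returning: value)
    }
}

/// Waits for `task` for at most `timeout`. On timeout it returns nil WITHOUT cancelling
/// `task`, so the same task can be awaited again later (e.g. on the next call to next()).
func awaitValue<Value: Sendable>(of task: Task<Value, Never>, timeout: Duration) async -> Value? {
    await withCheckedContinuation { (continuation: CheckedContinuation<Value?, Never>) in
        let once = ResumeOnce(continuation)
        let timer = Task {
            do {
                try await Task.sleep(for: timeout)
                once.resume(returning: nil)   // the timer won the race
            } catch {
                // Cancelled because the task finished first; nothing to do.
            }
        }
        Task {
            let value = await task.value
            once.resume(returning: value)
            timer.cancel()
        }
    }
}
```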
Okay, perhaps there is a way to do this. My problem above was trying to solve it with a TaskGroup. But using Swift continuations, we have a lot more flexibility. It requires some unstructured tasks, but it seems to work? It's not exactly simple, though. I'm curious to know if anyone has any thoughts about this approach. It's a drop-in replacement for `_AsyncThrottleSequence.Iterator` in `AsyncThrottleSequence.swift`.

```swift
// `Mutex` comes from the Synchronization module (Swift 6), so the file needs `import Synchronization`.
/// The iterator for an `AsyncThrottleSequence` instance.
public struct Iterator: AsyncIteratorProtocol {
    var base: Base.AsyncIterator
    var last: C.Instant?
    let interval: C.Instant.Duration
    let clock: C
    let reducing: @Sendable (Reduced?, Base.Element) async -> Reduced
    var baseNextTask: Task<Base.Element?, Error>?
    enum Emit {
        case timer
        case next(Base.Element?)
    }
    enum State {
        case waiting(CheckedContinuation<Emit, Error>)
        case notWaiting(Result<Emit, Error>?)
    }
    init(_ base: Base.AsyncIterator, interval: C.Instant.Duration, clock: C, reducing: @Sendable @escaping (Reduced?, Base.Element) async -> Reduced) {
        self.base = base
        self.interval = interval
        self.clock = clock
        self.reducing = reducing
    }
    public mutating func next() async rethrows -> Reduced? {
        var reduced: Reduced?
        let start = last ?? clock.now
        let result = Mutex<State>(.notWaiting(nil))
        repeat {
            var timerTask: Task<Void, Error>?
            do {
                let emit = try await withTaskCancellationHandler {
                    try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Emit, Error>) in
                        result.withLock { result in
                            // Check if a previous iteration of the repeat loop has emitted a value
                            if case .notWaiting(let emittedResult?) = result {
                                result = .notWaiting(nil)
                                continuation.resume(with: emittedResult)
                                return
                            } else {
                                // Common case is that we start waiting
                                result = .waiting(continuation)
                            }
                        }
                        if reduced != nil, let last {
                            // If we already have a cached reduction start a timer so we can just emit what we already
                            // have when the timer elapses
                            timerTask = Task { [interval, last, clock] in
                                let amount = interval - last.duration(to: clock.now)
                                if amount > .zero {
                                    try await clock.sleep(until: clock.now.advanced(by: amount), tolerance: nil)
                                }
                                result.withLock { result in
                                    // In all cases, emit only if we won the race
                                    guard case .waiting(let continuation) = result else { return }
                                    result = .notWaiting(nil)
                                    continuation.resume(with: .success(Emit.timer))
                                    return
                                }
                            }
                        }
                        if reduced == nil, let baseNextTask, !baseNextTask.isCancelled {
                            // If we have no cached reduction and have a baseNextTask from a previous iteration
                            // Just wait on the baseNextTask since there's no other source of values
                            Task {
                                let element = try await baseNextTask.value
                                result.withLock { result in
                                    guard case .waiting(let continuation) = result else { return }
                                    result = .notWaiting(nil)
                                    continuation.resume(with: .success(.next(element)))
                                    // We can't mutate Iterator.baseNextTask back to nil from here so mark the task
                                    // as cancelled, which should be considered equivalent.
                                    baseNextTask.cancel()
                                    return
                                }
                            }
                        } else if baseNextTask?.isCancelled ?? true {
                            // We have no "next" task so start a new one
                            baseNextTask = Task<Base.Element?, Error> { [base] in
                                var base = base
                                do {
                                    let element = try await base.next()
                                    result.withLock { result in
                                        switch result {
                                        case .waiting(let continuation):
                                            result = .notWaiting(nil)
                                            continuation.resume(with: .success(.next(element)))
                                        case .notWaiting:
                                            result = .notWaiting(.success(.next(element)))
                                        }
                                        // We can't mutate Iterator.baseNextTask back to nil from here so mark the task
                                        // as cancelled, which should be considered equivalent.
                                        withUnsafeCurrentTask { task in task?.cancel() }
                                        return
                                    }
                                    return element
                                } catch {
                                    result.withLock { result in
                                        switch result {
                                        case .waiting(let continuation):
                                            result = .notWaiting(nil)
                                            continuation.resume(with: .failure(error))
                                        case .notWaiting:
                                            result = .notWaiting(.failure(error))
                                        }
                                        // We can't mutate Iterator.baseNextTask back to nil from here so mark the task
                                        // as cancelled, which should be considered equivalent.
                                        withUnsafeCurrentTask { task in task?.cancel() }
                                        return
                                    }
                                    throw error
                                }
                            }
                        }
                    }
                } onCancel: {
                    result.withLock { result in
                        // On cancellation, immediately complete the continuation (if possible)
                        guard case .waiting(let continuation) = result else { return }
                        result = .notWaiting(nil)
                        continuation.resume(throwing: CancellationError())
                    }
                }
                // Clean up our unstructured tasks
                timerTask?.cancel()
                if let baseNextTask, baseNextTask.isCancelled {
                    self.baseNextTask = nil
                }
                switch emit {
                case .timer:
                    // Emit cached values when the timer fires
                    last = clock.now
                    return reduced
                case .next(let element):
                    if let element {
                        // Reduce and cache the value
                        reduced = await reducing(reduced, element)
                        // Choose to either emit the new value or continue
                        // accumulating
                        let now = clock.now
                        if start.duration(to: now) >= interval || last == nil {
                            last = now
                            return reduced
                        }
                    } else {
                        // End of stream.
                        if reduced != nil, let last {
                            // If we're going to emit additional values, make sure we've waited enough
                            // since the previous values
                            let amount = interval - last.duration(to: clock.now)
                            if amount > .zero {
                                try await clock.sleep(until: clock.now.advanced(by: amount), tolerance: nil)
                            }
                        }
                        return reduced
                    }
                }
            } catch is CancellationError {
                // Be sure to cancel unstructured tasks on the way out
                timerTask?.cancel()
                baseNextTask?.cancel()
                return nil
            }
        } while true
    }
}
```
@mattgallagher Looks promising, although it's hard to tell for sure without a comprehensive set of tests.
Unlike the similar operator in Combine, `throttle`'s `latest` parameter is confusing, as it does not emit the latest element but the next one instead. When the `latest` parameter is set to `true`:
Actual behavior:
The throttle emits the next element from the base sequence, if any, after the interval occurs.
This means that if no new elements are emitted after the interval elapses, the latest element is simply skipped, so only the first element is emitted.
Expected behavior:
The throttle emits the latest cached element instantly when the interval occurs.