Skip to content

Commit

Permalink
Add waitUntil(deadline) implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
cheatfate committed Apr 17, 2024
1 parent 0d050d5 commit c6dcad3
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 1 deletion.
77 changes: 77 additions & 0 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,52 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {.
inline, deprecated: "Use withTimeout(Future[T], Duration)".} =
withTimeout(fut, timeout.milliseconds())

proc waitUntilImpl[F: SomeFuture](fut: F, retFuture: auto,
deadline: auto): auto =
var timeouted = false

template completeFuture(fut: untyped): untyped =
if fut.failed():
retFuture.fail(fut.error(), warn = false)
elif fut.cancelled():
retFuture.cancelAndSchedule()
else:
when type(fut).T is void:
retFuture.complete()
else:
retFuture.complete(fut.value)

proc continuation(udata: pointer) {.raises: [].} =
if not(retFuture.finished()):
if timeouted:
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
return

if not(fut.finished()):
timeouted = true
fut.cancelSoon()
else:
fut.completeFuture()

var cancellation: proc(udata: pointer) {.gcsafe, raises: [].}
cancellation = proc(udata: pointer) {.gcsafe, raises: [].} =
deadline.removeCallback(continuation)
if not(fut.finished()):
fut.cancelSoon()
else:
fut.completeFuture()

if fut.finished():
fut.completeFuture()
else:
if deadline.finished():
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
retFuture.cancelCallback = cancellation
fut.addCallback(continuation)
deadline.addCallback(continuation)
retFuture

proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
var
moment: Moment
Expand Down Expand Up @@ -1607,6 +1653,23 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
else:
wait(fut, timeout.milliseconds())

proc waitUntil*[T](fut: Future[T], deadline: auto): Future[T] =
## Returns a future which will complete once future ``fut`` completes
## or if ``deadline`` future completes.
##
## If ``deadline`` future completes before future ``fut`` -
## `AsyncTimeoutError` exception will be raised.
##
## Note that ``deadline`` future will not be cancelled and/or failed.
static:
doAssert deadline is SomeFuture
var
retFuture = newFuture[T]("chronos.waitUntil()",
{FutureFlag.OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
waitUntilImpl(fut, retFuture, deadline)

proc join*(future: FutureBase): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete once future ``future`` completes.
Expand Down Expand Up @@ -1774,3 +1837,17 @@ proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto =
# manually at proper time.

waitImpl(fut, retFuture, timeout)

proc waitUntil*(fut: InternalRaisesFuture,
deadline: InternalRaisesFuture): auto =
type
T = type(fut).T
E = type(fut).E
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)

let
retFuture = newFuture[T]("chronos.waitUntil()", {OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.

waitUntilImpl(fut, retFuture, deadline)
270 changes: 269 additions & 1 deletion tests/testfut.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,147 @@ suite "Future[T] behavior test suite":
false
check res

asyncTest "waitUntil[T]() test":
block:
## Test for not immediately completed future and deadline which is not
## going to be finished
let
deadline = newFuture[void]()
future1 = testFuture1()
let res =
try:
discard await waitUntil(future1, deadline)
true
except CatchableError:
false
check:
deadline.finished() == false
future1.finished() == true
res == true

await deadline.cancelAndWait()

check deadline.finished() == true
block:
## Test for immediately completed future and timeout = -1
let
deadline = newFuture[void]()
future2 = testFuture2()
let res =
try:
discard await waitUntil(future2, deadline)
true
except CatchableError:
false
check:
deadline.finished() == false
future2.finished() == true
res

await deadline.cancelAndWait()

check deadline.finished() == true
block:
## Test for not immediately completed future and timeout = 0
let
deadline = newFuture[void]()
future1 = testFuture1()
deadline.complete()
let res =
try:
discard await waitUntil(future1, deadline)
false
except AsyncTimeoutError:
true
except CatchableError:
false
check:
future1.finished() == false
deadline.finished() == true
res

block:
## Test for immediately completed future and timeout = 0
let
deadline = newFuture[void]()
future2 = testFuture2()
deadline.complete()
let (res1, res2) =
try:
let res = await waitUntil(future2, deadline)
(true, res)
except CatchableError:
(false, -1)
check:
future2.finished() == true
deadline.finished() == true
res1 == true
res2 == 1

block:
## Test for future which cannot be completed in timeout period
let
deadline = sleepAsync(50.milliseconds)
future100 = testFuture100()
let res =
try:
discard await waitUntil(future100, deadline)
false
except AsyncTimeoutError:
true
except CatchableError:
false
check:
deadline.finished() == true
res
await future100.cancelAndWait()
check:
future100.finished() == true

block:
## Test for future which will be completed before timeout exceeded.
let
deadline = sleepAsync(500.milliseconds)
future100 = testFuture100()
let (res1, res2) =
try:
let res = await waitUntil(future100, deadline)
(true, res)
except CatchableError:
(false, -1)
check:
future100.finished() == true
deadline.finished() == false
res1 == true
res2 == 0
await deadline.cancelAndWait()
check:
deadline.finished() == true

asyncTest "waitUntil(T) cancellation behavior test":
proc deepTest3(future: Future[void]) {.async.} =
await future

proc deepTest2(future: Future[void]) {.async.} =
await deepTest3(future)

proc deepTest1(future: Future[void]) {.async.} =
await deepTest2(future)

var monitorFuture = newFuture[void]()
var deadlineFuture = newFuture[void]()
block:
var testFuture = deepTest1(monitorFuture)
let waitFut = waitUntil(testFuture, deadlineFuture)
await cancelAndWait(waitFut)
check:
monitorFuture.cancelled() == true
testFuture.cancelled() == true
waitFut.cancelled() == true
deadlineFuture.finished() == false



asyncTest "Discarded result Future[T] test":
var completedFutures = 0

Expand Down Expand Up @@ -1143,7 +1284,39 @@ suite "Future[T] behavior test suite":
fut.state == FutureState.Completed
neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2

asyncTest "Cancellation race test":
asyncTest "Cancellation waitUntil() test":
var neverFlag1, neverFlag2, neverFlag3: bool
var waitProc1, waitProc2: bool
proc neverEndingProc(): Future[void] =
var res = newFuture[void]()
proc continuation(udata: pointer) {.gcsafe.} =
neverFlag2 = true
proc cancellation(udata: pointer) {.gcsafe.} =
neverFlag3 = true
res.addCallback(continuation)
res.cancelCallback = cancellation
result = res
neverFlag1 = true

proc waitProc() {.async.} =
let deadline = sleepAsync(100.milliseconds)
try:
await waitUntil(neverEndingProc(), deadline)
except CancelledError:
waitProc1 = true
except CatchableError:
doAssert(false)
finally:
await cancelAndWait(deadline)
waitProc2 = true

var fut = waitProc()
await cancelAndWait(fut)
check:
fut.state == FutureState.Completed
neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2

asyncTest "Cancellation race() test":
var someFut = newFuture[void]()

proc raceProc(): Future[void] {.async.} =
Expand Down Expand Up @@ -1322,6 +1495,29 @@ suite "Future[T] behavior test suite":

check res

asyncTest "waitUntil(fut) should wait cancellation test":
proc futureNeverEnds(): Future[void] =
newFuture[void]("neverending.future")

proc futureOneLevelMore() {.async.} =
await futureNeverEnds()

var fut = futureOneLevelMore()
let res =
try:
await waitUntil(fut, sleepAsync(100.milliseconds))
false
except AsyncTimeoutError:
# Because `fut` is never-ending Future[T], `wait` should raise
# `AsyncTimeoutError`, but only after `fut` is cancelled.
if fut.cancelled():
true
else:
false
except CatchableError:
false
check res

test "race(zero) test":
var tseq = newSeq[FutureBase]()
var fut1 = race(tseq)
Expand Down Expand Up @@ -1699,6 +1895,78 @@ suite "Future[T] behavior test suite":

check (await testFoo()) == true

asyncTest "waitUntil() cancellation undefined behavior test #1":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await fooFut
return TestFooConnection()

proc testFoo(fooFut: Future[void]) {.async.} =
let deadline = sleepAsync(10.seconds)
let connection =
try:
let res = await testInnerFoo(fooFut).waitUntil(deadline)
Result[TestFooConnection, int].ok(res)
except CancelledError:
Result[TestFooConnection, int].err(0)
except CatchableError:
Result[TestFooConnection, int].err(1)
finally:
await deadline.cancelAndWait()

check connection.isOk()

var future = newFuture[void]("last.child.future")
var someFut = testFoo(future)
future.complete()
discard someFut.tryCancel()
await someFut

asyncTest "waitUntil() cancellation undefined behavior test #2":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await fooFut
return TestFooConnection()

proc testMiddleFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await testInnerFoo(fooFut)

proc testFoo(fooFut: Future[void]) {.async.} =
let deadline = sleepAsync(10.seconds)
let connection =
try:
let res = await testMiddleFoo(fooFut).waitUntil(deadline)
Result[TestFooConnection, int].ok(res)
except CancelledError:
Result[TestFooConnection, int].err(0)
except CatchableError:
Result[TestFooConnection, int].err(1)
finally:
await deadline.cancelAndWait()
check connection.isOk()

var future = newFuture[void]("last.child.future")
var someFut = testFoo(future)
future.complete()
discard someFut.tryCancel()
await someFut

asyncTest "waitUntil() should allow cancellation test (depends on race())":
proc testFoo(): Future[bool] {.async.} =
let
deadline = sleepAsync(3.seconds)
resFut = sleepAsync(2.seconds).waitUntil(deadline)
timeFut = sleepAsync(1.seconds)
cancelFut = cancelAndWait(resFut)
discard await race(cancelFut, timeFut)
await deadline.cancelAndWait()
if cancelFut.finished():
return (resFut.cancelled() and cancelFut.completed())
false

check (await testFoo()) == true

asyncTest "Cancellation behavior test":
proc testInnerFoo(fooFut: Future[void]) {.async.} =
await fooFut
Expand Down

0 comments on commit c6dcad3

Please sign in to comment.