diff --git a/chronos/internal/asyncfutures.nim b/chronos/internal/asyncfutures.nim index 7f93b0e15..8220dc2fb 100644 --- a/chronos/internal/asyncfutures.nim +++ b/chronos/internal/asyncfutures.nim @@ -1522,6 +1522,52 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {. inline, deprecated: "Use withTimeout(Future[T], Duration)".} = withTimeout(fut, timeout.milliseconds()) +proc waitUntilImpl[F: SomeFuture](fut: F, retFuture: auto, + deadline: auto): auto = + var timeouted = false + + template completeFuture(fut: untyped): untyped = + if fut.failed(): + retFuture.fail(fut.error(), warn = false) + elif fut.cancelled(): + retFuture.cancelAndSchedule() + else: + when type(fut).T is void: + retFuture.complete() + else: + retFuture.complete(fut.value) + + proc continuation(udata: pointer) {.raises: [].} = + if not(retFuture.finished()): + if timeouted: + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) + return + + if not(fut.finished()): + timeouted = true + fut.cancelSoon() + else: + fut.completeFuture() + + var cancellation: proc(udata: pointer) {.gcsafe, raises: [].} + cancellation = proc(udata: pointer) {.gcsafe, raises: [].} = + deadline.removeCallback(continuation) + if not(fut.finished()): + fut.cancelSoon() + else: + fut.completeFuture() + + if fut.finished(): + fut.completeFuture() + else: + if deadline.finished(): + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) + else: + retFuture.cancelCallback = cancellation + fut.addCallback(continuation) + deadline.addCallback(continuation) + retFuture + proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto = var moment: Moment @@ -1607,6 +1653,23 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. else: wait(fut, timeout.milliseconds()) +proc waitUntil*[T](fut: Future[T], deadline: auto): Future[T] = + ## Returns a future which will complete once future ``fut`` completes + ## or if ``deadline`` future completes. + ## + ## If ``deadline`` future completes before future ``fut`` - + ## `AsyncTimeoutError` exception will be raised. + ## + ## Note that ``deadline`` future will not be cancelled and/or failed. + static: + doAssert deadline is SomeFuture + var + retFuture = newFuture[T]("chronos.waitUntil()", + {FutureFlag.OwnCancelSchedule}) + # We set `OwnCancelSchedule` flag, because we going to cancel `retFuture` + # manually at proper time. + waitUntilImpl(fut, retFuture, deadline) + proc join*(future: FutureBase): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Returns a future which will complete once future ``future`` completes. @@ -1774,3 +1837,17 @@ proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto = # manually at proper time. waitImpl(fut, retFuture, timeout) + +proc waitUntil*(fut: InternalRaisesFuture, + deadline: InternalRaisesFuture): auto = + type + T = type(fut).T + E = type(fut).E + InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError) + + let + retFuture = newFuture[T]("chronos.waitUntil()", {OwnCancelSchedule}) + # We set `OwnCancelSchedule` flag, because we going to cancel `retFuture` + # manually at proper time. + + waitUntilImpl(fut, retFuture, deadline) diff --git a/tests/testfut.nim b/tests/testfut.nim index 1cf0aed5f..fdf32a198 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -146,6 +146,147 @@ suite "Future[T] behavior test suite": false check res + asyncTest "waitUntil[T]() test": + block: + ## Test for not immediately completed future and deadline which is not + ## going to be finished + let + deadline = newFuture[void]() + future1 = testFuture1() + let res = + try: + discard await waitUntil(future1, deadline) + true + except CatchableError: + false + check: + deadline.finished() == false + future1.finished() == true + res == true + + await deadline.cancelAndWait() + + check deadline.finished() == true + block: + ## Test for immediately completed future and timeout = -1 + let + deadline = newFuture[void]() + future2 = testFuture2() + let res = + try: + discard await waitUntil(future2, deadline) + true + except CatchableError: + false + check: + deadline.finished() == false + future2.finished() == true + res + + await deadline.cancelAndWait() + + check deadline.finished() == true + block: + ## Test for not immediately completed future and timeout = 0 + let + deadline = newFuture[void]() + future1 = testFuture1() + deadline.complete() + let res = + try: + discard await waitUntil(future1, deadline) + false + except AsyncTimeoutError: + true + except CatchableError: + false + check: + future1.finished() == false + deadline.finished() == true + res + + block: + ## Test for immediately completed future and timeout = 0 + let + deadline = newFuture[void]() + future2 = testFuture2() + deadline.complete() + let (res1, res2) = + try: + let res = await waitUntil(future2, deadline) + (true, res) + except CatchableError: + (false, -1) + check: + future2.finished() == true + deadline.finished() == true + res1 == true + res2 == 1 + + block: + ## Test for future which cannot be completed in timeout period + let + deadline = sleepAsync(50.milliseconds) + future100 = testFuture100() + let res = + try: + discard await waitUntil(future100, deadline) + false + except AsyncTimeoutError: + true + except CatchableError: + false + check: + deadline.finished() == true + res + await future100.cancelAndWait() + check: + future100.finished() == true + + block: + ## Test for future which will be completed before timeout exceeded. + let + deadline = sleepAsync(500.milliseconds) + future100 = testFuture100() + let (res1, res2) = + try: + let res = await waitUntil(future100, deadline) + (true, res) + except CatchableError: + (false, -1) + check: + future100.finished() == true + deadline.finished() == false + res1 == true + res2 == 0 + await deadline.cancelAndWait() + check: + deadline.finished() == true + + asyncTest "waitUntil(T) cancellation behavior test": + proc deepTest3(future: Future[void]) {.async.} = + await future + + proc deepTest2(future: Future[void]) {.async.} = + await deepTest3(future) + + proc deepTest1(future: Future[void]) {.async.} = + await deepTest2(future) + + var monitorFuture = newFuture[void]() + var deadlineFuture = newFuture[void]() + block: + var testFuture = deepTest1(monitorFuture) + let waitFut = waitUntil(testFuture, deadlineFuture) + await cancelAndWait(waitFut) + check: + monitorFuture.cancelled() == true + testFuture.cancelled() == true + waitFut.cancelled() == true + deadlineFuture.finished() == false + + + asyncTest "Discarded result Future[T] test": var completedFutures = 0 @@ -1143,7 +1284,39 @@ suite "Future[T] behavior test suite": fut.state == FutureState.Completed neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2 - asyncTest "Cancellation race test": + asyncTest "Cancellation waitUntil() test": + var neverFlag1, neverFlag2, neverFlag3: bool + var waitProc1, waitProc2: bool + proc neverEndingProc(): Future[void] = + var res = newFuture[void]() + proc continuation(udata: pointer) {.gcsafe.} = + neverFlag2 = true + proc cancellation(udata: pointer) {.gcsafe.} = + neverFlag3 = true + res.addCallback(continuation) + res.cancelCallback = cancellation + result = res + neverFlag1 = true + + proc waitProc() {.async.} = + let deadline = sleepAsync(100.milliseconds) + try: + await waitUntil(neverEndingProc(), deadline) + except CancelledError: + waitProc1 = true + except CatchableError: + doAssert(false) + finally: + await cancelAndWait(deadline) + waitProc2 = true + + var fut = waitProc() + await cancelAndWait(fut) + check: + fut.state == FutureState.Completed + neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2 + + asyncTest "Cancellation race() test": var someFut = newFuture[void]() proc raceProc(): Future[void] {.async.} = @@ -1322,6 +1495,29 @@ suite "Future[T] behavior test suite": check res + asyncTest "waitUntil(fut) should wait cancellation test": + proc futureNeverEnds(): Future[void] = + newFuture[void]("neverending.future") + + proc futureOneLevelMore() {.async.} = + await futureNeverEnds() + + var fut = futureOneLevelMore() + let res = + try: + await waitUntil(fut, sleepAsync(100.milliseconds)) + false + except AsyncTimeoutError: + # Because `fut` is never-ending Future[T], `wait` should raise + # `AsyncTimeoutError`, but only after `fut` is cancelled. + if fut.cancelled(): + true + else: + false + except CatchableError: + false + check res + test "race(zero) test": var tseq = newSeq[FutureBase]() var fut1 = race(tseq) @@ -1699,6 +1895,78 @@ suite "Future[T] behavior test suite": check (await testFoo()) == true + asyncTest "waitUntil() cancellation undefined behavior test #1": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testFoo(fooFut: Future[void]) {.async.} = + let deadline = sleepAsync(10.seconds) + let connection = + try: + let res = await testInnerFoo(fooFut).waitUntil(deadline) + Result[TestFooConnection, int].ok(res) + except CancelledError: + Result[TestFooConnection, int].err(0) + except CatchableError: + Result[TestFooConnection, int].err(1) + finally: + await deadline.cancelAndWait() + + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "waitUntil() cancellation undefined behavior test #2": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testMiddleFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await testInnerFoo(fooFut) + + proc testFoo(fooFut: Future[void]) {.async.} = + let deadline = sleepAsync(10.seconds) + let connection = + try: + let res = await testMiddleFoo(fooFut).waitUntil(deadline) + Result[TestFooConnection, int].ok(res) + except CancelledError: + Result[TestFooConnection, int].err(0) + except CatchableError: + Result[TestFooConnection, int].err(1) + finally: + await deadline.cancelAndWait() + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "waitUntil() should allow cancellation test (depends on race())": + proc testFoo(): Future[bool] {.async.} = + let + deadline = sleepAsync(3.seconds) + resFut = sleepAsync(2.seconds).waitUntil(deadline) + timeFut = sleepAsync(1.seconds) + cancelFut = cancelAndWait(resFut) + discard await race(cancelFut, timeFut) + await deadline.cancelAndWait() + if cancelFut.finished(): + return (resFut.cancelled() and cancelFut.completed()) + false + + check (await testFoo()) == true + asyncTest "Cancellation behavior test": proc testInnerFoo(fooFut: Future[void]) {.async.} = await fooFut