Skip to content

Commit

Permalink
Allow when(fulfilled:…) to resolve synchronously if possible
Browse files Browse the repository at this point in the history
Fixes #52.
  • Loading branch information
lilyball committed May 22, 2020
1 parent abff130 commit 2e3be3b
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 90 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,14 @@ Unless you explicitly state otherwise, any contribution intentionally submitted
already been resolved this will cause the returned promise to likewise already be resolved ([#50][]).
- Ensure `when(first:cancelRemaining:)` returns an already-cancelled promise if all input promises were previously cancelled, instead of cancelling the
returned promise asynchronously ([#51][]).
- Ensure `when(fulfilled:qos:cancelOnFailure:)` returns an already-resolved promise if either all input promises were previously fulfliled or any input
promise was previously rejected or cancelled ([#52][]).

[#34]: https://github.com/lilyball/Tomorrowland/issues/34 "Add a .mainImmediate context"
[#47]: https://github.com/lilyball/Tomorrowland/issues/47 "Add Promise.Resolver.isCancelled property"
[#50]: https://github.com/lilyball/Tomorrowland/issues/50 "Consider changing timeout's default context to .nowOr(.auto)"
[#51]: https://github.com/lilyball/Tomorrowland/issues/51 "when(first:cancelRemaining:) won't cancel synchronously if all inputs have cancelled"
[#52]: https://github.com/lilyball/Tomorrowland/issues/52 "when(fulfilled:…) should be able to resolve synchronously"

### v1.1.1

Expand Down
49 changes: 28 additions & 21 deletions Sources/ObjC/TWLWhen.m
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,28 @@ @implementation TWLPromise (When)
NSUInteger count = promises.count;
id _Nullable __unsafe_unretained * _Nonnull resultBuffer = (id _Nullable __unsafe_unretained *)calloc((size_t)count, sizeof(id));
dispatch_group_t group = dispatch_group_create();
TWLContext *context = [TWLContext contextForQoS:qosClass];
TWLContext *context = [TWLContext nowOrContext:[TWLContext contextForQoS:qosClass]];
for (NSUInteger i = 0; i < count; ++i) {
TWLPromise *promise = promises[i];
dispatch_group_enter(group);
[promise enqueueCallbackWithoutOneshot:^(id _Nullable value, id _Nullable error, BOOL isSynchronous) {
[context executeIsSynchronous:isSynchronous block:^{
if (value) {
resultBuffer[i] = (__bridge id)CFBridgingRetain(value);
} else if (error) {
if (value) {
resultBuffer[i] = (__bridge id)CFBridgingRetain(value);
} else if (error) {
[context executeIsSynchronous:isSynchronous block:^{
[resolver rejectWithError:error];
[cancelAllInput invoke];
} else {
}];
[cancelAllInput invoke];
} else {
[context executeIsSynchronous:isSynchronous block:^{
[resolver cancel];
[cancelAllInput invoke];
}
dispatch_group_leave(group);
}];
}];
[cancelAllInput invoke];
}
dispatch_group_leave(group);
} willPropagateCancel:YES];
}
dispatch_group_notify(group, dispatch_get_global_queue(qosClass, 0), ^{
dispatch_block_t handler = ^{
@try {
for (NSUInteger i = 0; i < count; ++i) {
if (resultBuffer[i] == NULL) {
Expand All @@ -84,16 +86,21 @@ @implementation TWLPromise (When)
(void)CFBridgingRelease((__bridge CFTypeRef)(resultBuffer[i]));
}
}
});
NSHashTable *boxes = [NSHashTable weakObjectsHashTable];
for (TWLPromise *promise in promises) {
[boxes addObject:promise->_box];
}
[resolver whenCancelRequestedOnContext:TWLContext.immediate handler:^(TWLResolver * _Nonnull resolver) {
for (TWLObjCPromiseBox *box in boxes) {
[box propagateCancel];
};
if (dispatch_group_wait(group, DISPATCH_TIME_NOW) == 0) {
handler();
} else {
dispatch_group_notify(group, dispatch_get_global_queue(qosClass, 0), handler);
NSHashTable *boxes = [NSHashTable weakObjectsHashTable];
for (TWLPromise *promise in promises) {
[boxes addObject:promise->_box];
}
}];
[resolver whenCancelRequestedOnContext:TWLContext.immediate handler:^(TWLResolver * _Nonnull resolver) {
for (TWLObjCPromiseBox *box in boxes) {
[box propagateCancel];
}
}];
}
return resultPromise;
}

Expand Down
169 changes: 100 additions & 69 deletions Sources/When.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,28 @@ public func when<Value,Error>(fulfilled promises: [Promise<Value,Error>], qos: D
var resultBuffer = UnsafeMutablePointer<Value?>.allocate(capacity: count)
resultBuffer.initialize(repeating: nil, count: count)
let group = DispatchGroup()
let context = PromiseContext(qos: qos)
let context = PromiseContext.nowOr(.init(qos: qos))
for (i, promise) in promises.enumerated() {
group.enter()
promise._seal._enqueue { (result, isSynchronous) in
context.execute(isSynchronous: isSynchronous) {
switch result {
case .value(let value):
resultBuffer[i] = value
case .error(let error):
switch result {
case .value(let value):
resultBuffer[i] = value
case .error(let error):
context.execute(isSynchronous: isSynchronous) {
resolver.reject(with: error)
cancelAllInput?.invoke()
case .cancelled:
}
cancelAllInput?.invoke()
case .cancelled:
context.execute(isSynchronous: isSynchronous) {
resolver.cancel()
cancelAllInput?.invoke()
}
group.leave()
cancelAllInput?.invoke()
}
group.leave()
}
}
group.notify(queue: .global(qos: qos)) {
let handler: @convention(block) () -> Void = {
defer {
resultBuffer.deinitialize(count: count)
resultBuffer.deallocate()
Expand All @@ -90,9 +92,14 @@ public func when<Value,Error>(fulfilled promises: [Promise<Value,Error>], qos: D
}
resolver.fulfill(with: Array(results))
}
resolver.onRequestCancel(on: .immediate) { [boxes=promises.map({ Weak($0._box) })] (resolver) in
for box in boxes {
box.value?.propagateCancel()
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
handler()
} else {
group.notify(queue: .global(qos: qos), execute: handler)
resolver.onRequestCancel(on: .immediate) { [boxes=promises.map({ Weak($0._box) })] (resolver) in
for box in boxes {
box.value?.propagateCancel()
}
}
}
return resultPromise
Expand Down Expand Up @@ -161,7 +168,7 @@ public func when<Value1,Value2,Value3,Value4,Value5,Value6,Error>(fulfilled a: P
}
}

let context = PromiseContext(qos: qos)
let context = PromiseContext.nowOr(.init(qos: qos))
group.enter()
tap(a, on: context, { resolver.handleResult($0, output: &aResult, cancelAllInput: cancelAllInput); group.leave() })
group.enter()
Expand All @@ -175,23 +182,28 @@ public func when<Value1,Value2,Value3,Value4,Value5,Value6,Error>(fulfilled a: P
group.enter()
tap(f, on: context, { resolver.handleResult($0, output: &fResult, cancelAllInput: cancelAllInput); group.leave() })

group.notify(queue: .global(qos: qos)) {
let handler: @convention(block) () -> Void = {
guard let a = aResult, let b = bResult, let c = cResult, let d = dResult, let e = eResult, let f = fResult else {
// Must have had a rejected or cancelled promise
return
}
resolver.fulfill(with: (a,b,c,d,e,f))
}
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box, weak boxD=d._box, weak boxE=e._box, weak boxF=f._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
boxD?.propagateCancel()
boxE?.propagateCancel()
boxF?.propagateCancel()
})
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
handler()
} else {
group.notify(queue: .global(qos: qos), execute: handler)
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box, weak boxD=d._box, weak boxE=e._box, weak boxF=f._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
boxD?.propagateCancel()
boxE?.propagateCancel()
boxF?.propagateCancel()
})
}
return resultPromise
}

Expand Down Expand Up @@ -254,7 +266,7 @@ public func when<Value1,Value2,Value3,Value4,Value5,Error>(fulfilled a: Promise<
}
}

let context = PromiseContext(qos: qos)
let context = PromiseContext.nowOr(.init(qos: qos))
group.enter()
tap(a, on: context, { resolver.handleResult($0, output: &aResult, cancelAllInput: cancelAllInput); group.leave() })
group.enter()
Expand All @@ -266,22 +278,27 @@ public func when<Value1,Value2,Value3,Value4,Value5,Error>(fulfilled a: Promise<
group.enter()
tap(e, on: context, { resolver.handleResult($0, output: &eResult, cancelAllInput: cancelAllInput); group.leave() })

group.notify(queue: .global(qos: qos)) {
let handler: @convention(block) () -> Void = {
guard let a = aResult, let b = bResult, let c = cResult, let d = dResult, let e = eResult else {
// Must have had a rejected or cancelled promise
return
}
resolver.fulfill(with: (a,b,c,d,e))
}
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box, weak boxD=d._box, weak boxE=e._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
boxD?.propagateCancel()
boxE?.propagateCancel()
})
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
handler()
} else {
group.notify(queue: .global(qos: qos), execute: handler)
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box, weak boxD=d._box, weak boxE=e._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
boxD?.propagateCancel()
boxE?.propagateCancel()
})
}
return resultPromise
}

Expand Down Expand Up @@ -341,7 +358,7 @@ public func when<Value1,Value2,Value3,Value4,Error>(fulfilled a: Promise<Value1,
}
}

let context = PromiseContext(qos: qos)
let context = PromiseContext.nowOr(.init(qos: qos))
group.enter()
tap(a, on: context, { resolver.handleResult($0, output: &aResult, cancelAllInput: cancelAllInput); group.leave() })
group.enter()
Expand All @@ -351,21 +368,26 @@ public func when<Value1,Value2,Value3,Value4,Error>(fulfilled a: Promise<Value1,
group.enter()
tap(d, on: context, { resolver.handleResult($0, output: &dResult, cancelAllInput: cancelAllInput); group.leave() })

group.notify(queue: .global(qos: qos)) {
let handler: @convention(block) () -> Void = {
guard let a = aResult, let b = bResult, let c = cResult, let d = dResult else {
// Must have had a rejected or cancelled promise
return
}
resolver.fulfill(with: (a,b,c,d))
}
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box, weak boxD=d._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
boxD?.propagateCancel()
})
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
handler()
} else {
group.notify(queue: .global(qos: qos), execute: handler)
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box, weak boxD=d._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
boxD?.propagateCancel()
})
}
return resultPromise
}

Expand Down Expand Up @@ -422,28 +444,33 @@ public func when<Value1,Value2,Value3,Error>(fulfilled a: Promise<Value1,Error>,
}
}

let context = PromiseContext(qos: qos)
let context = PromiseContext.nowOr(.init(qos: qos))
group.enter()
tap(a, on: context, { resolver.handleResult($0, output: &aResult, cancelAllInput: cancelAllInput); group.leave() })
group.enter()
tap(b, on: context, { resolver.handleResult($0, output: &bResult, cancelAllInput: cancelAllInput); group.leave() })
group.enter()
tap(c, on: context, { resolver.handleResult($0, output: &cResult, cancelAllInput: cancelAllInput); group.leave() })

group.notify(queue: .global(qos: qos)) {
let handler: @convention(block) () -> Void = {
guard let a = aResult, let b = bResult, let c = cResult else {
// Must have had a rejected or cancelled promise
return
}
resolver.fulfill(with: (a,b,c))
}
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
})
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
handler()
} else {
group.notify(queue: .global(qos: qos), execute: handler)
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box, weak boxC=c._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
boxC?.propagateCancel()
})
}
return resultPromise
}

Expand Down Expand Up @@ -497,25 +524,30 @@ public func when<Value1,Value2,Error>(fulfilled a: Promise<Value1,Error>,
}
}

let context = PromiseContext(qos: qos)
let context = PromiseContext.nowOr(.init(qos: qos))
group.enter()
tap(a, on: context, { resolver.handleResult($0, output: &aResult, cancelAllInput: cancelAllInput); group.leave() })
group.enter()
tap(b, on: context, { resolver.handleResult($0, output: &bResult, cancelAllInput: cancelAllInput); group.leave() })

group.notify(queue: .global(qos: qos)) {
let handler: @convention(block) () -> Void = {
guard let a = aResult, let b = bResult else {
// Must have had a rejected or cancelled promise
return
}
resolver.fulfill(with: (a,b))
}
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
})
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
handler()
} else {
group.notify(queue: .global(qos: qos), execute: handler)
resolver.onRequestCancel(on: .immediate, {
[weak boxA=a._box, weak boxB=b._box]
(resolver) in
boxA?.propagateCancel()
boxB?.propagateCancel()
})
}
return resultPromise
}

Expand Down Expand Up @@ -585,15 +617,14 @@ public func when<Value,Error>(first promises: [Promise<Value,Error>], cancelRema
// DispatchTime(uptimeNanoseconds: 0) produces DISPATCH_TIME_NOW and is faster than using .now()
if group.wait(timeout: DispatchTime(uptimeNanoseconds: 0)) == .success {
resolver.cancel()
return newPromise
} else {
group.notify(queue: .global(qos: .utility)) {
resolver.cancel()
}
}
resolver.onRequestCancel(on: .immediate) { [boxes=promises.map({ Weak($0._box) })] (resolver) in
for box in boxes {
box.value?.propagateCancel()
resolver.onRequestCancel(on: .immediate) { [boxes=promises.map({ Weak($0._box) })] (resolver) in
for box in boxes {
box.value?.propagateCancel()
}
}
}
return newPromise
Expand Down
Loading

0 comments on commit 2e3be3b

Please sign in to comment.