Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thenApply and thenApplyAsync methods for managed completable future #1394

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -616,29 +616,65 @@ public <U> ManagedCompletableFuture<Void> thenAcceptBothAsync(CompletionStage<?
* @see java.util.concurrent.CompletionStage#thenApply(java.util.function.Function)
*/
@Override
public <U> ManagedCompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
// TODO contextualize fn and other steps
return new ManagedCompletableFuture<U>(completableFuture.thenApply(fn), defaultExecutor);
@SuppressWarnings("unchecked")
public <R> ManagedCompletableFuture<R> thenApply(Function<? super T, ? extends R> action) {
// Reject ManagedTask so that we have the flexibility to decide later how to handle ManagedTaskListener and execution properties
if (action instanceof ManagedTask)
throw new IllegalArgumentException(ManagedTask.class.getName());

WSContextService contextSvc = defaultExecutor.getContextService();

ThreadContextDescriptor contextDescriptor = contextSvc.captureThreadContext(XPROPS_SUSPEND_TRAN);
action = contextSvc.createContextualProxy(contextDescriptor, action, Function.class);

CompletableFuture<R> dependentStage = completableFuture.thenApply(action);
return new ManagedCompletableFuture<R>(dependentStage, defaultExecutor);
}

/**
* @see java.util.concurrent.CompletionStage#thenApplyAsync(java.util.function.Function)
*/
@Override
public <U> ManagedCompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
// TODO contextualize fn and other steps
Executor policyExecutor = (Executor) defaultExecutor; // TODO get policy executor from default executor
return new ManagedCompletableFuture<U>(completableFuture.thenApplyAsync(fn, policyExecutor), defaultExecutor);
@SuppressWarnings("unchecked")
public <R> ManagedCompletableFuture<R> thenApplyAsync(Function<? super T, ? extends R> action) {
// Reject ManagedTask so that we have the flexibility to decide later how to handle ManagedTaskListener and execution properties
if (action instanceof ManagedTask)
throw new IllegalArgumentException(ManagedTask.class.getName());

PolicyExecutor policyExecutor = defaultExecutor.getNormalPolicyExecutor(); // TODO choose based on LONGRUNNING_HINT execution property
WSContextService contextSvc = defaultExecutor.getContextService();

ThreadContextDescriptor contextDescriptor = contextSvc.captureThreadContext(XPROPS_SUSPEND_TRAN);
action = contextSvc.createContextualProxy(contextDescriptor, action, Function.class);

CompletableFuture<R> dependentStage = completableFuture.thenApplyAsync(action, policyExecutor);
return new ManagedCompletableFuture<R>(dependentStage, defaultExecutor);
}

/**
* @see java.util.concurrent.CompletionStage#thenApplyAsync(java.util.function.Function, java.util.concurrent.Executor)
*/
@Override
public <U> ManagedCompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
// TODO contextualize fn and other steps
Executor policyExecutor = executor; // TODO get policy executor from the supplied executor, if a managed executor
return new ManagedCompletableFuture<U>(completableFuture.thenApplyAsync(fn, policyExecutor), defaultExecutor);
@SuppressWarnings("unchecked")
public <R> ManagedCompletableFuture<R> thenApplyAsync(Function<? super T, ? extends R> action, Executor executor) {
CompletableFuture<R> dependentStage;
if (executor instanceof ManagedExecutorService) { // the only type of managed executor implementation allowed here is the built-in one
// Reject ManagedTask so that we have the flexibility to decide later how to handle ManagedTaskListener and execution properties
if (action instanceof ManagedTask)
throw new IllegalArgumentException(ManagedTask.class.getName());

WSManagedExecutorService managedExecutor = (WSManagedExecutorService) executor;
PolicyExecutor policyExecutor = managedExecutor.getNormalPolicyExecutor(); // TODO choose based on LONGRUNNING_HINT execution property
WSContextService contextSvc = managedExecutor.getContextService();

ThreadContextDescriptor contextDescriptor = contextSvc.captureThreadContext(XPROPS_SUSPEND_TRAN);
action = contextSvc.createContextualProxy(contextDescriptor, action, Function.class);

dependentStage = completableFuture.thenApplyAsync(action, policyExecutor);
} else {
dependentStage = completableFuture.thenApplyAsync(action, executor);
}
return new ManagedCompletableFuture<R>(dependentStage, defaultExecutor);
}

/**
Expand Down Expand Up @@ -793,23 +829,50 @@ public ManagedCompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? sup
return new ManagedCompletableFuture<T>(completableFuture.whenCompleteAsync(action, policyExecutor), defaultExecutor);
}

// TODO move to general context service in com.ibm.ws.context project once Java 8 can be used there
// TODO move the following classes to the general context service in com.ibm.ws.context project once Java 8 can be used there

/**
* Proxy for Function that applies thread context before running and removes it afterward
*
* @param <T> type of the function's parameter
* @param <R> type of the function's result
*/
private static class ContextualFunction<T, R> implements Function<T, R> {
private final Function<T, R> action;
private final ThreadContextDescriptor threadContextDescriptor;

private ContextualFunction(ThreadContextDescriptor threadContextDescriptor, Function<T, R> action) {
this.action = action;
this.threadContextDescriptor = threadContextDescriptor;
}

@Override
public R apply(T t) {
ArrayList<ThreadContext> contextApplied = threadContextDescriptor.taskStarting();
try {
return action.apply(t);
} finally {
threadContextDescriptor.taskStopping(contextApplied);
}
}
}

/**
* Proxy for Supplier that applies thread context before running and removes it afterward
*
* @param <U>
* @param <T> type of the result that is supplied by the supplier
*/
private static class ContextualSupplier<U> implements Supplier<U> {
private final Supplier<U> action;
private static class ContextualSupplier<T> implements Supplier<T> {
private final Supplier<T> action;
private final ThreadContextDescriptor threadContextDescriptor;

private ContextualSupplier(ThreadContextDescriptor threadContextDescriptor, Supplier<U> action) {
private ContextualSupplier(ThreadContextDescriptor threadContextDescriptor, Supplier<T> action) {
this.action = action;
this.threadContextDescriptor = threadContextDescriptor;
}

@Override
public U get() {
public T get() {
ArrayList<ThreadContext> contextApplied = threadContextDescriptor.taskStarting();
try {
return action.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

import javax.annotation.Resource;
Expand Down Expand Up @@ -580,6 +581,94 @@ else if (lookupResult instanceof Throwable)
assertEquals(Boolean.TRUE, results.poll());
}

/**
* Verify thenApply and both forms of thenApplyAsync.
*/
@Test
public void testThenApply() throws Exception {
LinkedBlockingQueue<Object> results = new LinkedBlockingQueue<Object>();
String currentThreadName = Thread.currentThread().getName();

final Function<Integer, Integer> increment = (count) -> {
System.out.println("> increment #" + (++count) + " from testThenApply");
results.add(Thread.currentThread().getName());
try {
results.add(InitialContext.doLookup("java:comp/env/executorRef"));
} catch (NamingException x) {
results.add(x);
}
System.out.println("< increment");
return count;
};

final CompletableFuture<Integer> cf = ManagedCompletableFuture
.supplyAsync(() -> 0, defaultManagedExecutor)
.thenApplyAsync(increment)
.thenApplyAsync(increment, testThreads)
.thenApply(increment)
.thenApplyAsync(increment);

// Submit from thread that lacks context
CompletableFuture.runAsync(() -> cf.thenApplyAsync(increment));

String threadName;
Object lookupResult;

assertTrue(cf.toString(), cf instanceof ManagedCompletableFuture);

// thenApplyAsync on default execution facility
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenApplyAsync on unmanaged executor
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertFalse(threadName, threadName.startsWith("Default Executor-thread-")); // must run async on unmanaged thread
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof NamingException)
; // pass
else if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
else
fail("Unexpected result of lookup: " + lookupResult);

// thenApply on unmanaged thread (context should be applied from stage creation time)
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.equals(currentThreadName) || !threadName.startsWith("Default Executor-thread-")); // could run on current thread if previous stage is complete, otherwise must run on unmanaged thread
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenApplyAsync (second occurrence) on default execution facility
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenApplyAsync requested from unmanaged thread
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof NamingException)
; // pass
else if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
else
fail("Unexpected result of lookup: " + lookupResult);

// result after 4 increments (the 5th increment is on a subsequent stage, and so would not be reflected in cf's result)
assertEquals(Integer.valueOf(4), cf.get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
}

/**
* Verify thenRun and both forms of thenRunAsync.
*/
Expand Down Expand Up @@ -619,24 +708,25 @@ public void testThenRun() throws Exception {

// static runAsync that creates ManagedCompletableFuture
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertNotSame(currentThreadName, threadName);
assertTrue(threadName, threadName.startsWith("Default Executor-thread-"));
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenRunAsync on default execution facility
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.startsWith("Default Executor-thread-"));
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenRunAsync on unmanaged executor
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.equals(currentThreadName) || !threadName.startsWith("Default Executor-thread-")); // could run on current thread if previous stage is complete
assertFalse(threadName, threadName.startsWith("Default Executor-thread-")); // must run async on unmanaged thread
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof NamingException)
; // pass
Expand All @@ -647,23 +737,25 @@ else if (lookupResult instanceof Throwable)

// thenRun on unmanaged thread (context should be applied from stage creation time)
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.equals(currentThreadName) || !threadName.startsWith("Default Executor-thread-")); // could run on current thread if previous stage is complete
assertTrue(threadName, threadName.equals(currentThreadName) || !threadName.startsWith("Default Executor-thread-")); // could run on current thread if previous stage is complete, otherwise must run on unmanaged thread
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenRunAsync (second occurrence) on default execution facility
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertTrue(threadName, threadName.startsWith("Default Executor-thread-"));
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof Throwable)
throw new Exception((Throwable) lookupResult);
assertEquals(defaultManagedExecutor, lookupResult);

// thenRunAsync requested from unmanaged thread
assertNotNull(threadName = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS).toString());
assertNotSame(currentThreadName, threadName);
assertTrue(threadName, threadName.startsWith("Default Executor-thread-")); // must run on Liberty global thread pool
assertNotSame(currentThreadName, threadName); // cannot be the servlet thread because operation is async
assertNotNull(lookupResult = results.poll(TIMEOUT_NS, TimeUnit.NANOSECONDS));
if (lookupResult instanceof NamingException)
; // pass
Expand Down