Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SequentialExecutorService.callNextTaskAsync only uses key #4992

Merged
merged 1 commit into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ private SequentialExecutor(Executor executor) {
}

protected void execute(final String key, Runnable task) {
Deque<Runnable> newTasks;
synchronized (tasksByKey) {
newTasks = tasksByKey.get(key);
Deque<Runnable> newTasks = tasksByKey.get(key);
// If this key is already being handled, add it to the queue and return.
if (newTasks != null) {
newTasks.add(task);
Expand All @@ -77,28 +76,32 @@ protected void execute(final String key, Runnable task) {
tasksByKey.put(key, newTasks);
}

callNextTaskAsync(key, newTasks);
callNextTaskAsync(key);
}

protected void callNextTaskAsync(final String key, final Deque<Runnable> tasks) {
protected void callNextTaskAsync(final String key) {
executor.execute(
new Runnable() {
@Override
public void run() {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
postTaskExecution(key, tasks);
Deque<Runnable> tasks;
synchronized (tasksByKey) {
tasks = tasksByKey.get(key);
if (tasks != null && tasks.isEmpty()) {
tasksByKey.remove(key);
tasks = null;
}
}
if (tasks != null) {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
}
}
});
}

protected void postTaskExecution(String key, Deque<Runnable> tasks) {
// Do nothing in this class, but provide an opportunity for a subclass to do something
// interesting.
}
}

@BetaApi
Expand All @@ -108,25 +111,13 @@ static class AutoExecutor extends SequentialExecutor {
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
void submit(String key, Runnable task) {
super.execute(key, task);
}

@Override
/** Once a task is done, automatically run the next task in the queue. */
protected void postTaskExecution(final String key, final Deque<Runnable> tasks) {
synchronized (tasksByKey) {
if (tasks.isEmpty()) {
// Note that there can be a race if a task is added to `tasks` at this point. However,
// tasks.add() is called only inside the block synchronized by `tasksByKey` object
// in the execute() function. Therefore, we are safe to remove `tasks` here. This is not
// optimal, but correct.
tasksByKey.remove(key);
return;
void submit(final String key, final Runnable task) {
super.execute(key, new Runnable() {
@Override public void run() {
task.run();
callNextTaskAsync(key);
}
}

callNextTaskAsync(key, tasks);
});
}
}

Expand Down Expand Up @@ -202,7 +193,7 @@ public void run() {
@Override
public void onSuccess(T msg) {
future.set(msg);
resume(key);
callNextTaskAsync(key);
}

// Step 5.2: on failure
Expand Down Expand Up @@ -230,22 +221,6 @@ public void cancel(Throwable e) {
return future;
}

/** Executes the next queued task associated with {@code key}. */
private void resume(String key) {
Deque<Runnable> tasks;
synchronized (tasksByKey) {
tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
if (tasks.isEmpty()) {
tasksByKey.remove(key);
return;
}
}
callNextTaskAsync(key, tasks);
}

/** Cancels every task in the queue assoicated with {@code key}. */
private void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ public void run() {

@Test
public void SequentialExecutorRunsTasksAutomatically() throws Exception {
int numKeys = 100;
int numTasks = 100;
int numKeys = 50;
int numTasks = 50;
SequentialExecutorService.AutoExecutor sequentialExecutor =
new SequentialExecutorService.AutoExecutor(executorProvider.getExecutor());
CountDownLatch remainingTasksCount = new CountDownLatch(numKeys * numTasks);
Expand All @@ -223,7 +223,7 @@ public void SequentialExecutorRunsTasksAutomatically() throws Exception {
for (int taskId = 0; taskId < numTasks; taskId++) {
SleepingSyncTask task =
new SleepingSyncTask(
taskId, 10, startedTasksSequence, completedTasksSequence, remainingTasksCount);
taskId, 5, startedTasksSequence, completedTasksSequence, remainingTasksCount);
sequentialExecutor.submit(key, task);
}
}
Expand Down