-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Zen2] Introduce deterministic task queue #32197
[Zen2] Introduce deterministic task queue #32197
Conversation
The cluster coordination layer relies on timeouts to ensure that a cluster can successfully form, and must also deal with concurrent activity in the cluster. This commit introduces some infrastructure that will help us to deterministically test components that use concurrency and/or timeouts.
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I've left some minor comments.
currentTimeMillis = nextDeferredTaskExecutionTimeMillis; | ||
|
||
nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; | ||
List<DeferredTask> remainingTasks = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remainingDeferredTasks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
}; | ||
} | ||
|
||
private class DeferredTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be a static class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It used currentTimeMillis
but on closer inspection this was just in a vacuous assertion.
taskQueue.runRandomTask(random()); | ||
} | ||
|
||
assertThat(strings, containsInAnyOrder("foo", "bar", "baz", "quux")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this really test that the tasks have been run in random order?
An implementation that just runs them in FIFO order would satisfy this assertion
final DeterministicTaskQueue taskQueue = newTaskQueue(); | ||
final List<String> strings = new ArrayList<>(1); | ||
|
||
taskQueue.scheduleAt(0, () -> strings.add("foo")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of using the fixed value 0 here, can you first move forward the current time and then schedule something at that exact time?
final DeterministicTaskQueue taskQueue = newTaskQueue(); | ||
final List<String> strings = new ArrayList<>(1); | ||
|
||
taskQueue.scheduleAt(-1, () -> strings.add("foo")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of using the fixed value -1 here, can you first move forward the current time and then schedule something at a random earlier time?
final DeterministicTaskQueue taskQueue = newTaskQueue(); | ||
final List<String> strings = new ArrayList<>(1); | ||
|
||
taskQueue.scheduleAt(100, () -> strings.add("foo")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment here
/** | ||
* Schedule the given task for execution after the given delay has elapsed. | ||
*/ | ||
void schedule(TimeValue delay, Runnable task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should have this follow the shape of the schedule method in java.util.concurrent.ScheduledExecutorService
, i.e., have the command before the delay:
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
Thanks for the review @ywelsch - comments addressed, this is worth another look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/** | ||
* Runs the first runnable task. | ||
*/ | ||
void runNextTask() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this method be public?
The scheduling methods are public as well as the advanceTime. I think hasRunnableTasks
, hasDeferredTasks
, getCurrentTimeMillis
and runNextTask
should be as well.
The cluster coordination layer relies on timeouts to ensure that a cluster can
successfully form, and must also deal with concurrent activity in the cluster.
This commit introduces some infrastructure that will help us to
deterministically test components that use concurrency and/or timeouts.