Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix propagation of query stats. #9733

Closed
wants to merge 1 commit into from

Conversation

atanasenko
Copy link
Member

  • Make sure TaskHolder updates happen after initial TaskExecution is created
  • Make sure final TaskInfo is ready on the worker before exposing final task state to coordinator
  • Give final task state some time to reach coordinator before cancelling any unfinished stages

Addresses #5172

* Make sure TaskHolder updates happen after initial TaskExecution is
created
* Make sure final TaskInfo is ready on the worker before exposing final
task state to coordinator
* Give final task state some time to reach coordinator before cancelling
any unfinished stages
@cla-bot
Copy link

cla-bot bot commented Oct 21, 2021

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. In order for us to review and merge your code, please submit the signed CLA to [email protected]. For more information, see https://github.com/trinodb/cla.


// task state changes might happen while taskExecution is still being constructed and initialized,
// so make sure initial creation and write of the holder happens before any attempt to update it
taskHolderLock.writeLock().lock();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used rwlock so that subsequent updates to TaskHolder don't interfere with each other, but they do need to wait until the initial value is set.
I'm not entirely sure what sychronized(this) guarded against here though.

@findepi
Copy link
Member

findepi commented Oct 22, 2021

cc @losipiuk

Copy link
Member

@sopel39 sopel39 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for investigation. This solution (keeping state in TaskHolder) has problems. Notification order is not guaranteed. I proposed a different approach (#9733 (comment)) in a comment

@@ -54,6 +54,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
Copy link
Member

@sopel39 sopel39 Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give some description (scenario) of how flakyness can happen in commit message?

// so give them a bit more time to complete before cancelling
schedulerExecutor.schedule(() -> {
childStages.forEach(SqlStageExecution::cancel);
}, 100, MILLISECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?
I think final task infos will still be propagated to cancelled stage.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've started investigation from one of our tests with missing stats, which also had one stage marked as CANCELLED each time it failed and I tracked the cancellation to this. I thought that was the main cause initially.
It might belong to a separate PR though, I'll run a test loop without this change to see whether it only affects stage state or stats as well.

@@ -89,6 +91,7 @@
private final AtomicLong taskStatusVersion = new AtomicLong(TaskStatus.STARTING_VERSION);
private final FutureStateChange<?> taskStatusVersionChange = new FutureStateChange<>();

private final ReadWriteLock taskHolderLock = new ReentrantReadWriteLock();
private final AtomicReference<TaskHolder> taskHolderReference = new AtomicReference<>(new TaskHolder());
Copy link
Member

@sopel39 sopel39 Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @GuardedBy("taskHolderLock")

Since its guarded, taskHolderReference doesn't have to be atomic anymore

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further updates to taskHolderReference can still happen cocurrently when states change in fast succession. Only the initial creation is guarded by writeLock, further updates happen under readLock and can run in parallel.

@@ -148,6 +151,18 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks)
requireNonNull(failedTasks, "failedTasks is null");
taskStateMachine.addStateChangeListener(newState -> {
if (!newState.isDone()) {
while (true) {
TaskHolder taskHolder = taskHolderReference.get();
taskHolderLock.readLock().lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is taskHolderLock lock for taskHolderReference or for TaskHolder itself?
If for the latter, then you don't need a lock since taskHolder.withState doesn't mutate anything.
If for the former, then

taskHolderLock.readLock().lock();

should be called before taskHolderReference.get(). However, then you should also use write lock since you modify taskHolderReference value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taskHolderReference can be in three different states: "initial" with all its internal structures being null before TaskExecution is constructed, "in-progress" when it contains TaskExecution and "final" when task transitions to a final state and final TaskInfo is created.
The problem here is that you can only update it when it's "in-progress".

This rwLock prevents updates to "initial" when taskState changes while TaskExecution is still being constructed.
An example of such transition being triggered:

 io.trino.execution.TaskStateMachine.transitionToDoneState(TaskStateMachine.java:130)
 io.trino.execution.TaskStateMachine.finished(TaskStateMachine.java:106)
 io.trino.execution.SqlTaskExecution.checkTaskCompletion(SqlTaskExecution.java:650)
 io.trino.execution.SqlTaskExecution.addSources(SqlTaskExecution.java:320)
 io.trino.execution.SqlTaskExecution.createSqlTaskExecution(SqlTaskExecution.java:167)
 io.trino.execution.SqlTaskExecutionFactory.create(SqlTaskExecutionFactory.java:97)
 io.trino.execution.SqlTask.updateTask(SqlTask.java:446)
 io.trino.execution.SqlTaskManager.doUpdateTask(SqlTaskManager.java:417)
 io.trino.execution.SqlTaskManager.lambda$updateTask$6(SqlTaskManager.java:381)
 io.trino.$gen.Trino_testversion____20211021_130330_8.call(Unknown Source)
 io.trino.execution.SqlTaskManager.updateTask(SqlTaskManager.java:381)
 io.trino.server.TaskResource.createOrUpdateTask(TaskResource.java:132)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would task immediately transition to done state?

@@ -54,6 +54,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
Copy link
Member

@sopel39 sopel39 Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Make sure TaskHolder updates happen after initial TaskExecution is
    created

Could you show a scenario where TaskHolder updates happen before initial TaskExecution is created? Is it early cancellation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a stracktrace of such transition above.

Copy link
Member

@sopel39 sopel39 Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a stracktrace of such transition above.

 io.trino.execution.SqlTaskExecution.createSqlTaskExecution(SqlTaskExecution.java:167)
 io.trino.execution.SqlTaskExecutionFactory.create(SqlTaskExecutionFactory.java:97)
 io.trino.execution.SqlTask.updateTask(SqlTask.java:446)
 io.trino.execution.SqlTaskManager.doUpdateTask(SqlTaskManager.java:417)
 io.trino.execution.SqlTaskManager.lambda$updateTask$6(SqlTaskManager.java:381)
 io.trino.$gen.Trino_testversion____20211021_130330_8.call(Unknown Source)
 io.trino.execution.SqlTaskManager.updateTask(SqlTaskManager.java:381)
 io.trino.server.TaskResource.createOrUpdateTask(TaskResource.java:132)

It seems like within updateTask we should call sqlTaskExecutionFactory.create without any initial sources since sources are added directly later on anyway.
I'm not sure we pass sources there TBH

TaskHolder taskHolder = taskHolderReference.get();
taskHolderLock.readLock().lock();
try {
if (taskHolderReference.compareAndSet(taskHolder, taskHolder.withState(newState))) {
Copy link
Member

@sopel39 sopel39 Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can potentially move to an earlier state. You cannot assume the order in which listeners will be executed since they are async

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

@@ -207,7 +229,7 @@ public SqlTaskIoStats getIoStats()

public TaskState getTaskState()
{
return taskStateMachine.getState();
Copy link
Member

@sopel39 sopel39 Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think within SqlTask class taskStateMachine.getState should never be used directly.
All other methods should use SqlTask#getTaskState instead.

There should be some method like

doFinalStateNotification

which should be called from taskStateMachine.addStateChangeListener(..) callback (within io.trino.execution.SqlTask#initialize)
and from SqlTask#getTaskState.
It would fire only once

This way you will be sure that whenever getTaskState returns done state, final stats will be set.

// task state changes might happen while taskExecution is still being constructed and initialized,
// so make sure initial creation and write of the holder happens before any attempt to update it
taskHolderLock.writeLock().lock();
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need changes here? Within the synchronized we check if task is already complete.

Anyway, I'm not sure how this prevents Make sure TaskHolder updates happen after initial TaskExecution is created (if for example, cancellation happens earlier)

@sopel39
Copy link
Member

sopel39 commented Oct 22, 2021

@atanasenko could you explain how coordinator sees FINISHED state before getting final stats?
Unless I don't see something, I think the only way coordinator gets task state is via io.trino.execution.SqlTask#createTaskStatus. But stats there should always be up-to date since in io.trino.execution.SqlTask#createTaskStatus we get taskStateMachine.getState() before computing actual stats (either from actual TaskExecution or io.trino.execution.SqlTask.TaskHolder#getFinalTaskInfo)

@atanasenko
Copy link
Member Author

atanasenko commented Oct 22, 2021

@sopel39
This below sometimes causes SqlTask to compute final TaskInfo based on empty stats (code)
TaskListener might manage to run before SqlTaskExecutionFactory.create() completes.

 io.trino.execution.TaskStateMachine.transitionToDoneState(TaskStateMachine.java:130)
 io.trino.execution.TaskStateMachine.finished(TaskStateMachine.java:106)
 io.trino.execution.SqlTaskExecution.checkTaskCompletion(SqlTaskExecution.java:650)
 io.trino.execution.SqlTaskExecution.addSources(SqlTaskExecution.java:320)
 io.trino.execution.SqlTaskExecution.createSqlTaskExecution(SqlTaskExecution.java:167)
 io.trino.execution.SqlTaskExecutionFactory.create(SqlTaskExecutionFactory.java:97)
 io.trino.execution.SqlTask.updateTask(SqlTask.java:446)
 io.trino.execution.SqlTaskManager.doUpdateTask(SqlTaskManager.java:417)
 io.trino.execution.SqlTaskManager.lambda$updateTask$6(SqlTaskManager.java:381)
 io.trino.$gen.Trino_testversion____20211021_130330_8.call(Unknown Source)
 io.trino.execution.SqlTaskManager.updateTask(SqlTaskManager.java:381)
 io.trino.server.TaskResource.createOrUpdateTask(TaskResource.java:132)

@atanasenko
Copy link
Member Author

Superseded by #9888 and #9913

@atanasenko atanasenko closed this Nov 9, 2021
@atanasenko atanasenko deleted the at/stats-flaky-tests branch November 9, 2021 12:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants