-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix propagation of query stats. #9733
Conversation
* Make sure TaskHolder updates happen after initial TaskExecution is created * Make sure final TaskInfo is ready on the worker before exposing final task state to coordinator * Give final task state some time to reach coordinator before cancelling any unfinished stages
Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. In order for us to review and merge your code, please submit the signed CLA to [email protected]. For more information, see https://github.com/trinodb/cla. |
|
||
// task state changes might happen while taskExecution is still being constructed and initialized, | ||
// so make sure initial creation and write of the holder happens before any attempt to update it | ||
taskHolderLock.writeLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used rwlock so that subsequent updates to TaskHolder don't interfere with each other, but they do need to wait until the initial value is set.
I'm not entirely sure what sychronized(this)
guarded against here though.
cc @losipiuk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for investigation. This solution (keeping state in TaskHolder
) has problems. Notification order is not guaranteed. I proposed a different approach (#9733 (comment)) in a comment
@@ -54,6 +54,8 @@ | |||
import java.util.concurrent.atomic.AtomicBoolean; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you give some description (scenario) of how flakyness can happen in commit message?
// so give them a bit more time to complete before cancelling | ||
schedulerExecutor.schedule(() -> { | ||
childStages.forEach(SqlStageExecution::cancel); | ||
}, 100, MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this?
I think final task infos will still be propagated to cancelled stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've started investigation from one of our tests with missing stats, which also had one stage marked as CANCELLED
each time it failed and I tracked the cancellation to this. I thought that was the main cause initially.
It might belong to a separate PR though, I'll run a test loop without this change to see whether it only affects stage state or stats as well.
@@ -89,6 +91,7 @@ | |||
private final AtomicLong taskStatusVersion = new AtomicLong(TaskStatus.STARTING_VERSION); | |||
private final FutureStateChange<?> taskStatusVersionChange = new FutureStateChange<>(); | |||
|
|||
private final ReadWriteLock taskHolderLock = new ReentrantReadWriteLock(); | |||
private final AtomicReference<TaskHolder> taskHolderReference = new AtomicReference<>(new TaskHolder()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add @GuardedBy("taskHolderLock")
Since its guarded, taskHolderReference
doesn't have to be atomic anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Further updates to taskHolderReference
can still happen cocurrently when states change in fast succession. Only the initial creation is guarded by writeLock, further updates happen under readLock and can run in parallel.
@@ -148,6 +151,18 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks) | |||
requireNonNull(failedTasks, "failedTasks is null"); | |||
taskStateMachine.addStateChangeListener(newState -> { | |||
if (!newState.isDone()) { | |||
while (true) { | |||
TaskHolder taskHolder = taskHolderReference.get(); | |||
taskHolderLock.readLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is taskHolderLock
lock for taskHolderReference
or for TaskHolder
itself?
If for the latter, then you don't need a lock since taskHolder.withState
doesn't mutate anything.
If for the former, then
taskHolderLock.readLock().lock();
should be called before taskHolderReference.get()
. However, then you should also use write lock since you modify taskHolderReference
value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskHolderReference
can be in three different states: "initial" with all its internal structures being null before TaskExecution is constructed, "in-progress" when it contains TaskExecution and "final" when task transitions to a final state and final TaskInfo
is created.
The problem here is that you can only update it when it's "in-progress".
This rwLock prevents updates to "initial" when taskState changes while TaskExecution is still being constructed.
An example of such transition being triggered:
io.trino.execution.TaskStateMachine.transitionToDoneState(TaskStateMachine.java:130)
io.trino.execution.TaskStateMachine.finished(TaskStateMachine.java:106)
io.trino.execution.SqlTaskExecution.checkTaskCompletion(SqlTaskExecution.java:650)
io.trino.execution.SqlTaskExecution.addSources(SqlTaskExecution.java:320)
io.trino.execution.SqlTaskExecution.createSqlTaskExecution(SqlTaskExecution.java:167)
io.trino.execution.SqlTaskExecutionFactory.create(SqlTaskExecutionFactory.java:97)
io.trino.execution.SqlTask.updateTask(SqlTask.java:446)
io.trino.execution.SqlTaskManager.doUpdateTask(SqlTaskManager.java:417)
io.trino.execution.SqlTaskManager.lambda$updateTask$6(SqlTaskManager.java:381)
io.trino.$gen.Trino_testversion____20211021_130330_8.call(Unknown Source)
io.trino.execution.SqlTaskManager.updateTask(SqlTaskManager.java:381)
io.trino.server.TaskResource.createOrUpdateTask(TaskResource.java:132)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would task immediately transition to done state?
@@ -54,6 +54,8 @@ | |||
import java.util.concurrent.atomic.AtomicBoolean; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
import java.util.concurrent.locks.ReadWriteLock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Make sure TaskHolder updates happen after initial TaskExecution is
created
Could you show a scenario where TaskHolder
updates happen before initial TaskExecution is created? Is it early cancellation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a stracktrace of such transition above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a stracktrace of such transition above.
io.trino.execution.SqlTaskExecution.createSqlTaskExecution(SqlTaskExecution.java:167)
io.trino.execution.SqlTaskExecutionFactory.create(SqlTaskExecutionFactory.java:97)
io.trino.execution.SqlTask.updateTask(SqlTask.java:446)
io.trino.execution.SqlTaskManager.doUpdateTask(SqlTaskManager.java:417)
io.trino.execution.SqlTaskManager.lambda$updateTask$6(SqlTaskManager.java:381)
io.trino.$gen.Trino_testversion____20211021_130330_8.call(Unknown Source)
io.trino.execution.SqlTaskManager.updateTask(SqlTaskManager.java:381)
io.trino.server.TaskResource.createOrUpdateTask(TaskResource.java:132)
It seems like within updateTask
we should call sqlTaskExecutionFactory.create
without any initial sources since sources are added directly later on anyway.
I'm not sure we pass sources
there TBH
TaskHolder taskHolder = taskHolderReference.get(); | ||
taskHolderLock.readLock().lock(); | ||
try { | ||
if (taskHolderReference.compareAndSet(taskHolder, taskHolder.withState(newState))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can potentially move to an earlier state. You cannot assume the order in which listeners will be executed since they are async
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point
@@ -207,7 +229,7 @@ public SqlTaskIoStats getIoStats() | |||
|
|||
public TaskState getTaskState() | |||
{ | |||
return taskStateMachine.getState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think within SqlTask
class taskStateMachine.getState
should never be used directly.
All other methods should use SqlTask#getTaskState
instead.
There should be some method like
doFinalStateNotification
which should be called from taskStateMachine.addStateChangeListener(..)
callback (within io.trino.execution.SqlTask#initialize
)
and from SqlTask#getTaskState
.
It would fire only once
This way you will be sure that whenever getTaskState
returns done
state, final stats will be set.
// task state changes might happen while taskExecution is still being constructed and initialized, | ||
// so make sure initial creation and write of the holder happens before any attempt to update it | ||
taskHolderLock.writeLock().lock(); | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need changes here? Within the synchronized we check if task
is already complete.
Anyway, I'm not sure how this prevents Make sure TaskHolder updates happen after initial TaskExecution is created
(if for example, cancellation happens earlier)
@atanasenko could you explain how coordinator sees |
@sopel39
|
Addresses #5172