-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prioritized locking #4550
Prioritized locking #4550
Conversation
docs/content/ingestion/tasks.md
Outdated
- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other. | ||
- A task needs to acquire an exclusive lock before it writes segemtns for an interval. An exclusive lock is acquired as preemptable and can be upgraded as non-preemptable when publishing segments. | ||
|
||
Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. The lock preemption works based on _optimistic locking_. When a lock is preempted, it is not notified to the owner task immediately. Instead, it's notified when the owner task tries to acquire the same lock again or upgrade it. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't content to acquire locks because they usually targets different dataSources or intervals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: "tasks don't content" -> "tasks don't contend"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jon-wei.. I fixed typos.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did an initial pass of the files outside of TaskLockbox, still looking at the changes in TaskLockbox, will do another review pass after that
docs/content/ingestion/tasks.md
Outdated
|
||
Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. The lock preemption works based on _optimistic locking_. When a lock is preempted, it is not notified to the owner task immediately. Instead, it's notified when the owner task tries to acquire the same lock again or upgrade it. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't content to acquire locks because they usually targets different dataSources or intervals. | ||
|
||
A task writing data into a dataSource must acquire exclusive locks for target intervals. Note that execlusive locks are still preemptable. As a result, the task must _upgrade_ its locks as non-preemptable when it executes a critical operation, _publishing segments_. Once the lock is upgraded, it can't be preempted by even higher-priority locks. After publishing segments, the task downgrades its locks as preemptable. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"execlusive" -> "exclusive"
docs/content/ingestion/tasks.md
Outdated
There are two locks types, i.e., _shared lock_ and _exclusive lock_. | ||
|
||
- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other. | ||
- A task needs to acquire an exclusive lock before it writes segemtns for an interval. An exclusive lock is acquired as preemptable and can be upgraded as non-preemptable when publishing segments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"segemtns" -> "segments"
public class LockResult | ||
{ | ||
private final TaskLock taskLock; | ||
private final boolean wasRevoked; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be just called "revoked"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
@@ -654,7 +683,8 @@ private boolean generateAndPublishSegments( | |||
|
|||
final Interval interval = optInterval.get(); | |||
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow); | |||
final String sequenceName = Appenderators.getSequenceName(interval, version, shardSpec); | |||
// TODO: validate versions if appendToExisting? => need to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you planning on updating this TODO within this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry, I forgot removing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a second pass on all the files, had a few comments.
Generally LGTM, but I think it'd be good if someone more familiar with the existing task management code also takes a look in case I missed stuff.
final Task task, | ||
final Interval interval, | ||
final Optional<String> preferredVersion | ||
@Nullable final String preferredVersion, // TODO: check this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this TODO implemented already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry. It's already done but didn't remove it.
toolbox.verifyTaskLocks(task, segments); | ||
for (DataSegment segment : segments) { | ||
if (!toolbox.getTaskLockbox().upgrade(task, segment.getInterval()).isOk()) { | ||
return SegmentPublishResult.fail(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could make SegmentPublishResult.fail() a static object instead of a function that returns a new one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the fail SegmentPublishResult
is not common because it can happen only when the lock upgrade is failed due to preemption or metadata update is failed. In this case, I think this is better because the instance will be GCed immediately after it is processed while the static instance will be kept until the jvm terminates.
} | ||
} | ||
|
||
TaskActionPreconditions.checkTaskLocks(task, toolbox.getTaskLockbox(), segments); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use checkTaskLocksUpgraded() instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! Yes it should be. But now, it looks better to check locks first and then upgrade them. checkTaskLocksUpgraded()
is not used anymore and I removed it.
@@ -62,7 +62,7 @@ public SegmentNukeAction( | |||
@Override | |||
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException | |||
{ | |||
toolbox.verifyTaskLocks(task, segments); | |||
TaskActionPreconditions.checkTaskLocks(task, toolbox.getTaskLockbox(), segments); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the segment delete need upgraded locks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! I added lock upgrade here and SegmentMetadataUpdateAction
. Additionally, some other actions like SegmentListUsedAction
and SegmentListUnusedAction
also look to check lock before performing their actions. I'll check and raise a new pr for this issue.
} | ||
} | ||
|
||
public static Map<Interval, TaskLock> acquireLocks(TaskActionClient client, SortedSet<Interval> intervals) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe call this acquireExclusiveLocks
since it always creates exclusive locks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
@jon-wei thanks. I addressed your comments. |
Would anyone review this please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a partial review. Will pick it back up after seeing what you think about the comments so far.
this.dataSource = dataSource; | ||
this.interval = interval; | ||
this.version = version; | ||
Preconditions.checkArgument(!type.equals(TaskLockType.SHARED) || !upgraded, "lock[%s] cannot be upgraded", type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line will throw NPE when deserializing existing task locks from the database, since anything created before this patch won't have a "type", and so type.equals
cannot work.
I think that means we need some migration code here for how to deal with legacy task locks. Would it make sense to load them as exclusive, upgraded locks? (Since legacy locks were always exclusive and non-preemptible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Yeah, the lock should be exclusive if type is null.
Preconditions.checkArgument(!upgraded || !revoked, "Upgraded locks cannot be revoked"); | ||
|
||
this.type = type; | ||
this.groupId = Preconditions.checkNotNull(groupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception messages are more useful if you do something like Preconditions.checkNotNull(groupId, "groupId")
-- otherwise a caller has to dig through the source to find what check happened on the line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Added.
|
||
public TaskLock upgrade() | ||
{ | ||
Preconditions.checkState(!revoked, "Revoked locks cannot be upgraded"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having two states that interact in very specific ways seems confusing to me. I'm already having a tough time keeping the checks straight. I think it's usually simpler to have a single state enum, maybe in this case with three possible values:
- PREEMPTIBLE
- NONPREEMPTIBLE
- REVOKED
It would help make a couple of salient facts more clear:
- revoked + upgraded is impossible
- REVOKED is a dead-end and cannot be transitioned out of
It would be similar to the design of the "State" enum in QueryLifecycle or DruidStatement or the "Status" enum in KafkaIndexTask.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, but I removed preemptible/non-preemptible states in the latest patch. Please see #4550 (comment).
@JsonProperty("interval") Interval interval, | ||
@JsonProperty("timeoutMs") long timeoutMs | ||
) | ||
{ | ||
this.type = type; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add null checks here if they are important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a default lock type if it's null.
import org.joda.time.Interval; | ||
|
||
public class LockAcquireAction implements TaskAction<TaskLock> | ||
public class LockAcquireAction implements TaskAction<LockResult> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type of the return can't change, for protocol compatibility reasons. Check out rolling-updates.md
-- we allow users to update middleManagers either before or after the overlord and the two versions of code must be able to communicate with each other.
One possible solution could involve adding TaskLock's fields to LockResult, so a LockResult from a new overlord could be deserialized as a TaskLock by an old task (while making sure that the right thing will happen in this case).
It could also involve adding an extra field to LockResult such that a new task could detect that it's talking to an old overlord and act accordingly (the task could know that if the field's not present, it means the overlord is old).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I reverted the return type of LockAcquireAction and LockTryAcquireAction. Also, tested backward compatibility in my local cluster.
@@ -38,14 +41,22 @@ | |||
|
|||
@JsonCreator | |||
public LockAcquireAction( | |||
@JsonProperty("lockType") TaskLockType type, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are protocol compatibility concerns here too. Imagine an old middleManager sending a LockAcquireAction with no lockType
to a new overlord. Or imagine an old overlord receiving a LockAcquireAction from a new middleManager, and it would ignore the lockType
field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. TaskLockType should be EXCLUSIVE if it is null. Tested backward compatibility in my local cluster.
import org.joda.time.Interval; | ||
|
||
public class LockTryAcquireAction implements TaskAction<TaskLock> | ||
public class LockTryAcquireAction implements TaskAction<LockResult> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comments to LockAcquireAction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted return type.
return existingLock.getPriority() < tryLockPriority && !existingLock.isUpgraded(); | ||
} | ||
|
||
private static TaskLockPosse getOnlyTaskLockPosse(Task task, Interval interval, List<TaskLockPosse> lockPosses) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interval
is unused here, outside of log messages and exceptions. Seems suspicious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored.
TaskActionPreconditions.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); | ||
|
||
for (DataSegment segment : segments) { | ||
if (!toolbox.getTaskLockbox().upgrade(task, segment.getInterval()).isOk()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may not work as you intend. I believe lockbox.upgrade(task, interval)
only works if the lock interval is exactly interval
, but it might not be in this case. For example, HadoopIndexTask acquires locks for a wider interval than its individual segments (it computes an umbrella interval and locks that).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lockbox.upgrade()
actually works if there is only one lock whose interval contains the given interval. Do you think this makes sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not consistent with how unlock
works so it's a little strange in that regard. I guess it's fine, but the javadocs for the methods lock
, unlock
, upgrade
, and downgrade
should describe how this works, since otherwise it will get confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, but in the latest patch, I simplified the logic around critical section and removed upgrade/downgrade methods.
* | ||
* @throws IllegalStateException if a downgrade for a shared lock is requested | ||
*/ | ||
public TaskLock downgrade(Task task, Interval interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since multiple tasks can obtain the same lock, is this vulnerable to a problem like:
- Task A and B share a lock
- Task A enters a critical section and upgrades the lock
- Task B enters a critical section and upgrades the lock
- Task A downgrades the lock
- Now Task B's lock is downgraded even though it's still in a critical section
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I guess Task A and B are subtasks of a task because it's allowed to share an exclusive lock only if they have the same groupId. In this case, upgrade/downgrade must be atomic.
BTW, while I'm looking into this, I realized that the atomic operation in a critical section can be achieved without complicated upgrade and downgrade. So, in the latest patch, I removed them and added a new method doInCriticalSection()
to TaskLockBox
instead.
doInCriticalSection()
method performs a given action with a guarantee that the locks of the task requested the action are not preempted while performing the action. Currently, this is guaranteed by acquiring a ReentrantLock, which means every call to methods of TaskLockBox are blocked until doInCriticalSection() is finished. I think this should be fine because
- Publishing segments should not take a long time.
- Publishing segments won't be frequent even after the fine-grained locking proposed in [Proposal] Background compaction #4479 is implemented. The compaction task will be the one which needs more frequent locking and publishing segments than other task types, but it should be suppressed to not make a big overhead to the cluster.
this(type, groupId, dataSource, interval, version, priority, false); | ||
} | ||
|
||
public TaskLock revoke() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revokedCopy()
would be a better name; revoke()
sounds like a verb that would do something. But this method doesn't really do anything, just returns a value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
private final Map<String, NavigableMap<Interval, TaskLockPosse>> running = Maps.newHashMap(); | ||
// Datasource -> Interval -> list of (Tasks + TaskLock) | ||
// Multiple shared locks can be acquired for the same dataSource and interval. | ||
// Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How long are they kept? Could we run out of memory storing them all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw later on that these are kept until unlock
is called so it seems OK to me.
Task task, | ||
List<Interval> intervals, | ||
CriticalAction<T> actionOnValidLocks, | ||
CriticalAction<T> actionOnInvalidLocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about combining both of these behaviors into the CriticalAction interface? (Or did you want to keep them separate so lambdas could be used?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes for using lambdas for simplicity, but I agree this interface looks quite weird. I combined these actions into one CriticalAction and added a builder.
// Even though these two operations are not atomically executed, the caller of replaceLock() is thread-safe and | ||
// guarantees that two or more threads never call replaceLock() at the same time | ||
removeLock(taskid, oldLock); | ||
addLock(taskid, newLock); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you do this in a metadata store transaction? We take advantage of such transactions for other operations that must be atomic, such as segment publishing and kafka offset committing.
See IndexerSQLMetadataStorageConnector.announceHistoricalSegments
for an example using connector.retryTransaction
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Fixed.
retVal = toolbox.getTaskLockbox().doInCriticalSection( | ||
task, | ||
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), | ||
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
announceHistoricalSegments
is already done inside a metadata store transaction, so does it have to be in a lockbox critical section as well? It's either going to happen, or not happen, so it shouldn't be an issue if a lock preempts this operation in the middle or not.
(However, if it is preempted, the task should promptly exit)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment makes me wonder, do we need critical sections at all for locks?
Could the cases that require them all be implemented using metadata store transactions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doInCriticalSection
guarantees the lock validation and a critical action are done atomically. I thought the metadata store transactions are used for storing/retrieving segment information to/from the metadata storage, and TaskLockBox is responsible for managing task locks. It means, lock validity should be checked by TaskLockBox.
Do you think locks can be checked inside metadata storage transactions as well? I think it might be possible if all locks are stored in only metadata store not in memory. Also, I wonder why locks are stored in memory as well as metadata store. Is this for performance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. You're right. I almost forgot that lock checking is important, since for insert-only tasks the design does not depend on lock checking for correctness. But it does matter for mixing insert tasks with reindexing or updating tasks.
Do you think locks can be checked inside metadata storage transactions as well? I think it might be possible if all locks are stored in only metadata store not in memory. Also, I wonder why locks are stored in memory as well as metadata store. Is this for performance?
Hmm, I think it's worth exploring...
The idea behind storing locks in memory was indeed performance. However with the really fat synchronization I'm not sure how much benefit we get beyond storing them solely in the metadata store. I haven't evaluated the performance of metadata-store-only vs. the current memory+metadata system. It would surely make things simpler to do metadata-store-only, all else being equal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I'll do some benchmark in a follow-up pr.
|
||
public enum TaskLockType | ||
{ | ||
SHARED, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any code that creates SHARED
locks. Are they used?
Should they be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they will be created in the next pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @jihoonson!
|
||
// Acquire again | ||
final LockResult lockResult = lockbox.lock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval); | ||
Assert.assertFalse(lockResult.isOk()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jihoonson I see that this is expected behavior, but is this really ok. So, low priority task can't get the lock even after high priority task has unlocked.
I think, this behavior is creating following problem.
- a kill task was created that acquired the lock.
- a index task came and took same lock , revoking above before kill task even had a chance to run.
- overlord restarted for whatever reason
- kill task would not be given to taskRunner as it can't get lock , index task has it.
- now index task is finished , and unlocked.
- kill task is forever in waiting state because it is not being able to acquire the lock even if no-one has that lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, so the intention of the revoked lock is for tasks to know that the acquired lock is no longer valid so the task should fail itself. I guess the real issue might be a wrong task state after the overlord is restarted. Once a kill task gets a lock, its isReady() method should be called already, which means, the kill task is at least in the pending state. i think this state shouldn’t be changed after restart.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe, but given that the high priority task (which caused the revocation) is done, unlocked and gone.. why the low priority task shouldn't be able to get the lock now and do its thing instead of dying?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think current behavior makes sense for case ...
- low priority indexing task1 took lock , started running creating segments.
- high priority indexing task2 took lock, revoked above, created segments, published them
- at this point segments created by indexing task1 might be outdated and hence it should not be able to make progress.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
however, this causes potential for task to be stuck in waiting state forever in case of overlord restarts... as task.isReady() will keep on returning false forever due to the revoked lock and TaskQueue will not give it to TaskRunner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess there could be a couple of reasons.
- I guess we don't know exactly what users want. In this example, you might want to kill some segments after the index task is finished, but in the same scenario, some people want to just ignore the kill task and run the index task. Running kill task could be surprising for these people.
- It's not easy to make the behavior consistent across the tasks of different states. For example, if some locks of a running task are revoked, to make the behavior consistent, the task could be paused for a while until the high priority task releases the lock and continues. I think this could be doable, but will need a lot of work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I missed your last 2 comments. It makes sense too. Maybe we need to figure out why its state has changed after restart and fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I "think" here is what led to the problem state...
- a kill task is submitted, TaskQueue called
isReady()
on it which created a lock in DB for it. - above task hasn't started running yet, a index task is submitted over same interval. TaskQueue called
isReady()
on it which revoked kill task's lock. - none of the above tasks are running just yet (because a lot of tasks got submitted at same time and
TaskRunner
is busy before it gets to assign above tasks to worker) - overlord restarted
- kill task's
isReady()
now returns false forever andTaskQueue
never gives it toTaskRunner
- index task's
isReady()
said true, it was run and finished successfully.
I think the problem happens because isReady()
doesn't differentiate between not getting a lock and having a revoked lock . Ideally TaskQueue
should notice that kill task's lock is revoked and should set its state to failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. Perhaps this could be related to the lame task state tracking in the overlord. 😭
I'm not sure if it's the best idea but I think we can change the return type of isReady()
to some enum value which could be one of READY
, NOT_READY
, and CANNOT_READY
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry , got sidetracked in some other stuff.
I can't think of a better quick fix so that would be OK I think.
Part of #4479.
This patch contains only the prioritized locking and is based on #1679. I'll implement fine-grained locking which include acquiring different types of locks depending on the task spec in a follow-up pr.
Highlight changes are
TaskLock
has aTaskLockType
and two states ofupgraded
andrevoked
.TaskLockbox
is responsible for managing allTaskLock
s including upgraded and revoked ones.LockResult
s.SegmentTransactionalInsertAction
.This change is