Prioritized locking #4550
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,9 +88,32 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed | |
|property|description|required?| | ||
|--------|-----------|---------| | ||
|type|The task type, this should always be "index".|yes| | ||
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no| | ||
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no| | ||
|spec|The ingestion spec. See below for more details. |yes| | ||
|
||
#### Task Priority | ||
|
||
Druid's indexing tasks use locks for atomic data ingestion. Each lock is acquired for the combination of a dataSource and an interval. Once a task acquires a lock, it can write data for the dataSource and the interval of the acquired lock until the lock is released or preempted. See the [Locking section](#locking) below. | ||
|
||
Each task has a priority which is used for lock acquisition. Higher-priority tasks can preempt lower-priority tasks if they try to write on the same dataSource and interval. If some locks of a task are preempted, the behavior of the preempted task depends on the task implementation. Usually, most tasks finish as failed if they are preempted. | ||
|
||
Tasks can have different default priorities depending on their types. Here is a list of default priorities. The higher the number, the higher the priority. | ||
|
||
|task type|default priority| | ||
|---------|----------------| | ||
|Realtime index task|75| | ||
|Batch index task|50| | ||
|Merge/Append task|25| | ||
|Other tasks|0| | ||
|
||
You can override the task priority by setting `priority` in the task context, as shown below. | ||
|
||
```json | ||
"context" : { | ||
"priority" : 100 | ||
} | ||
``` | ||
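The override rule described above can be sketched in Java. This is only an illustration: `TaskPrioritySketch` and `resolvePriority` are hypothetical names that do not appear in the patch, and the constants simply restate the default-priority table.

```java
import java.util.Map;

/** Hypothetical helper, not part of the patch: picks the lock priority for a task. */
final class TaskPrioritySketch
{
  // Defaults copied from the table in the documentation above.
  static final int REALTIME_INDEX = 75;
  static final int BATCH_INDEX = 50;
  static final int MERGE_APPEND = 25;
  static final int OTHER = 0;

  /** Returns the numeric "priority" entry of the task context if present, else the type default. */
  static int resolvePriority(Map<String, ?> context, int defaultPriority)
  {
    if (context != null) {
      final Object value = context.get("priority");
      if (value instanceof Number) {
        return ((Number) value).intValue();
      }
    }
    return defaultPriority;
  }
}
```

With this sketch, a batch index task whose context sets `"priority" : 100` resolves to 100, while the same task without a context entry keeps the default of 50.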
|
||
#### DataSchema | ||
|
||
This field is required. | ||
|
@@ -308,7 +331,17 @@ These tasks start, sleep for a time and are used only for testing. The available | |
Locking | ||
------- | ||
|
||
Once an overlord node accepts a task, a lock is created for the data source and interval specified in the task. | ||
Once an overlord node accepts a task, the task acquires locks for the data source and intervals specified in the task. | ||
|
||
There are two lock types: _shared lock_ and _exclusive lock_. | ||
|
||
- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other. | ||
- A task needs to acquire an exclusive lock before it writes segments for an interval. An exclusive lock is acquired as preemptable and can be upgraded to non-preemptable when publishing segments. | ||
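The two bullets above suggest a simple coexistence rule, sketched here in Java. The names `LockKind` and `LockCompatibilitySketch` are hypothetical. The text only states that shared locks can coexist with each other; treating any pairing that involves an exclusive lock as a conflict is an assumption borrowed from ordinary reader/writer semantics.

```java
/** Hypothetical stand-in for the lock types described above. */
enum LockKind { SHARED, EXCLUSIVE }

final class LockCompatibilitySketch
{
  /**
   * Returns true if a requested lock can be granted alongside an existing lock
   * on the same dataSource and interval without preempting it.
   * Assumption: only SHARED + SHARED can coexist.
   */
  static boolean canCoexist(LockKind existing, LockKind requested)
  {
    return existing == LockKind.SHARED && requested == LockKind.SHARED;
  }
}
```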
|
||
Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. Lock preemption is based on _optimistic locking_. When a lock is preempted, the owner task is not notified immediately. Instead, it is notified when it tries to acquire the same lock again or upgrade it. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't contend to acquire locks because they usually target different dataSources or intervals. | ||
Review comment: minor: "tasks don't content" -> "tasks don't contend"
Reply: Thanks @jon-wei. I fixed typos. |
||
|
||
A task writing data into a dataSource must acquire exclusive locks for target intervals. Note that exclusive locks are still preemptable. As a result, the task must _upgrade_ its locks to non-preemptable when it executes a critical operation, _publishing segments_. Once the lock is upgraded, it can't be preempted even by higher-priority locks. After publishing segments, the task downgrades its locks to preemptable. | ||
Review comment: "execlusive" -> "exclusive" |
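The preemption and upgrade rules in the two paragraphs above can be modeled with a small, single-threaded Java sketch. It is not Druid's lockbox: `PreemptionSketch`, its `Result` enum, and the string key standing for one dataSource/interval pair are all hypothetical, and denying a request of equal priority is an assumption, since the text only covers strictly higher priorities.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of priority-based preemption with optimistic notification. */
final class PreemptionSketch
{
  enum Result { OK, REVOKED, DENIED }

  private static final class Held
  {
    final String taskId;
    final int priority;
    boolean upgraded; // true while the owner is publishing segments

    Held(String taskId, int priority)
    {
      this.taskId = taskId;
      this.priority = priority;
    }
  }

  // Current exclusive lock per key; a key stands for one dataSource/interval pair.
  private final Map<String, Held> active = new HashMap<>();
  // Tasks whose lock on a key has been preempted.
  private final Map<String, Set<String>> revoked = new HashMap<>();

  Result tryLock(String key, String taskId, int priority)
  {
    if (isRevoked(key, taskId)) {
      return Result.REVOKED; // the owner learns about the preemption only now
    }
    final Held held = active.get(key);
    if (held == null) {
      active.put(key, new Held(taskId, priority));
      return Result.OK;
    }
    if (held.taskId.equals(taskId)) {
      return Result.OK; // re-acquiring an intact lock is idempotent
    }
    if (priority > held.priority && !held.upgraded) {
      // Preempt silently: remember the old owner and hand the lock over.
      revoked.computeIfAbsent(key, k -> new HashSet<>()).add(held.taskId);
      active.put(key, new Held(taskId, priority));
      return Result.OK;
    }
    return Result.DENIED;
  }

  /** Makes the caller's lock non-preemptable, or reports that it was already preempted. */
  Result upgrade(String key, String taskId)
  {
    if (isRevoked(key, taskId)) {
      return Result.REVOKED;
    }
    final Held held = active.get(key);
    if (held == null || !held.taskId.equals(taskId)) {
      return Result.DENIED;
    }
    held.upgraded = true;
    return Result.OK;
  }

  /** Makes the caller's lock preemptable again after publishing. */
  void downgrade(String key, String taskId)
  {
    final Held held = active.get(key);
    if (held != null && held.taskId.equals(taskId)) {
      held.upgraded = false;
    }
  }

  private boolean isRevoked(String key, String taskId)
  {
    return revoked.getOrDefault(key, Collections.<String>emptySet()).contains(taskId);
  }
}
```

In this model a priority-75 task takes over a key held by a priority-50 task, and the priority-50 task sees `REVOKED` only on its next `tryLock` or `upgrade` call. Once the new owner has upgraded, even a priority-100 request is denied until `downgrade` is called.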
||
|
||
Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may release | ||
locks early if they desire. Task IDs are made unique by naming them with UUIDs or the timestamp at which the task was created. | ||
Tasks are also part of a "task group", which is a set of tasks that can share interval locks. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,30 +22,85 @@ | |
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.base.Objects; | ||
import com.google.common.base.Preconditions; | ||
import org.joda.time.Interval; | ||
|
||
/** | ||
* Represents a lock held by some task. Immutable. | ||
*/ | ||
public class TaskLock | ||
{ | ||
private final TaskLockType type; | ||
private final String groupId; | ||
private final String dataSource; | ||
private final Interval interval; | ||
private final String version; | ||
private final int priority; | ||
private final boolean upgraded; | ||
private final boolean revoked; | ||
|
||
@JsonCreator | ||
public TaskLock( | ||
@JsonProperty("type") TaskLockType type, | ||
@JsonProperty("groupId") String groupId, | ||
@JsonProperty("dataSource") String dataSource, | ||
@JsonProperty("interval") Interval interval, | ||
@JsonProperty("version") String version | ||
@JsonProperty("version") String version, | ||
@JsonProperty("priority") int priority, | ||
@JsonProperty("upgraded") boolean upgraded, | ||
@JsonProperty("revoked") boolean revoked | ||
) | ||
{ | ||
this.groupId = groupId; | ||
this.dataSource = dataSource; | ||
this.interval = interval; | ||
this.version = version; | ||
Preconditions.checkArgument(!type.equals(TaskLockType.SHARED) || !upgraded, "lock[%s] cannot be upgraded", type); | ||
Review comment: This line will throw NPE when deserializing existing task locks from the database, since anything created before this patch won't have a "type", and so I think that means we need some migration code here for how to deal with legacy task locks. Would it make sense to load them as exclusive, upgraded locks? (Since legacy locks were always exclusive and non-preemptible)
Reply: Good point. Yeah, the lock should be exclusive if type is null. |
||
Preconditions.checkArgument(!upgraded || !revoked, "Upgraded locks cannot be revoked"); | ||
|
||
this.type = type; | ||
this.groupId = Preconditions.checkNotNull(groupId); | ||
Review comment: Exception messages are more useful if you do something like
Reply: Thanks. Added. |
||
this.dataSource = Preconditions.checkNotNull(dataSource); | ||
this.interval = Preconditions.checkNotNull(interval); | ||
this.version = Preconditions.checkNotNull(version); | ||
this.priority = priority; | ||
this.upgraded = upgraded; | ||
this.revoked = revoked; | ||
} | ||
|
||
public TaskLock( | ||
TaskLockType type, | ||
String groupId, | ||
String dataSource, | ||
Interval interval, | ||
String version, | ||
int priority | ||
) | ||
{ | ||
this(type, groupId, dataSource, interval, version, priority, false, false); | ||
} | ||
|
||
public TaskLock upgrade() | ||
{ | ||
Preconditions.checkState(!revoked, "Revoked locks cannot be upgraded"); | ||
Review comment: Having two states that interact in very specific ways seems confusing to me. I'm already having a tough time keeping the checks straight. I think it's usually simpler to have a single state enum, maybe in this case with three possible values:
It would help make a couple of salient facts more clear:
It would be similar to the design of the "State" enum in QueryLifecycle or DruidStatement or the "Status" enum in KafkaIndexTask.
Reply: Sounds good, but I removed preemptible/non-preemptible states in the latest patch. Please see #4550 (comment). |
||
Preconditions.checkState(!upgraded, "Already upgraded"); | ||
return new TaskLock(type, groupId, dataSource, interval, version, priority, true, revoked); | ||
} | ||
|
||
public TaskLock downgrade() | ||
{ | ||
Preconditions.checkState(!revoked, "Revoked locks cannot be downgraded"); | ||
Preconditions.checkState(upgraded, "Already downgraded"); | ||
return new TaskLock(type, groupId, dataSource, interval, version, priority, false, revoked); | ||
} | ||
|
||
public TaskLock revoke() | ||
Reply: Renamed. |
||
{ | ||
Preconditions.checkState(!revoked, "Already revoked"); | ||
Preconditions.checkState(!upgraded, "Upgraded locks cannot be revoked"); | ||
return new TaskLock(type, groupId, dataSource, interval, version, priority, upgraded, true); | ||
} | ||
|
||
@JsonProperty | ||
public TaskLockType getType() | ||
{ | ||
return type; | ||
} | ||
|
||
@JsonProperty | ||
|
@@ -72,34 +127,60 @@ public String getVersion() | |
return version; | ||
} | ||
|
||
@JsonProperty | ||
public int getPriority() | ||
{ | ||
return priority; | ||
} | ||
|
||
@JsonProperty | ||
public boolean isUpgraded() | ||
{ | ||
return upgraded; | ||
} | ||
|
||
@JsonProperty | ||
public boolean isRevoked() | ||
{ | ||
return revoked; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (!(o instanceof TaskLock)) { | ||
return false; | ||
} else { | ||
final TaskLock x = (TaskLock) o; | ||
return Objects.equal(this.groupId, x.groupId) && | ||
Objects.equal(this.dataSource, x.dataSource) && | ||
Objects.equal(this.interval, x.interval) && | ||
Objects.equal(this.version, x.version); | ||
final TaskLock that = (TaskLock) o; | ||
return this.type.equals(that.type) && | ||
this.groupId.equals(that.groupId) && | ||
this.dataSource.equals(that.dataSource) && | ||
this.interval.equals(that.interval) && | ||
this.version.equals(that.version) && | ||
this.priority == that.priority && | ||
this.upgraded == that.upgraded && | ||
this.revoked == that.revoked; | ||
} | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
return Objects.hashCode(groupId, dataSource, interval, version); | ||
return Objects.hashCode(type, groupId, dataSource, interval, version, priority, upgraded, revoked); | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return Objects.toStringHelper(this) | ||
.add("type", type) | ||
.add("groupId", groupId) | ||
.add("dataSource", dataSource) | ||
.add("interval", interval) | ||
.add("version", version) | ||
.add("priority", priority) | ||
.add("upgraded", upgraded) | ||
.add("revoked", revoked) | ||
.toString(); | ||
} | ||
} |
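The review thread on the constructor agrees that a lock stored before this patch, which has no "type" field, should be read as exclusive. A minimal Java sketch of that defaulting follows; `LegacyLockSketch` is a hypothetical, trimmed-down stand-in for `TaskLock` with the Jackson annotations and other fields left out, and it shows one possible ordering, not the code that was eventually merged.

```java
/** Hypothetical, reduced stand-in for TaskLock; only the fields needed for the check are kept. */
final class LegacyLockSketch
{
  enum Type { SHARED, EXCLUSIVE }

  final Type type;
  final boolean upgraded;

  LegacyLockSketch(Type type, boolean upgraded)
  {
    // Locks persisted before the "type" field existed arrive here with type == null.
    // Normalize first, so that no later check dereferences a null type.
    this.type = type == null ? Type.EXCLUSIVE : type;
    if (this.type == Type.SHARED && upgraded) {
      throw new IllegalArgumentException("lock[" + this.type + "] cannot be upgraded");
    }
    this.upgraded = upgraded;
  }
}
```

Normalizing before validating avoids the `NullPointerException` that `type.equals(...)` would raise for a legacy lock, while the rule that shared locks cannot be upgraded still applies.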
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.indexing.common; | ||
|
||
public enum TaskLockType | ||
{ | ||
SHARED, | ||
Review comment: I don't see any code that creates Should they be used?
Reply: Yes, they will be created in the next PR.
Reply: OK, got it. |
||
EXCLUSIVE | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,42 +24,53 @@ | |
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.google.common.base.Throwables; | ||
import io.druid.indexing.common.TaskLock; | ||
import io.druid.indexing.common.TaskLockType; | ||
import io.druid.indexing.common.task.Task; | ||
import io.druid.indexing.overlord.LockResult; | ||
import org.joda.time.Interval; | ||
|
||
public class LockAcquireAction implements TaskAction<TaskLock> | ||
public class LockAcquireAction implements TaskAction<LockResult> | ||
Review comment: The type of the return can't change, for protocol compatibility reasons. Check out One possible solution could involve adding TaskLock's fields to LockResult, so a LockResult from a new overlord could be deserialized as a TaskLock by an old task (while making sure that the right thing will happen in this case). It could also involve adding an extra field to LockResult such that a new task could detect that it's talking to an old overlord and act accordingly (the task could know that if the field's not present, it means the overlord is old).
Reply: Good point. I reverted the return type of LockAcquireAction and LockTryAcquireAction. Also, tested backward compatibility in my local cluster. |
||
{ | ||
private final TaskLockType type; | ||
|
||
@JsonIgnore | ||
private final Interval interval; | ||
|
||
@JsonCreator | ||
public LockAcquireAction( | ||
@JsonProperty("lockType") TaskLockType type, | ||
Review comment: There are protocol compatibility concerns here too. Imagine an old middleManager sending a LockAcquireAction with no
Reply: Good point. TaskLockType should be EXCLUSIVE if it is null. Tested backward compatibility in my local cluster. |
||
@JsonProperty("interval") Interval interval | ||
) | ||
{ | ||
this.type = type; | ||
Review comment: Please add null checks here if they are important.
Reply: Added a default lock type if it's null. |
||
this.interval = interval; | ||
} | ||
|
||
@JsonProperty("lockType") | ||
public TaskLockType getType() | ||
{ | ||
return type; | ||
} | ||
|
||
@JsonProperty | ||
public Interval getInterval() | ||
{ | ||
return interval; | ||
} | ||
|
||
@Override | ||
public TypeReference<TaskLock> getReturnTypeReference() | ||
public TypeReference<LockResult> getReturnTypeReference() | ||
{ | ||
return new TypeReference<TaskLock>() | ||
return new TypeReference<LockResult>() | ||
{ | ||
}; | ||
} | ||
|
||
@Override | ||
public TaskLock perform(Task task, TaskActionToolbox toolbox) | ||
public LockResult perform(Task task, TaskActionToolbox toolbox) | ||
{ | ||
try { | ||
return toolbox.getTaskLockbox().lock(task, interval); | ||
return toolbox.getTaskLockbox().lock(type, task, interval); | ||
} | ||
catch (InterruptedException e) { | ||
throw Throwables.propagate(e); | ||
|
@@ -76,7 +87,8 @@ public boolean isAudited() | |
public String toString() | ||
{ | ||
return "LockAcquireAction{" + | ||
"interval=" + interval + | ||
"lockType=" + type + | ||
", interval=" + interval + | ||
'}'; | ||
} | ||
} |
Review comment: "segemtns" -> "segments"