Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritized locking #4550

Merged
merged 30 commits into from
Oct 12, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1f32283
Implementation of prioritized locking
jihoonson Jul 15, 2017
df93349
Fix build failure
jihoonson Jul 15, 2017
eecc391
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Jul 17, 2017
14bfaa4
Fix tc fail
jihoonson Jul 18, 2017
c57a9c5
Fix typos
jihoonson Jul 21, 2017
c17493f
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Jul 22, 2017
17fbcea
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Jul 25, 2017
700a968
Fix IndexTaskTest
jihoonson Jul 25, 2017
1dc9941
Addressed comments
jihoonson Jul 27, 2017
2eeaec5
Fix test
jihoonson Jul 27, 2017
6a33824
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 6, 2017
2c394a2
Fix spacing
jihoonson Aug 6, 2017
66bbc54
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 9, 2017
332c6e3
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 15, 2017
f5550dc
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 17, 2017
7ef3774
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 21, 2017
db3195f
Fix build error
jihoonson Aug 22, 2017
51b6962
Fix build error
jihoonson Aug 22, 2017
c5576d7
Add lock status
jihoonson Aug 29, 2017
a9ceace
Cleanup suspicious method
jihoonson Aug 29, 2017
c04bc36
Add nullables
jihoonson Aug 29, 2017
7ad8720
add doInCriticalSection to TaskLockBox and revert return type of tas…
jihoonson Sep 7, 2017
07ace37
fix build
jihoonson Sep 7, 2017
4389f7f
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Sep 27, 2017
42ab8f7
refactor CriticalAction
jihoonson Oct 11, 2017
0ca6c89
make replaceLock transactional
jihoonson Oct 11, 2017
05078b7
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Oct 11, 2017
507c6ba
fix formatting
jihoonson Oct 11, 2017
5a29ee1
fix javadoc
jihoonson Oct 11, 2017
4586f24
fix build
jihoonson Oct 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions docs/content/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,32 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|property|description|required?|
|--------|-----------|---------|
|type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no|
|spec|The ingestion spec. See below for more details. |yes|

#### Task Priority

Druid's indexing tasks use locks for atomic data ingestion. Each lock is acquired for the combination of a dataSource and an interval. Once a task acquires a lock, it can write data for the dataSource and the interval of the acquired lock unless the lock is released or preempted. Please see [the below Locking section](#locking)

Each task has a priority which is used for lock acquisition. Higher-priority tasks can preempt lower-priority tasks if they try to write on the same dataSource and interval. If some locks of a task are preempted, the behavior of the preempted task depends on the task implementation. Usually, most tasks finish as failed if they are preempted.

Tasks can have different default priorities depening on their types. Here are a list of default priorities. Higher the number, higher the priority.

|task type|default priority|
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Merge/Append task|25|
|Other tasks|0|

You can override the task priority by setting your priority in the task context like below.

```json
"context" : {
"priority" : 100
}
```

#### DataSchema

This field is required.
Expand Down Expand Up @@ -308,7 +331,17 @@ These tasks start, sleep for a time and are used only for testing. The available
Locking
-------

Once an overlord node accepts a task, a lock is created for the data source and interval specified in the task.
Once an overlord node accepts a task, the task acquires locks for the data source and intervals specified in the task.

There are two locks types, i.e., _shared lock_ and _exclusive lock_.

- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other.
- A task needs to acquire an exclusive lock before it writes segemtns for an interval. An exclusive lock is acquired as preemptable and can be upgraded as non-preemptable when publishing segments.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"segemtns" -> "segments"


Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. The lock preemption works based on _optimistic locking_. When a lock is preempted, it is not notified to the owner task immediately. Instead, it's notified when the owner task tries to acquire the same lock again or upgrade it. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't content to acquire locks because they usually targets different dataSources or intervals.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: "tasks don't content" -> "tasks don't contend"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jon-wei.. I fixed typos.


A task writing data into a dataSource must acquire exclusive locks for target intervals. Note that execlusive locks are still preemptable. As a result, the task must _upgrade_ its locks as non-preemptable when it executes a critical operation, _publishing segments_. Once the lock is upgraded, it can't be preempted by even higher-priority locks. After publishing segments, the task downgrades its locks as preemptable.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"execlusive" -> "exclusive"


Tasks do not need to explicitly release locks, they are released upon task completion. Tasks may potentially release
locks early if they desire. Tasks ids are unique by naming them using UUIDs or the timestamp in which the task was created.
Tasks are also part of a "task group", which is a set of tasks that can share interval locks.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -543,13 +544,12 @@ public void run()
sequenceNames.values()
).get();

final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
final SegmentsAndMetadata handedOff;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOff = driver.registerHandoff(published)
.get();
handedOff = handoffFuture.get();
} else {
handedOff = driver.registerHandoff(published)
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
}

if (handedOff == null) {
Expand Down
103 changes: 92 additions & 11 deletions indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,85 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.joda.time.Interval;

/**
* Represents a lock held by some task. Immutable.
*/
public class TaskLock
{
private final TaskLockType type;
private final String groupId;
private final String dataSource;
private final Interval interval;
private final String version;
private final int priority;
private final boolean upgraded;
private final boolean revoked;

@JsonCreator
public TaskLock(
@JsonProperty("type") TaskLockType type,
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("priority") int priority,
@JsonProperty("upgraded") boolean upgraded,
@JsonProperty("revoked") boolean revoked
)
{
this.groupId = groupId;
this.dataSource = dataSource;
this.interval = interval;
this.version = version;
Preconditions.checkArgument(!type.equals(TaskLockType.SHARED) || !upgraded, "lock[%s] cannot be upgraded", type);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line will throw NPE when deserializing existing task locks from the database, since anything created before this patch won't have a "type", and so type.equals cannot work.

I think that means we need some migration code here for how to deal with legacy task locks. Would it make sense to load them as exclusive, upgraded locks? (Since legacy locks were always exclusive and non-preemptible)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Yeah, the lock should be exclusive if type is null.

Preconditions.checkArgument(!upgraded || !revoked, "Upgraded locks cannot be revoked");

this.type = type;
this.groupId = Preconditions.checkNotNull(groupId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception messages are more useful if you do something like Preconditions.checkNotNull(groupId, "groupId") -- otherwise a caller has to dig through the source to find what check happened on the line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Added.

this.dataSource = Preconditions.checkNotNull(dataSource);
this.interval = Preconditions.checkNotNull(interval);
this.version = Preconditions.checkNotNull(version);
this.priority = priority;
this.upgraded = upgraded;
this.revoked = revoked;
}

public TaskLock(
TaskLockType type,
String groupId,
String dataSource,
Interval interval,
String version,
int priority
)
{
this(type, groupId, dataSource, interval, version, priority, false, false);
}

public TaskLock upgrade()
{
Preconditions.checkState(!revoked, "Revoked locks cannot be upgraded");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having two states that interact in very specific ways seems confusing to me. I'm already having a tough time keeping the checks straight. I think it's usually simpler to have a single state enum, maybe in this case with three possible values:

  • PREEMPTIBLE
  • NONPREEMPTIBLE
  • REVOKED

It would help make a couple of salient facts more clear:

  • revoked + upgraded is impossible
  • REVOKED is a dead-end and cannot be transitioned out of

It would be similar to the design of the "State" enum in QueryLifecycle or DruidStatement or the "Status" enum in KafkaIndexTask.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, but I removed preemptible/non-preemptible states in the latest patch. Please see #4550 (comment).

Preconditions.checkState(!upgraded, "Already upgraded");
return new TaskLock(type, groupId, dataSource, interval, version, priority, true, revoked);
}

public TaskLock downgrade()
{
Preconditions.checkState(!revoked, "Revoked locks cannot be downgraded");
Preconditions.checkState(upgraded, "Already downgraded");
return new TaskLock(type, groupId, dataSource, interval, version, priority, false, revoked);
}

public TaskLock revoke()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revokedCopy() would be a better name; revoke() sounds like a verb that would do something. But this method doesn't really do anything, just returns a value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

{
Preconditions.checkState(!revoked, "Already revoked");
Preconditions.checkState(!upgraded, "Upgraded locks cannot be revoked");
return new TaskLock(type, groupId, dataSource, interval, version, priority, upgraded, true);
}

@JsonProperty
public TaskLockType getType()
{
return type;
}

@JsonProperty
Expand All @@ -72,34 +127,60 @@ public String getVersion()
return version;
}

@JsonProperty
public int getPriority()
{
return priority;
}

@JsonProperty
public boolean isUpgraded()
{
return upgraded;
}

@JsonProperty
public boolean isRevoked()
{
return revoked;
}

@Override
public boolean equals(Object o)
{
if (!(o instanceof TaskLock)) {
return false;
} else {
final TaskLock x = (TaskLock) o;
return Objects.equal(this.groupId, x.groupId) &&
Objects.equal(this.dataSource, x.dataSource) &&
Objects.equal(this.interval, x.interval) &&
Objects.equal(this.version, x.version);
final TaskLock that = (TaskLock) o;
return this.type.equals(that.type) &&
this.groupId.equals(that.groupId) &&
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
this.priority == that.priority &&
this.upgraded == that.upgraded &&
this.revoked == that.revoked;
}
}

@Override
public int hashCode()
{
return Objects.hashCode(groupId, dataSource, interval, version);
return Objects.hashCode(type, groupId, dataSource, interval, version, priority, upgraded, revoked);
}

@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("type", type)
.add("groupId", groupId)
.add("dataSource", dataSource)
.add("interval", interval)
.add("version", version)
.add("priority", priority)
.add("upgraded", upgraded)
.add("revoked", revoked)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.common;

public enum TaskLockType
{
SHARED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any code that creates SHARED locks. Are they used?

Should they be used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they will be created in the next pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it.

EXCLUSIVE
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,53 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Throwables;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.LockResult;
import org.joda.time.Interval;

public class LockAcquireAction implements TaskAction<TaskLock>
public class LockAcquireAction implements TaskAction<LockResult>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type of the return can't change, for protocol compatibility reasons. Check out rolling-updates.md -- we allow users to update middleManagers either before or after the overlord and the two versions of code must be able to communicate with each other.

One possible solution could involve adding TaskLock's fields to LockResult, so a LockResult from a new overlord could be deserialized as a TaskLock by an old task (while making sure that the right thing will happen in this case).

It could also involve adding an extra field to LockResult such that a new task could detect that it's talking to an old overlord and act accordingly (the task could know that if the field's not present, it means the overlord is old).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I reverted the return type of LockAcquireAction and LockTryAcquireAction. Also, tested backward compatibility in my local cluster.

{
private final TaskLockType type;

@JsonIgnore
private final Interval interval;

@JsonCreator
public LockAcquireAction(
@JsonProperty("lockType") TaskLockType type,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are protocol compatibility concerns here too. Imagine an old middleManager sending a LockAcquireAction with no lockType to a new overlord. Or imagine an old overlord receiving a LockAcquireAction from a new middleManager, and it would ignore the lockType field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. TaskLockType should be EXCLUSIVE if it is null. Tested backward compatibility in my local cluster.

@JsonProperty("interval") Interval interval
)
{
this.type = type;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add null checks here if they are important.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a default lock type if it's null.

this.interval = interval;
}

@JsonProperty("lockType")
public TaskLockType getType()
{
return type;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@Override
public TypeReference<TaskLock> getReturnTypeReference()
public TypeReference<LockResult> getReturnTypeReference()
{
return new TypeReference<TaskLock>()
return new TypeReference<LockResult>()
{
};
}

@Override
public TaskLock perform(Task task, TaskActionToolbox toolbox)
public LockResult perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getTaskLockbox().lock(task, interval);
return toolbox.getTaskLockbox().lock(type, task, interval);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
Expand All @@ -76,7 +87,8 @@ public boolean isAudited()
public String toString()
{
return "LockAcquireAction{" +
"interval=" + interval +
"lockType=" + type +
", interval=" + interval +
'}';
}
}
Loading