Skip to content

Commit

Permalink
Allow different timechunk lock types to coexist in a task group (#16369)
Browse files Browse the repository at this point in the history
Description:
All the streaming ingestion tasks for a given datasource share the same lock for a given interval.
Changing lock types in the supervisor can lead to segment allocation errors due to lock conflicts
for the new tasks while the older tasks are still running.

Fix:
Allow locks of different types (EXCLUSIVE, SHARED, APPEND, REPLACE) to co-exist if they have
the same interval and the same task group.
  • Loading branch information
AmatyaAvadhanula authored May 2, 2024
1 parent e5b40b0 commit b7ae782
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -579,18 +579,20 @@ private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, bool
|| revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, request)) {
posseToUse = createNewTaskLockPosse(request);
} else {
// During a rolling update, tasks of mixed versions can be run at the same time. Old tasks would request
// timeChunkLocks while new tasks would ask segmentLocks. The below check is to allow for old and new tasks
// to get locks of different granularities if they have the same groupId.
final boolean allDifferentGranularity = conflictPosses
// When a rolling upgrade happens or lock types are changed for an ongoing Streaming ingestion supervisor,
// the existing tasks might have or request different lock granularities or types than the new ones.
// To ensure a smooth transition, we must allocate the different lock types for the new tasks
// so that they can coexist and ingest with the required locks.
final boolean allLocksHaveSameTaskGroupAndInterval = conflictPosses
.stream()
.allMatch(
conflictPosse -> conflictPosse.taskLock.getGranularity() != request.getGranularity()
&& conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
conflictPosse -> conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
&& conflictPosse.getTaskLock().getInterval().equals(request.getInterval())
);
if (allDifferentGranularity) {

if (allLocksHaveSameTaskGroupAndInterval) {
// Lock collision was because of a different granularity or a different lock type
// within the same task group and for the same interval.
// We can add a new taskLockPosse.
posseToUse = createNewTaskLockPosse(request);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1743,6 +1744,33 @@ public void testReplaceLockCanRevokeAllIncompatible()
validator.expectRevokedLocks(appendLock0, appendLock2, exclusiveLock, replaceLock, sharedLock);
}

/**
 * Checks that, after a task obtains an EXCLUSIVE time chunk lock on an interval, the same task can also
 * obtain SHARED, REPLACE and APPEND locks on that exact interval, while requests on an overlapping but
 * different interval, or from a second separately created task, are not granted.
 */
@Test
public void testTimechunkLockTypeTransitionForSameTaskGroup()
{
  final Task task = NoopTask.create();
  final Task otherGroupTask = NoopTask.create();

  final Interval lockedInterval = Intervals.of("2024/2025");
  final Interval overlappingInterval = Intervals.of("2023/2025");

  final TaskLockType[] typesAfterExclusive = {
      TaskLockType.SHARED,
      TaskLockType.REPLACE,
      TaskLockType.APPEND
  };
  final TaskLockType[] everyType = {
      TaskLockType.EXCLUSIVE,
      TaskLockType.SHARED,
      TaskLockType.REPLACE,
      TaskLockType.APPEND
  };

  // Start from an exclusive lock held by the task
  validator.expectLockCreated(TaskLockType.EXCLUSIVE, task, lockedInterval);

  // Same task, same interval: every other (normally conflicting) lock type gets a new lock
  for (TaskLockType lockType : typesAfterExclusive) {
    validator.expectLockCreated(lockType, task, lockedInterval);
  }

  // Same task, different (overlapping) interval: nothing is granted
  for (TaskLockType lockType : everyType) {
    validator.expectLockNotGranted(lockType, task, overlappingInterval);
  }

  // Different task group, same interval: nothing is granted
  for (TaskLockType lockType : everyType) {
    validator.expectLockNotGranted(lockType, otherGroupTask, lockedInterval);
  }
}

@Test
public void testGetLockedIntervalsForRevokedLocks()
{
Expand Down Expand Up @@ -1977,19 +2005,27 @@ public void testCleanupOnUnlock()
private class TaskLockboxValidator
{

private final List<Task> tasks;
private final Set<Task> tasks;
private final TaskLockbox lockbox;
private final TaskStorage taskStorage;
private final Map<TaskLock, String> lockToTaskIdMap;

/**
 * Creates a validator around the given lockbox and task storage, starting with no tracked tasks
 * and no recorded lock-to-task mappings.
 */
TaskLockboxValidator(TaskLockbox lockbox, TaskStorage taskStorage)
{
lockToTaskIdMap = new HashMap<>();
tasks = new ArrayList<>();
// Kept as a set: tryTaskLock uses the result of add() to tell whether a task was already registered.
tasks = new HashSet<>();
this.lockbox = lockbox;
this.taskStorage = taskStorage;
}

/**
 * Requests a time chunk lock of the given type for the given task and interval, and asserts that a
 * non-revoked lock was returned.
 *
 * @param type     lock type to request
 * @param task     task on whose behalf the lock is requested
 * @param interval interval to lock
 * @return the lock that was granted
 */
public TaskLock expectLockCreated(TaskLockType type, Task task, Interval interval)
{
  final TaskLock grantedLock = tryTaskLock(type, task, interval);

  // Presence first, then state: a missing lock fails before isRevoked() is ever called.
  Assert.assertNotNull(grantedLock);
  Assert.assertFalse(grantedLock.isRevoked());

  return grantedLock;
}

public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority)
{
final TaskLock lock = tryTaskLock(type, interval, priority);
Expand All @@ -2003,6 +2039,12 @@ public void revokeLock(TaskLock lock)
lockbox.revokeLock(lockToTaskIdMap.get(lock), lock);
}

/**
 * Requests a time chunk lock of the given type for the given task and interval, and asserts that no
 * lock was returned.
 *
 * @param type     lock type to request
 * @param task     task on whose behalf the lock is requested
 * @param interval interval to lock
 */
public void expectLockNotGranted(TaskLockType type, Task task, Interval interval)
{
  Assert.assertNull(tryTaskLock(type, task, interval));
}

public void expectLockNotGranted(TaskLockType type, Interval interval, int priority)
{
final TaskLock lock = tryTaskLock(type, interval, priority);
Expand Down Expand Up @@ -2031,19 +2073,24 @@ public void expectActiveLocks(TaskLock... locks)
}
}

private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority)
// Attempts a time chunk lock of the given type for the supplied task and interval; returns the
// resulting lock, or null when the attempt yields no lock.
private TaskLock tryTaskLock(TaskLockType type, Task task, Interval interval)
{
final Task task = NoopTask.ofPriority(priority);
tasks.add(task);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
// add() returns true only the first time this task is seen, so a task that requests several locks
// is added to the lockbox and inserted into task storage just once.
if (tasks.add(task)) {
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
}
TaskLock lock = tryTimeChunkLock(type, task, interval).getTaskLock();
// Remember which task owns the lock; revokeLock looks the task id up through this map.
if (lock != null) {
lockToTaskIdMap.put(lock, task.getId());
}
return lock;
}

/**
 * Convenience overload: creates a fresh {@code NoopTask} with the given priority and attempts the
 * lock on its behalf.
 */
private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority)
{
  final Task priorityTask = NoopTask.ofPriority(priority);
  return tryTaskLock(type, priorityTask, interval);
}

private Set<TaskLock> getAllActiveLocks()
{
return tasks.stream()
Expand Down

0 comments on commit b7ae782

Please sign in to comment.