Skip to content

Commit

Permalink
refactor integration-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Feb 29, 2024
1 parent 8d9b22d commit 992145b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,13 +667,12 @@ public Map<String, Object> getCronSchedulerJobDetails(
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
@ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
public Map<String, List<String>> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
@ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
if (taskType != null) {
// Schedule task for the given task type
String taskName = tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName)
: _pinotTaskManager.scheduleTask(taskType);
return Collections.singletonMap(taskType, taskName);
return Collections.singletonMap(taskType, tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName)
: _pinotTaskManager.scheduleTask(taskType));
} else {
// Schedule tasks for all task types
return tableName != null ? _pinotTaskManager.scheduleTasks(tableName) : _pinotTaskManager.scheduleTasks();
Expand Down Expand Up @@ -718,7 +717,7 @@ public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended AsyncRe
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
@ApiOperation("Schedule tasks (deprecated)")
public Map<String, String> scheduleTasksDeprecated() {
public Map<String, List<String>> scheduleTasksDeprecated() {
return _pinotTaskManager.scheduleTasks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,19 +472,27 @@ public TaskGeneratorRegistry getTaskGeneratorRegistry() {
return _taskGeneratorRegistry;
}

/**
* Registers a task generator by delegating to {@code _taskGeneratorRegistry}.
* <p>This method can be used to plug in custom task generators.
*
* @param taskGenerator the task generator to register; it is handed to the registry unchanged
*/
public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
_taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
}

/**
* Public API to schedule tasks (all task types) for all tables. It might be called from the non-leader controller.
* Returns a map from the task type to the list of task names scheduled for that type.
*/
public synchronized Map<String, String> scheduleTasks() {
public synchronized Map<String, List<String>> scheduleTasks() {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of task names scheduled for that type.
*/
private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

// Scan all table configs to get the tables with tasks enabled
Expand All @@ -500,7 +508,7 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi
}

// Generate each type of tasks
Map<String, String> tasksScheduled = new HashMap<>();
Map<String, List<String>> tasksScheduled = new HashMap<>();
for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
Expand All @@ -527,7 +535,7 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi
* Returns the list of submitted task names, or {@code null} if no task is scheduled.
*/
@Nullable
private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader) {
LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader);
Map<String, List<PinotTaskConfig>> pinotMinionInstanceToTaskConfigs = new HashMap<>();
Expand Down Expand Up @@ -573,6 +581,7 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig>
taskGenerator.nonLeaderCleanUp();
}
int numErrorTasksScheduled = 0;
List<String> submittedTaskNames = new ArrayList<>();
for (String minionInstanceTag : pinotMinionInstanceToTaskConfigs.keySet()) {
List<PinotTaskConfig> pinotTaskConfigs = pinotMinionInstanceToTaskConfigs.get(minionInstanceTag);
int numTasks = pinotTaskConfigs.size();
Expand All @@ -581,8 +590,10 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig>
// This might lead to a lot of logs; maybe sum it up and move it outside the loop
LOGGER.info("Submitting {} tasks for task type: {} to minionInstance: {} with task configs: {}", numTasks,
taskType, minionInstanceTag, pinotTaskConfigs);
_helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag, taskGenerator.getTaskTimeoutMs(),
taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask());
String submittedTaskName = _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(),
taskGenerator.getMaxAttemptsPerTask());
submittedTaskNames.add(submittedTaskName);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
} catch (Exception e) {
Expand All @@ -594,18 +605,18 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig>
LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType);
}
// No job got scheduled
if (numErrorTasksScheduled == pinotMinionInstanceToTaskConfigs.size()) {
if (numErrorTasksScheduled == pinotMinionInstanceToTaskConfigs.size() || submittedTaskNames.isEmpty()) {
return null;
}
// At least one job got scheduled
return taskType;
return submittedTaskNames;
}

/**
* Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
* controller. Returns a map from the task type to the list of task names scheduled for that type.
*/
public synchronized Map<String, String> scheduleTasks(String tableNameWithType) {
public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) {
return scheduleTasks(Collections.singletonList(tableNameWithType), false);
}

Expand All @@ -614,7 +625,7 @@ public synchronized Map<String, String> scheduleTasks(String tableNameWithType)
* list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized String scheduleTask(String taskType) {
public synchronized List<String> scheduleTask(String taskType) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

Expand All @@ -638,7 +649,7 @@ public synchronized String scheduleTask(String taskType) {
* controller. Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized String scheduleTask(String taskType, String tableNameWithType) {
public synchronized List<String> scheduleTask(String taskType, String tableNameWithType) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,13 @@ public void testOfflineTableSingleLevelConcat()
long expectedWatermark = 16000 * 86_400_000L;
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE);
int numTasks = 0;
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
List<String> taskList;
for (String tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null;
taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -521,9 +525,13 @@ public void testOfflineTableSingleLevelConcatWithMetadataPush()
long expectedWatermark = 16000 * 86_400_000L;
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
int numTasks = 0;
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
List<String> taskList;
for (String tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null;
taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -626,9 +634,13 @@ public void testOfflineTableSingleLevelRollup()
long expectedWatermark = 16050 * 86_400_000L;
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE);
int numTasks = 0;
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
List<String> taskList;
for (String tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null;
taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -774,9 +786,13 @@ public void testOfflineTableMultiLevelConcat()

String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE);
int numTasks = 0;
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
List<String> taskList;
for (String tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null;
taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -903,9 +919,13 @@ public void testRealtimeTableSingleLevelConcat()
long expectedWatermark = 16000 * 86_400_000L;
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
int numTasks = 0;
for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
List<String> taskList;
for (String tasks =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null;
taskList = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
// assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -1005,9 +1025,13 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat()
long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
int numTasks = 0;
for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
List<String> taskList;
for (String tasks =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null; taskList =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));

Expand Down Expand Up @@ -1044,9 +1068,12 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat()
uploadSegments(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, TableType.REALTIME, _tarDir5);
waitForAllDocsLoaded(600_000L);

for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
for (String tasks =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
tasks != null; taskList =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
tasks = taskList != null ? taskList.get(0) : null,
numTasks++) {
waitForTaskToComplete();
// Check metrics
long numBucketsToProcess = MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -134,24 +135,26 @@ public void testStopResumeDeleteTaskQueue() {
// No tasks before we start.
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);

// Should create the task queues and generate a task
String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
// Should create the task queues and generate a task in the same minion instance
List<String> task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
assertNotNull(task1);
assertEquals(task1.size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1));
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1.get(0)));

// Since we have two tables, two sub-tasks are generated -- one for each table.
// The default concurrent sub-tasks per minion instance is 1, and we have one minion
// instance spun up. So, one sub-task gets scheduled in a minion, and the other one
// waits.
verifyTaskCount(task1, 0, 1, 1, 2);
verifyTaskCount(task1.get(0), 0, 1, 1, 2);
// Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
// since we have one minion instance that is still running one of the sub-tasks.
String task2 = _taskManager.scheduleTask(TASK_TYPE);
List<String> task2 = _taskManager.scheduleTask(TASK_TYPE);
assertNotNull(task2);
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2));
verifyTaskCount(task2, 0, 2, 0, 2);
assertEquals(task2.size(), 1);
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
verifyTaskCount(task2.get(0), 0, 2, 0, 2);

// Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
// Our test task generator does not generate if there are already this many sub-tasks in the
Expand Down Expand Up @@ -219,8 +222,8 @@ public void testStopResumeDeleteTaskQueue() {

// Task deletion requires the task queue to be stopped,
// so deleting task1 here before resuming the task queue.
assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
_helixTaskResourceManager.deleteTask(task1, false);
assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1.get(0)));
_helixTaskResourceManager.deleteTask(task1.get(0), false);
// Resume the task queue, and let the task complete
_helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
HOLD.set(false);
Expand All @@ -234,7 +237,7 @@ public void testStopResumeDeleteTaskQueue() {
}
}
// Task deletion happens eventually along with other state transitions.
assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1.get(0)));
assertEquals(taskStates.size(), (NUM_TASKS - 1));
assertTrue(TASK_START_NOTIFIED.get());
assertTrue(TASK_SUCCESS_NOTIFIED.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String getTaskType() {

@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
assertEquals(tableConfigs.size(), SimpleMinionClusterIntegrationTest.NUM_TASKS);
assertEquals(tableConfigs.size(), 1);

// Generate at most 2 tasks
if (_clusterInfoAccessor.getTaskStates(SimpleMinionClusterIntegrationTest.TASK_TYPE).size()
Expand Down

0 comments on commit 992145b

Please sign in to comment.