From 992145b80909fc371f788c477d7625d02588051d Mon Sep 17 00:00:00 2001 From: tibrewalpratik Date: Thu, 29 Feb 2024 16:05:10 +0530 Subject: [PATCH] refactor integration-tests --- .../resources/PinotTaskRestletResource.java | 9 ++- .../helix/core/minion/PinotTaskManager.java | 33 ++++++--- ...rgeRollupMinionClusterIntegrationTest.java | 69 +++++++++++++------ .../SimpleMinionClusterIntegrationTest.java | 23 ++++--- .../minion/tasks/TestTaskGenerator.java | 2 +- 5 files changed, 88 insertions(+), 48 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index a727c858e20d..37ead699b404 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -667,13 +667,12 @@ public Map getCronSchedulerJobDetails( @Produces(MediaType.APPLICATION_JSON) @Authenticate(AccessType.UPDATE) @ApiOperation("Schedule tasks and return a map from task type to task name scheduled") - public Map scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType, + public Map> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType, @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) { if (taskType != null) { // Schedule task for the given task type - String taskName = tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName) - : _pinotTaskManager.scheduleTask(taskType); - return Collections.singletonMap(taskType, taskName); + return Collections.singletonMap(taskType, tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName) + : _pinotTaskManager.scheduleTask(taskType)); } else { // Schedule tasks for all task types return tableName != null ? _pinotTaskManager.scheduleTasks(tableName) : _pinotTaskManager.scheduleTasks(); @@ -718,7 +717,7 @@ public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended AsyncRe @Produces(MediaType.APPLICATION_JSON) @Authenticate(AccessType.UPDATE) @ApiOperation("Schedule tasks (deprecated)") - public Map scheduleTasksDeprecated() { + public Map> scheduleTasksDeprecated() { return _pinotTaskManager.scheduleTasks(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 7e8e3f7497ca..64df4a5952cc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -472,11 +472,19 @@ public TaskGeneratorRegistry getTaskGeneratorRegistry() { return _taskGeneratorRegistry; } + /** + * Registers a task generator. + *

This method can be used to plug in custom task generators. + */ + public void registerTaskGenerator(PinotTaskGenerator taskGenerator) { + _taskGeneratorRegistry.registerTaskGenerator(taskGenerator); + } + /** * Public API to schedule tasks (all task types) for all tables. It might be called from the non-leader controller. * Returns a map from the task type to the task scheduled. */ - public synchronized Map scheduleTasks() { + public synchronized Map> scheduleTasks() { return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false); } @@ -484,7 +492,7 @@ public synchronized Map scheduleTasks() { * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map * from the task type to the task scheduled. */ - private synchronized Map scheduleTasks(List tableNamesWithType, boolean isLeader) { + private synchronized Map> scheduleTasks(List tableNamesWithType, boolean isLeader) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); // Scan all table configs to get the tables with tasks enabled @@ -500,7 +508,7 @@ private synchronized Map scheduleTasks(List tableNamesWi } // Generate each type of tasks - Map tasksScheduled = new HashMap<>(); + Map> tasksScheduled = new HashMap<>(); for (Map.Entry> entry : enabledTableConfigMap.entrySet()) { String taskType = entry.getKey(); List enabledTableConfigs = entry.getValue(); @@ -527,7 +535,7 @@ private synchronized Map scheduleTasks(List tableNamesWi * Returns the task name, or {@code null} if no task is scheduled. */ @Nullable - private String scheduleTask(PinotTaskGenerator taskGenerator, List enabledTableConfigs, + private List scheduleTask(PinotTaskGenerator taskGenerator, List enabledTableConfigs, boolean isLeader) { LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader); Map> pinotMinionInstanceToTaskConfigs = new HashMap<>(); @@ -573,6 +581,7 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List taskGenerator.nonLeaderCleanUp(); } int numErrorTasksScheduled = 0; + List submittedTaskNames = new ArrayList<>(); for (String minionInstanceTag : pinotMinionInstanceToTaskConfigs.keySet()) { List pinotTaskConfigs = pinotMinionInstanceToTaskConfigs.get(minionInstanceTag); int numTasks = pinotTaskConfigs.size(); @@ -581,8 +590,10 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List // This might lead to lot of logs, maybe sum it up and move outside the loop LOGGER.info("Submitting {} tasks for task type: {} to minionInstance: {} with task configs: {}", numTasks, taskType, minionInstanceTag, pinotTaskConfigs); - _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag, taskGenerator.getTaskTimeoutMs(), - taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask()); + String submittedTaskName = _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag, + taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(), + taskGenerator.getMaxAttemptsPerTask()); + submittedTaskNames.add(submittedTaskName); _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks); } } catch (Exception e) { @@ -594,18 +605,18 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType); } // No job got scheduled - if (numErrorTasksScheduled == pinotMinionInstanceToTaskConfigs.size()) { + if (numErrorTasksScheduled == pinotMinionInstanceToTaskConfigs.size() || submittedTaskNames.isEmpty()) { return null; } // atleast one job got scheduled - return taskType; + return submittedTaskNames; } /** * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader * controller. Returns a map from the task type to the task scheduled. */ - public synchronized Map scheduleTasks(String tableNameWithType) { + public synchronized Map> scheduleTasks(String tableNameWithType) { return scheduleTasks(Collections.singletonList(tableNameWithType), false); } @@ -614,7 +625,7 @@ public synchronized Map scheduleTasks(String tableNameWithType) * task name, or {@code null} if no task is scheduled. */ @Nullable - public synchronized String scheduleTask(String taskType) { + public synchronized List scheduleTask(String taskType) { PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); @@ -638,7 +649,7 @@ public synchronized String scheduleTask(String taskType) { * controller. Returns the task name, or {@code null} if no task is scheduled. */ @Nullable - public synchronized String scheduleTask(String taskType, String tableNameWithType) { + public synchronized List scheduleTask(String taskType, String tableNameWithType) { PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index 7eb32ccb0a6b..b655416c878c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -409,9 +409,13 @@ public void testOfflineTableSingleLevelConcat() long expectedWatermark = 16000 * 86_400_000L; String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE); int numTasks = 0; - for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + List taskList; + for (String tasks = + _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; + taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); @@ -521,9 +525,13 @@ public void testOfflineTableSingleLevelConcatWithMetadataPush() long expectedWatermark = 16000 * 86_400_000L; String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE); int numTasks = 0; - for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + List taskList; + for (String tasks = + _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; + taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); @@ -626,9 +634,13 @@ public void testOfflineTableSingleLevelRollup() long expectedWatermark = 16050 * 86_400_000L; String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE); int numTasks = 0; - for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + List taskList; + for (String tasks = + _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; + taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); @@ -774,9 +786,13 @@ public void testOfflineTableMultiLevelConcat() String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE); int numTasks = 0; - for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + List taskList; + for (String tasks = + _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; + taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); @@ -903,9 +919,13 @@ public void testRealtimeTableSingleLevelConcat() long expectedWatermark = 16000 * 86_400_000L; String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); int numTasks = 0; - for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + List taskList; + for (String tasks = + taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; + taskList = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { // assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); @@ -1005,9 +1025,13 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat() long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1}; String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); int numTasks = 0; - for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + List taskList; + for (String tasks = + taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; taskList = + taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { assertTrue(helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); @@ -1044,9 +1068,12 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat() uploadSegments(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, TableType.REALTIME, _tarDir5); waitForAllDocsLoaded(600_000L); - for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); - tasks != null; tasks = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { + for (String tasks = + taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); + tasks != null; taskList = + taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, + numTasks++) { waitForTaskToComplete(); // Check metrics long numBucketsToProcess = MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(), diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 86b4231776cd..1db953f00f40 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.model.HelixConfigScope; @@ -134,24 +135,26 @@ public void testStopResumeDeleteTaskQueue() { // No tasks before we start. assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0); - // Should create the task queues and generate a task - String task1 = _taskManager.scheduleTasks().get(TASK_TYPE); + // Should create the task queues and generate a task in the same minion instance + List task1 = _taskManager.scheduleTasks().get(TASK_TYPE); assertNotNull(task1); + assertEquals(task1.size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE))); - assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1)); + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1.get(0))); // Since we have two tables, two sub-tasks are generated -- one for each table. // The default concurrent sub-tasks per minion instance is 1, and we have one minion // instance spun up. So, one sub-tasks gets scheduled in a minion, and the other one // waits. - verifyTaskCount(task1, 0, 1, 1, 2); + verifyTaskCount(task1.get(0), 0, 1, 1, 2); // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait // since we have one minion instance that is still running one of the sub-tasks. - String task2 = _taskManager.scheduleTask(TASK_TYPE); + List task2 = _taskManager.scheduleTask(TASK_TYPE); assertNotNull(task2); - assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2)); - verifyTaskCount(task2, 0, 2, 0, 2); + assertEquals(task2.size(), 1); + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0))); + verifyTaskCount(task2.get(0), 0, 2, 0, 2); // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2. // Our test task generator does not generate if there are already this many sub-tasks in the @@ -219,8 +222,8 @@ public void testStopResumeDeleteTaskQueue() { // Task deletion requires the task queue to be stopped, // so deleting task1 here before resuming the task queue. - assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1)); - _helixTaskResourceManager.deleteTask(task1, false); + assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1.get(0))); + _helixTaskResourceManager.deleteTask(task1.get(0), false); // Resume the task queue, and let the task complete _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE); HOLD.set(false); @@ -234,7 +237,7 @@ public void testStopResumeDeleteTaskQueue() { } } // Task deletion happens eventually along with other state transitions. - assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1)); + assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1.get(0))); assertEquals(taskStates.size(), (NUM_TASKS - 1)); assertTrue(TASK_START_NOTIFIED.get()); assertTrue(TASK_SUCCESS_NOTIFIED.get()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java index 1efa7fb37079..644e9bd9c890 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java @@ -45,7 +45,7 @@ public String getTaskType() { @Override public List generateTasks(List tableConfigs) { - assertEquals(tableConfigs.size(), SimpleMinionClusterIntegrationTest.NUM_TASKS); + assertEquals(tableConfigs.size(), 1); // Generate at most 2 tasks if (_clusterInfoAccessor.getTaskStates(SimpleMinionClusterIntegrationTest.TASK_TYPE).size()