Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor PinotTaskManager class #12964

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,22 +618,20 @@ public Map<String, Object> getCronSchedulerJobDetails(
@ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
@ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName,
@ApiParam(value = "Minion Instance tag to schedule the task explicitly on")
@QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
@Context HttpHeaders headers) {
@ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag")
@Nullable String minionInstanceTag, @Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
if (taskType != null) {
// Schedule task for the given task type
List<String> taskNames = tableName != null
? _pinotTaskManager.scheduleTask(taskType,
List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
} else {
// Schedule tasks for all task types
Map<String, List<String>> allTaskNames = tableName != null
? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
: _pinotTaskManager.scheduleTasksForDatabase(database, minionInstanceTag);
Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable(
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
return allTaskNames.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void execute(JobExecutionContext jobExecutionContext)
return;
}
long jobStartTime = System.currentTimeMillis();
pinotTaskManager.scheduleTask(taskType, table);
pinotTaskManager.scheduleTaskForTable(taskType, table, null);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -480,30 +479,72 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
}

/**
* Public API to schedule tasks (all task types) for all tables in all databases.
* Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleTasks() {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, null);
public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
}

/**
* Public API to schedule tasks (all task types) for all tables in given database.
* Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database,
public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for the given table.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}

/**
* Schedules task for the given task type for the give table.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
*/
private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
@Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

// Scan all table configs to get the tables with tasks enabled
Expand Down Expand Up @@ -541,6 +582,27 @@ private synchronized Map<String, List<String>> scheduleTasks(List<String> tableN
return tasksScheduled;
}

@Nullable
private synchronized List<String> scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

// Scan all table configs to get the tables with task enabled
List<TableConfig> enabledTableConfigs = new ArrayList<>();
for (String tableNameWithType : tables) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
.isTaskTypeEnabled(taskType)) {
enabledTableConfigs.add(tableConfig);
}
}

_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
}

/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
Expand All @@ -554,8 +616,8 @@ private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableCo
for (TableConfig tableConfig : enabledTableConfigs) {
String tableName = tableConfig.getTableName();
try {
String minionInstanceTag = minionInstanceTagForTask != null
? minionInstanceTagForTask : taskGenerator.getMinionInstanceTag(tableConfig);
String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask
: taskGenerator.getMinionInstanceTag(tableConfig);
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
Expand Down Expand Up @@ -622,86 +684,6 @@ private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableCo
return submittedTaskNames;
}

/**
* Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
* controller. Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) {
return scheduleTasks(Collections.singletonList(tableNameWithType), false, null);
}

/**
* Public API to schedule tasks (all task types) for the given table on a specific instance tag.
* It might be called from the non-leader controller. Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTasks(Collections.singletonList(tableNameWithType), false, minionInstanceTag);
}

/**
* Public API to schedule task for the given task type in all databases.
* It might be called from the non-leader controller.
* Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTask(String taskType, @Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
}

/**
* Public API to schedule task for the given task type in given database.
* It might be called from the non-leader controller.
* Returns the list of task name, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}

@Nullable
private List<String> scheduleTask(String taskType, List<String> tables, @Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

// Scan all table configs to get the tables with task enabled
List<TableConfig> enabledTableConfigs = new ArrayList<>();
for (String tableNameWithType : tables) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
.isTaskTypeEnabled(taskType)) {
enabledTableConfigs.add(tableConfig);
}
}

_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
}

/**
* Public API to schedule task for the given task type on the given table. It might be called from the non-leader
* controller. Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTask(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);

Preconditions.checkState(
tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
"Table: %s does not have task type: %s enabled", tableNameWithType, taskType);

_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false, minionInstanceTag);
}

@Override
protected void processTables(List<String> tableNamesWithType, Properties taskProperties) {
scheduleTasks(tableNamesWithType, true, null);
Expand Down
Loading
Loading