Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add minionInstanceTag config to minion-tasks for resource isolation #12459

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -622,15 +622,17 @@ public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryPa
String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
if (taskType != null) {
// Schedule task for the given task type
String taskName = tableName != null
List<String> taskNames = tableName != null
? _pinotTaskManager.scheduleTask(taskType, DatabaseUtils.translateTableName(tableName, headers))
: _pinotTaskManager.scheduleTask(taskType, database);
return Collections.singletonMap(taskType, taskName);
return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
} else {
// Schedule tasks for all task types
return tableName != null
Map<String, List<String>> allTaskNames = tableName != null
? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers))
: _pinotTaskManager.scheduleTasksForDatabase(database);
return allTaskNames.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
public final static String MAX_CRON_SCHEDULE_DELAY_IN_SECONDS = "MaxCronScheduleDelayInSeconds";
public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
public final static String SCHEDULE_KEY = "schedule";
public final static String MINION_INSTANCE_TAG_CONFIG = "minionInstanceTag";

private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
Expand Down Expand Up @@ -159,7 +160,7 @@ public Map<String, String> createTask(String taskType, String tableName, @Nullab
LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
}
String minionInstanceTag =
taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
taskConfigs.getOrDefault(MINION_INSTANCE_TAG_CONFIG, CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
Expand Down Expand Up @@ -481,27 +482,26 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
/**
* Public API to schedule tasks (all task types) for all tables in default database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the task scheduled.
* Returns a map from the task type to the list of tasks scheduled.
*/
@Deprecated
public synchronized Map<String, String> scheduleTasks() {
public synchronized Map<String, List<String>> scheduleTasks() {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(CommonConstants.DEFAULT_DATABASE), false);
}

/**
* Public API to schedule tasks (all task types) for all tables in given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the task scheduled.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, String> scheduleTasksForDatabase(String database) {
public synchronized Map<String, List<String>> scheduleTasksForDatabase(String database) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the task scheduled.
* from the task type to the list of the tasks scheduled.
*/
private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

// Scan all table configs to get the tables with tasks enabled
Expand All @@ -517,7 +517,7 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi
}

// Generate each type of tasks
Map<String, String> tasksScheduled = new HashMap<>();
Map<String, List<String>> tasksScheduled = new HashMap<>();
for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
Expand All @@ -541,90 +541,110 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi

/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the task name, or {@code null} if no task is scheduled.
* Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader) {
LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader);
List<PinotTaskConfig> pinotTaskConfigs;
try {
/* TODO taskGenerator may skip generating tasks for some of the tables being passed to it.
In that case, we should not be storing success timestamps for those table. Same with exceptions that should
only be associated with the table for which it was raised and not every eligible table. We can have the
generateTasks() return a list of TaskGeneratorMostRecentRunInfo for each table
*/
pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
long successRunTimestamp = System.currentTimeMillis();
for (TableConfig tableConfig : enabledTableConfigs) {
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>();
String taskType = taskGenerator.getTaskType();
for (TableConfig tableConfig : enabledTableConfigs) {
String tableName = tableConfig.getTableName();
try {
String minionInstanceTag = taskGenerator.getMinionInstanceTag(tableConfig);
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
minionInstanceTagToTaskConfigs.put(minionInstanceTag, presentTaskConfig);
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
long successRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
// before the first task schedule, the follow two gauge metrics will be empty
// TODO: find a better way to report task generation information
_controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
_controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
() -> System.currentTimeMillis() - successRunTimestamp);
_controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
_controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
}
} catch (Exception e) {
StringWriter errors = new StringWriter();
try (PrintWriter pw = new PrintWriter(errors)) {
e.printStackTrace(pw);
}
long successRunTimestamp = System.currentTimeMillis();
for (TableConfig tableConfig : enabledTableConfigs) {
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp,
} catch (Exception e) {
StringWriter errors = new StringWriter();
try (PrintWriter pw = new PrintWriter(errors)) {
e.printStackTrace(pw);
}
long failureRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
errors.toString()));
// before the first task schedule, the follow gauge metric will be empty
// TODO: find a better way to report task generation information
_controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
_controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 1L);
LOGGER.error("Failed to generate tasks for task type {} for table {}", taskType, tableName, e);
}
throw e;
}
if (!isLeader) {
taskGenerator.nonLeaderCleanUp();
}
String taskType = taskGenerator.getTaskType();
int numTasks = pinotTaskConfigs.size();
if (numTasks > 0) {
LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
return _helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(),
taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask());
}
LOGGER.info("No task to schedule for task type: {}", taskType);
return null;
int numErrorTasksScheduled = 0;
List<String> submittedTaskNames = new ArrayList<>();
for (String minionInstanceTag : minionInstanceTagToTaskConfigs.keySet()) {
List<PinotTaskConfig> pinotTaskConfigs = minionInstanceTagToTaskConfigs.get(minionInstanceTag);
int numTasks = pinotTaskConfigs.size();
try {
if (numTasks > 0) {
// This might lead to a lot of logs; maybe sum it up and move it outside the loop
LOGGER.info("Submitting {} tasks for task type: {} to minionInstance: {} with task configs: {}", numTasks,
taskType, minionInstanceTag, pinotTaskConfigs);
String submittedTaskName = _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(),
taskGenerator.getMaxAttemptsPerTask());
submittedTaskNames.add(submittedTaskName);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
} catch (Exception e) {
numErrorTasksScheduled++;
LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}",
taskType, minionInstanceTag, pinotTaskConfigs, e);
}
}
if (numErrorTasksScheduled > 0) {
LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType);
}
// No job got scheduled
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size() || submittedTaskNames.isEmpty()) {
return null;
}
// At least one job got scheduled
return submittedTaskNames;
}

/**
* Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
* controller. Returns a map from the task type to the task scheduled.
* controller. Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, String> scheduleTasks(String tableNameWithType) {
public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) {
return scheduleTasks(Collections.singletonList(tableNameWithType), false);
}

/**
* Public API to schedule task for the given task type in default database.
* It might be called from the non-leader controller.
* Returns the task name, or {@code null} if no task is scheduled.
* Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Deprecated
@Nullable
public synchronized String scheduleTask(String taskType) {
public synchronized List<String> scheduleTask(String taskType) {
return scheduleTaskForDatabase(taskType, CommonConstants.DEFAULT_DATABASE);
}

/**
* Public API to schedule task for the given task type in given database.
* It might be called from the non-leader controller.
* Returns the task name, or {@code null} if no task is scheduled.
* Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized String scheduleTaskForDatabase(String taskType, String database) {
public synchronized List<String> scheduleTaskForDatabase(String taskType, String database) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

Expand All @@ -645,10 +665,10 @@ public synchronized String scheduleTaskForDatabase(String taskType, String datab

/**
* Public API to schedule task for the given task type on the given table. It might be called from the non-leader
* controller. Returns the task name, or {@code null} if no task is scheduled.
* controller. Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized String scheduleTask(String taskType, String tableNameWithType) {
public synchronized List<String> scheduleTask(String taskType, String tableNameWithType) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,15 @@ public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String,
throws Exception {
throw new UnknownTaskTypeException("Adhoc task generation is not supported for task type - " + this.getTaskType());
}

/**
 * Generates the tasks for the given table configs via {@link #generateTasks(List)} and appends all of them to the
 * supplied {@code pinotTaskConfigs} list. Entries already present in that list are left untouched.
 *
 * @param tableConfigs table configs to generate tasks for
 * @param pinotTaskConfigs list that receives the newly generated task configs
 * @throws Exception if the underlying task generation fails
 */
@Override
public void generateTasks(List<TableConfig> tableConfigs, List<PinotTaskConfig> pinotTaskConfigs)
    throws Exception {
  List<PinotTaskConfig> generatedTaskConfigs = generateTasks(tableConfigs);
  pinotTaskConfigs.addAll(generatedTaskConfigs);
}

/**
 * Resolves the minion instance tag for the given table by delegating to
 * {@link TaskGeneratorUtils#extractMinionInstanceTag(TableConfig, String)} with this generator's task type.
 *
 * @param tableConfig table config to read the tag from
 * @return the tag produced by the utility method
 */
@Override
public String getMinionInstanceTag(TableConfig tableConfig) {
  String taskType = getTaskType();
  return TaskGeneratorUtils.extractMinionInstanceTag(tableConfig, taskType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand Down Expand Up @@ -53,6 +54,11 @@ public interface PinotTaskGenerator {
List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
throws Exception;

/**
* Generates tasks based on the given table configs and places them into the supplied {@code pinotTaskConfigs} list.
* NOTE(review): the supplied list may already contain task configs from earlier invocations; presumably
* implementations should only append to it - confirm against the callers.
*
* @param tableConfigs table configs to generate tasks for
* @param pinotTaskConfigs list of task configs that receives the generated tasks
* @throws Exception if task generation fails
*/
void generateTasks(List<TableConfig> tableConfigs, List<PinotTaskConfig> pinotTaskConfigs) throws Exception;

/**
* Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
*/
Expand All @@ -79,4 +85,11 @@ default int getMaxAttemptsPerTask() {
*/
default void nonLeaderCleanUp() {
}

/**
* Gets the minion instance tag for the given table config.
* This default implementation does not read {@code tableConfig} and always returns
* {@link CommonConstants.Helix#UNTAGGED_MINION_INSTANCE}.
*
* @param tableConfig table config the tag is requested for (unused by this default implementation)
* @return the minion instance tag
*/
default String getMinionInstanceTag(TableConfig tableConfig) {
return CommonConstants.Helix.UNTAGGED_MINION_INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;


public class TaskGeneratorUtils {
Expand Down Expand Up @@ -106,4 +110,19 @@ public static void forRunningTasks(String tableNameWithType, String taskType, Cl
}
}
}

/**
 * Looks up the minion instance tag configured for the given task type in the table's task config.
 * Falls back to {@link CommonConstants.Helix#UNTAGGED_MINION_INSTANCE} when the table has no task config, when the
 * task type has no (or empty) configs, or when the configs do not contain
 * {@link PinotTaskManager#MINION_INSTANCE_TAG_CONFIG}.
 *
 * @param tableConfig table config to inspect
 * @param taskType task type whose configs are searched for the tag
 * @return the configured minion instance tag, or the untagged default
 */
public static String extractMinionInstanceTag(TableConfig tableConfig, String taskType) {
  TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
  if (tableTaskConfig == null) {
    // Table has no task config at all
    return CommonConstants.Helix.UNTAGGED_MINION_INSTANCE;
  }
  Map<String, String> taskTypeConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
  if (taskTypeConfigs == null || taskTypeConfigs.isEmpty()) {
    // Nothing configured for this task type
    return CommonConstants.Helix.UNTAGGED_MINION_INSTANCE;
  }
  return taskTypeConfigs.getOrDefault(PinotTaskManager.MINION_INSTANCE_TAG_CONFIG,
      CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -98,4 +104,36 @@ private static ClusterInfoAccessor createMockClusterInfoAccessor() {
when(mockClusterInfoAcessor.getPinotHelixResourceManager()).thenReturn(mockHelixResourceManager);
return mockClusterInfoAcessor;
}

/**
 * Checks that {@code TaskGeneratorUtils.extractMinionInstanceTag} returns the configured tag when the task configs
 * contain one, and the untagged default when they do not.
 */
@Test
public void testExtractMinionInstanceTag() {
  String taskType = MinionConstants.MergeRollupTask.TASK_TYPE;

  // Merge/rollup settings shared by both scenarios; none of them is a minion instance tag
  Map<String, String> untaggedTaskConfigs = new HashMap<>();
  untaggedTaskConfigs.put("100days.mergeType", "concat");
  untaggedTaskConfigs.put("100days.bufferTimePeriod", "1d");
  untaggedTaskConfigs.put("100days.bucketTimePeriod", "100d");
  untaggedTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000");
  untaggedTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");

  // Scenario 1: the tag is present and must be extracted as-is
  Map<String, String> taggedTaskConfigs = new HashMap<>(untaggedTaskConfigs);
  taggedTaskConfigs.put(PinotTaskManager.MINION_INSTANCE_TAG_CONFIG, "minionInstance1");
  TableConfig taggedTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("sampleTable")
      .setTaskConfig(new TableTaskConfig(Collections.singletonMap(taskType, taggedTaskConfigs))).build();
  assertEquals(TaskGeneratorUtils.extractMinionInstanceTag(taggedTableConfig, taskType), "minionInstance1");

  // Scenario 2: no tag configured, so the untagged default is expected
  TableConfig untaggedTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("sampleTable")
      .setTaskConfig(new TableTaskConfig(Collections.singletonMap(taskType, untaggedTaskConfigs))).build();
  assertEquals(TaskGeneratorUtils.extractMinionInstanceTag(untaggedTableConfig, taskType),
      CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
}
}
Loading
Loading