Skip to content

Commit

Permalink
update ControllerJobType from enum to string (#12518)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhtaoxiang authored Feb 29, 2024
1 parent ac13a19 commit 704f73d
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
Expand Down Expand Up @@ -174,8 +173,8 @@ public static String constructPropertyStorePathForResource(String resourceName)
return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
}

public static String constructPropertyStorePathForControllerJob(ControllerJobType jobType) {
return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType.name());
/**
 * Builds the property store path under which controller job metadata for the given
 * job type is kept (each job type has its own ZNode whose map fields are keyed by jobId).
 *
 * @param jobType the controller job type, expected to be one of the
 *                {@code ControllerJobType} string constants
 * @return the property store path for the given job type
 */
public static String constructPropertyStorePathForControllerJob(String jobType) {
return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType);
}

public static String constructPropertyStorePathForResourceConfig(String resourceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
*/
package org.apache.pinot.common.metadata.controllerjob;

public enum ControllerJobType {
RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE
import com.google.common.collect.ImmutableSet;
import java.util.Set;


/**
 * String constants for the types of jobs tracked by the controller in ZooKeeper.
 *
 * <p>Job types are plain strings (rather than an enum) so new types can be introduced
 * without breaking readers of previously persisted job metadata. This is a
 * constants-only utility class and cannot be instantiated.
 */
public final class ControllerJobType {
  private ControllerJobType() {
    // Utility class; no instances.
  }

  public static final String RELOAD_SEGMENT = "RELOAD_SEGMENT";
  public static final String FORCE_COMMIT = "FORCE_COMMIT";
  public static final String TABLE_REBALANCE = "TABLE_REBALANCE";
  public static final String TENANT_REBALANCE = "TENANT_REBALANCE";

  /**
   * All known controller job types; used as the default set when no explicit
   * job-type filter is supplied. Set.of(...) yields an immutable, null-hostile set,
   * so no third-party collection library is required here.
   */
  public static final Set<String>
      VALID_CONTROLLER_JOB_TYPE = Set.of(RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1082,20 +1082,15 @@ public Map<String, Map<String, String>> getControllerJobs(
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
LOGGER);
Set<ControllerJobType> validJobTypes =
java.util.Arrays.stream(ControllerJobType.values()).collect(Collectors.toSet());
Set<ControllerJobType> jobTypesToFilter = null;
Set<String> jobTypesToFilter = null;
if (StringUtils.isNotEmpty(jobTypesString)) {
try {
jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ','))).stream()
.map(type -> ControllerJobType.valueOf(type)).collect(Collectors.toSet());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Valid Types are: " + validJobTypes);
}
jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ',')))
.stream().collect(Collectors.toSet());
}
Map<String, Map<String, String>> result = new HashMap<>();
for (String tableNameWithType : tableNamesWithType) {
result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null ? validJobTypes : jobTypesToFilter,
result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null
? ControllerJobType.VALID_CONTROLLER_JOB_TYPE : jobTypesToFilter,
jobMetadata -> jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
.equals(tableNameWithType)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2036,7 +2036,7 @@ public PinotResourceManagerResponse toggleTableState(String tableNameWithType, S
* @return Map representing the job's ZK properties
*/
@Nullable
public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJobType jobType) {
public Map<String, String> getControllerJobZKMetadata(String jobId, String jobType) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
return jobsZnRecord != null ? jobsZnRecord.getMapFields().get(jobId) : null;
Expand All @@ -2046,10 +2046,10 @@ public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJo
* Returns a Map of jobId to job's ZK metadata that passes the checker, like for specific tables.
* @return A Map of jobId to job properties
*/
public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTypes,
public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes,
Predicate<Map<String, String>> jobMetadataChecker) {
Map<String, Map<String, String>> controllerJobs = new HashMap<>();
for (ControllerJobType jobType : jobTypes) {
for (String jobType : jobTypes) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
if (jobsZnRecord == null) {
Expand All @@ -2059,8 +2059,8 @@ public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTyp
for (Map.Entry<String, Map<String, String>> jobMetadataEntry : jobMetadataMap.entrySet()) {
String jobId = jobMetadataEntry.getKey();
Map<String, String> jobMetadata = jobMetadataEntry.getValue();
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name()),
"Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType.name(), jobResourcePath, jobId);
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType),
"Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType, jobResourcePath, jobId);
if (jobMetadataChecker.test(jobMetadata)) {
controllerJobs.put(jobId, jobMetadata);
}
Expand All @@ -2083,7 +2083,7 @@ public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNa
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent));
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
Expand All @@ -2096,7 +2096,7 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString());
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
Expand All @@ -2116,7 +2116,7 @@ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT);
Expand All @@ -2129,7 +2129,7 @@ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId
* @param jobType the type of the job to figure out where job metadata is kept in ZK
* @return boolean representing success / failure of the ZK write step
*/
public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType) {
public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType) {
// Delegate to the full overload with a previous-metadata checker that always passes,
// so the new metadata is written regardless of any existing entry for this jobId.
return addControllerJobToZK(jobId, jobMetadata, jobType, prev -> true);
}

Expand All @@ -2141,7 +2141,7 @@ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadat
* @param prevJobMetadataChecker to check the previous job metadata before adding new one
* @return boolean representing success / failure of the ZK write step
*/
public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType,
public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType,
Predicate<Map<String, String>> prevJobMetadataChecker) {
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS) != null,
"Submission Time in JobMetadata record not set. Cannot expire these records");
Expand Down Expand Up @@ -2178,7 +2178,7 @@ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadat
* @param updater to modify the job metadata in place
* @return boolean representing success / failure of the ZK write step
*/
public boolean updateJobsForTable(String tableNameWithType, ControllerJobType jobType,
public boolean updateJobsForTable(String tableNameWithType, String jobType,
Consumer<Map<String, String>> updater) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
Stat stat = new Stat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ static Map<String, String> createJobMetadata(String tableNameWithType, String jo
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name());
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE);
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(tableRebalanceProgressStats));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void trackStatsInZk() {
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name());
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE);
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_progressStats));
Expand Down

0 comments on commit 704f73d

Please sign in to comment.