Skip to content

Commit

Permalink
Add retry policy to wait for job id to persist during rebalancing (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
tahaboty-stripe authored Jun 18, 2024
1 parent 5ec7028 commit abaf3c8
Showing 1 changed file with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ public RebalanceResult rebalance(
LOGGER.error("Caught exception/error while rebalancing table: {}", tableNameWithType, t);
}
});
waitForJobIdToPersist(dryRunResult.getJobId(), tableNameWithType);
return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment());
Expand All @@ -697,6 +698,25 @@ public RebalanceResult rebalance(
}
}

/**
* Waits for jobId to be persisted using a retry policy.
* Tables with 100k+ segments take up to a few seconds for the jobId to persist. This ensures the jobId is present
* before returning the jobId to the caller, so they can correctly poll the jobId.
*/
public void waitForJobIdToPersist(String jobId, String tableNameWithType) {
try {
// This retry policy waits at most for 7.5s to 15s in total. This is chosen to cover typical delays for tables
// with many segments and avoid excessive HTTP request timeouts.
RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() -> getControllerJobMetadata(jobId) != null);
} catch (Exception e) {
LOGGER.warn("waiting for jobId not successful while rebalancing table: {}", tableNameWithType);
}
}

public Map<String, String> getControllerJobMetadata(String jobId) {
return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TABLE_REBALANCE);
}

@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
Expand Down Expand Up @@ -745,8 +765,7 @@ public List<String> cancelRebalance(
public ServerRebalanceJobStatusResponse rebalanceStatus(
@ApiParam(value = "Rebalance Job Id", required = true) @PathParam("jobId") String jobId)
throws JsonProcessingException {
Map<String, String> controllerJobZKMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TABLE_REBALANCE);
Map<String, String> controllerJobZKMetadata = getControllerJobMetadata(jobId);

if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId,
Expand Down

0 comments on commit abaf3c8

Please sign in to comment.