Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix][Zeta] fix Job will lost control when JobMaster init failed #4045

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -214,10 +214,12 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
ownedSlotProfilesIMap,
engineConfig);

jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
try {
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
jobMaster.initCheckPointManager();
} catch (Exception e) {
throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId), e);
jobMaster.cancelJob();
throw new SeaTunnelEngineException(String.format("Job id %s init CheckPointManager failed", jobId), e);
}

String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -339,6 +341,7 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
runningJobInfoIMap.put(jobId, new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
runningJobMasterMap.put(jobId, jobMaster);
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
jobMaster.initCheckPointManager();
} catch (Throwable e) {
logger.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
voidCompletableFuture.completeExceptionally(e);
Original file line number Diff line number Diff line change
@@ -144,8 +144,11 @@ public void addPipelineEndCallback(SubPlan subPlan) {
future.thenAcceptAsync(pipelineState -> {
try {
// Notify checkpoint manager when the pipeline end, Whether the pipeline will be restarted or not
jobMaster.getCheckpointManager()
.listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
if (jobMaster.getCheckpointManager() != null) {
jobMaster.getCheckpointManager()
.listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState())
.join();
}
if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
if (canRestorePipeline(subPlan)) {
subPlan.restorePipeline();
Original file line number Diff line number Diff line change
@@ -262,7 +262,9 @@ public void cancelPipeline() {
}

private void cancelCheckpointCoordinator() {
jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join();
if (jobMaster.getCheckpointManager() != null) {
jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join();
}
}

private void cancelPipelineTasks() {
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -130,6 +131,8 @@ public class JobMaster {

private boolean isRunning = true;

private Map<Integer, CheckpointPlan> checkpointPlanMap;

public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@@ -151,7 +154,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData,
this.engineConfig = engineConfig;
}

public void init(long initializationTimestamp) throws Exception {
public void init(long initializationTimestamp) {
jobImmutableInformation = nodeEngine.getSerializationService().toObject(
jobImmutableInformationData);
LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", jobImmutableInformation.getJobConfig().getName(),
@@ -162,8 +165,6 @@ public void init(long initializationTimestamp) throws Exception {
classLoader = new SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
logicalDag = CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
classLoader, jobImmutableInformation.getLogicalDag());
CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
jobImmutableInformation.getJobConfig().getEnvOptions());

final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
@@ -176,14 +177,20 @@ public void init(long initializationTimestamp) throws Exception {
engineConfig.getQueueType());
this.physicalPlan = planTuple.f0();
this.physicalPlan.setJobMaster(this);
this.checkpointPlanMap = planTuple.f1();
this.initStateFuture();
}

public void initCheckPointManager() throws CheckpointStorageException {
CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
jobImmutableInformation.getJobConfig().getEnvOptions());
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
jobImmutableInformation.isStartWithSavePoint(),
nodeEngine,
this,
planTuple.f1(),
checkpointPlanMap,
checkpointConfig);
this.initStateFuture();
}

// TODO replace it after ReadableConfig Support parse yaml format, then use only one config to read engine and env config.
@@ -323,8 +330,9 @@ public List<RawJobMetrics> getCurrJobMetrics(Collection<Map<TaskGroupLocation, S
groupLocation.forEach((taskGroupLocation, slotProfile) -> {
if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
try {
RawJobMetrics rawJobMetrics = (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
RawJobMetrics rawJobMetrics =
(RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
metrics.add(rawJobMetrics);
} catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
@@ -336,7 +344,8 @@ public List<RawJobMetrics> getCurrJobMetrics(Collection<Map<TaskGroupLocation, S
}

public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) {
List<RawJobMetrics> currJobMetrics = this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
List<RawJobMetrics> currJobMetrics =
this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
long jobId = this.getJobImmutableInformation().getJobId();
synchronized (this) {
@@ -346,8 +355,9 @@ public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) {
this.cleanTaskGroupContext(pipelineLocation);
}

public void removeMetricsContext(PipelineLocation pipelineLocation, PipelineStatus pipelineStatus){
if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() || pipelineStatus.equals(PipelineStatus.CANCELED)){
public void removeMetricsContext(PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() ||
pipelineStatus.equals(PipelineStatus.CANCELED)) {
IMap<TaskLocation, MetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
map.keySet().stream().filter(taskLocation -> {
@@ -399,7 +409,7 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
/**
* Execute savePoint, which will cause the job to end.
*/
public CompletableFuture<Void> savePoint(){
public CompletableFuture<Void> savePoint() {
PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
checkpointManager.triggerSavepoints();
return CompletableFuture.allOf(passiveCompletableFutures);
@@ -414,7 +424,8 @@ public void setOwnedSlotProfiles(@NonNull PipelineLocation pipelineLocation,
@NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
try {
RetryUtils.retryWithException(() -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
RetryUtils.retryWithException(
() -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
new RetryUtils.RetryMaterial(20, true,
exception -> exception instanceof NullPointerException && isRunning, 1000));
} catch (Exception e) {