Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Zeta] job clean before JobMaster future complete #4087

Merged
merged 3 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
engineConfig);

jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
Expand All @@ -227,7 +228,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
logger.info(String.format(
"The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info",
jobFullName, jobStatus));
removeJobIMap(jobMaster);
jobMaster.cleanJob();
return;
}

Expand All @@ -250,8 +251,11 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
jobMaster.cancelJob();
jobMaster.run();
} finally {
// store job state info to HistoryStorage
onJobDone(jobMaster, jobId);
// voidCompletableFuture will be cancelled when the zeta master node shuts down to simulate master failure;
// don't update runningJobMasterMap in this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
}
});
return;
Expand All @@ -265,7 +269,11 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
onJobDone(jobMaster, jobId);
// voidCompletableFuture will be cancelled when the zeta master node shuts down to simulate master failure;
// don't update runningJobMasterMap in this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
}
});
}
Expand Down Expand Up @@ -297,7 +305,6 @@ private void clearCoordinatorService() {

try {
executorService.awaitTermination(20, TimeUnit.SECONDS);
runningJobMasterMap = new ConcurrentHashMap<>();
} catch (InterruptedException e) {
throw new SeaTunnelEngineException("wait clean executor service error", e);
}
Expand Down Expand Up @@ -335,7 +342,9 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap, engineConfig);
ownedSlotProfilesIMap,
runningJobInfoIMap,
engineConfig);
executorService.submit(() -> {
try {
runningJobInfoIMap.put(jobId, new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
Expand All @@ -353,7 +362,11 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
try {
jobMaster.run();
} finally {
onJobDone(jobMaster, jobId);
// voidCompletableFuture will be cancelled when the zeta master node shuts down to simulate master failure;
// don't update runningJobMasterMap in this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
}
});
return new PassiveCompletableFuture<>(voidCompletableFuture);
Expand All @@ -372,37 +385,6 @@ public PassiveCompletableFuture<Void> savePoint(long jobId) {
return new PassiveCompletableFuture<>(voidCompletableFuture);
}

/**
 * Finalizes a finished job: stores its DAG info and finished state in the job history service,
 * removes its running-state entries via {@code removeJobIMap}, and finally drops it from
 * {@code runningJobMasterMap}.
 *
 * @param jobMaster the master of the job that has finished
 * @param jobId     the id of the job that has finished
 */
private void onJobDone(JobMaster jobMaster, long jobId) {
    // Read the DAG info from the JobMaster handed to us instead of looking it up again in
    // runningJobMasterMap: the lookup returns null when the entry is no longer present, which
    // would throw a NullPointerException here and skip all of the cleanup below.
    jobHistoryService.storeJobInfo(jobId, jobMaster.getJobDAGInfo());
    jobHistoryService.storeFinishedJobState(jobMaster);
    removeJobIMap(jobMaster);
    runningJobMasterMap.remove(jobId);
}

/**
 * Removes the running-state entries of the given job from {@code runningJobStateIMap},
 * {@code runningJobStateTimestampsIMap} and {@code runningJobInfoIMap}.
 *
 * <p>Entries are removed for every pipeline of the job's physical plan and for each of the
 * pipeline's coordinator and physical vertices; the job-level state and job info entries
 * are removed last.
 *
 * @param jobMaster the master of the job whose entries are removed
 */
private void removeJobIMap(JobMaster jobMaster) {
    final Long finishedJobId = jobMaster.getJobImmutableInformation().getJobId();
    runningJobStateTimestampsIMap.remove(finishedJobId);

    for (SubPlan subPlan : jobMaster.getPhysicalPlan().getPipelineList()) {
        // Pipeline-level entries.
        runningJobStateIMap.remove(subPlan.getPipelineLocation());
        runningJobStateTimestampsIMap.remove(subPlan.getPipelineLocation());

        // Entries of the pipeline's coordinator task groups.
        subPlan.getCoordinatorVertexList().forEach(coordinatorVertex -> {
            runningJobStateIMap.remove(coordinatorVertex.getTaskGroupLocation());
            runningJobStateTimestampsIMap.remove(coordinatorVertex.getTaskGroupLocation());
        });

        // Entries of the pipeline's physical task groups.
        subPlan.getPhysicalVertexList().forEach(physicalVertex -> {
            runningJobStateIMap.remove(physicalVertex.getTaskGroupLocation());
            runningJobStateTimestampsIMap.remove(physicalVertex.getTaskGroupLocation());
        });
    }

    // These should be deleted at the end.
    runningJobStateIMap.remove(finishedJobId);
    runningJobInfoIMap.remove(finishedJobId);
}

public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
Expand Down Expand Up @@ -134,14 +135,18 @@ public class JobMaster {

private Map<Integer, CheckpointPlan> checkpointPlanMap;

private final IMap<Long, JobInfo> runningJobInfoIMap;

public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@NonNull ResourceManager resourceManager,
@NonNull JobHistoryService jobHistoryService,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap,
@NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) {
@NonNull IMap ownedSlotProfilesIMap,
@NonNull IMap<Long, JobInfo> runningJobInfoIMap,
EngineConfig engineConfig) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
Expand All @@ -152,6 +157,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData,
this.jobHistoryService = jobHistoryService;
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.runningJobInfoIMap = runningJobInfoIMap;
this.engineConfig = engineConfig;
}

Expand Down Expand Up @@ -217,10 +223,11 @@ public void initStateFuture() {
jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
// We need not handle t, Because we will not return t from physicalPlan
if (JobStatus.FAILING.equals(v.getStatus())) {
cleanJob();
physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
}
jobMasterCompleteFuture.complete(new JobResult(physicalPlan.getJobStatus(), v.getError()));
JobResult jobResult = new JobResult(physicalPlan.getJobStatus(), v.getError());
cleanJob();
jobMasterCompleteFuture.complete(jobResult);
}));
}

Expand Down Expand Up @@ -256,6 +263,28 @@ public void handleCheckpointError(long pipelineId, Throwable e) {
});
}

/**
 * Removes this job's running-state entries from {@code runningJobStateIMap},
 * {@code runningJobStateTimestampsIMap} and {@code runningJobInfoIMap}.
 *
 * <p>The job-level timestamp entry goes first, followed by the entries of every pipeline in the
 * physical plan together with its coordinator and physical vertices; the job-level state and
 * job info entries are removed after all of those.
 */
private void removeJobIMap() {
    final Long currentJobId = getJobImmutableInformation().getJobId();
    runningJobStateTimestampsIMap.remove(currentJobId);

    for (SubPlan subPlan : getPhysicalPlan().getPipelineList()) {
        // Pipeline-level entries.
        runningJobStateIMap.remove(subPlan.getPipelineLocation());
        runningJobStateTimestampsIMap.remove(subPlan.getPipelineLocation());

        // Entries keyed by the task group location of each coordinator vertex.
        subPlan.getCoordinatorVertexList().forEach(coordinatorVertex -> {
            runningJobStateIMap.remove(coordinatorVertex.getTaskGroupLocation());
            runningJobStateTimestampsIMap.remove(coordinatorVertex.getTaskGroupLocation());
        });

        // Entries keyed by the task group location of each physical vertex.
        subPlan.getPhysicalVertexList().forEach(physicalVertex -> {
            runningJobStateIMap.remove(physicalVertex.getTaskGroupLocation());
            runningJobStateTimestampsIMap.remove(physicalVertex.getTaskGroupLocation());
        });
    }

    // Job-level state and job info entries are dropped after everything else.
    runningJobStateIMap.remove(currentJobId);
    runningJobInfoIMap.remove(currentJobId);
}

public JobDAGInfo getJobDAGInfo() {
if (jobDAGInfo == null) {
jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, isPhysicalDAGIInfo);
Expand All @@ -277,7 +306,9 @@ public void releasePipelineResource(SubPlan subPlan) {
}

/**
 * Cleans up after this job: stores the job's DAG info and finished state in the job history
 * service, then removes the job's running-state entries via {@link #removeJobIMap()}.
 */
public void cleanJob() {
    final Long jobId = jobImmutableInformation.getJobId();

    // Persist the history first; the running-state entries are removed only afterwards.
    jobHistoryService.storeJobInfo(jobId, getJobDAGInfo());
    jobHistoryService.storeFinishedJobState(this);

    removeJobIMap();
}

public Address queryTaskGroupAddress(long taskGroupId) {
Expand Down