Skip to content

Commit

Permalink
remove webjobs during cancellation, closes #221
Browse files Browse the repository at this point in the history
  • Loading branch information
mfleisch committed May 19, 2024
1 parent 080e87f commit c5b63bc
Showing 1 changed file with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import de.unijena.bioinf.fingerid.blast.FingerblastResult;
import de.unijena.bioinf.fingerid.blast.parameters.ParameterStore;
import de.unijena.bioinf.jjobs.BasicMasterJJob;
import de.unijena.bioinf.jjobs.JJob;
import de.unijena.bioinf.ms.properties.PropertyManager;
import de.unijena.bioinf.ms.rest.model.canopus.CanopusJobInput;
import de.unijena.bioinf.ms.rest.model.covtree.CovtreeJobInput;
import de.unijena.bioinf.ms.webapi.WebJJob;
import de.unijena.bioinf.rest.NetUtils;
Expand Down Expand Up @@ -75,7 +77,7 @@ private static boolean useConfidenceScore() {

private StructureSearchResult structureSearchResult;

// List<WebJJob<CovtreeJobInput, ?, BayesnetScoring, ?>> covtreeJobs = new ArrayList<>();
Set<WebJJob<?,?,?,?>> webJJobs = new HashSet<>();

public FingerblastJJob(@NotNull CSIPredictor predictor, @NotNull WebAPI<?> webAPI) {
this(predictor, webAPI, null);
Expand Down Expand Up @@ -160,6 +162,11 @@ protected List<FingerIdResult> compute() throws Exception {
BayesnetScoring[] s = new BayesnetScoring[idResults.size()];
webAPI.executeBatch((api, client) -> {
for (int i = 0; i < idResults.size(); i++) {
try {//interrupt if canceled
checkForInterruption();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
final FingerIdResult fingeridInput = idResults.get(i);
// fingerblast job: score candidate fingerprints against predicted fingerprint
s[i] = api.fingerprintClient().getCovarianceScoring(
Expand Down Expand Up @@ -191,8 +198,14 @@ protected List<FingerIdResult> compute() throws Exception {
searchJJobs.add(blastJob);
WebJJob<CovtreeJobInput, ?, BayesnetScoring, ?> covTreeJob =
webAPI.submitCovtreeJob(fingeridInput.getMolecularFormula(), predictor.predictorType);
webJJobs.add(covTreeJob);

blastJob.addRequiredJob(covTreeJob);
// covtreeJobs.add(covTreeJob);
//remove jobs to free up memory
blastJob.addJobProgressListener(jobProgressEvent -> {
if (((JJob<?>)jobProgressEvent.getSource()).isFinished())
webJJobs.remove(covTreeJob);
});
}

blastJob.addRequiredJob(formulaJobs.get(i));
Expand Down Expand Up @@ -389,35 +402,37 @@ private ConfidenceJJob executeConfidenceStack(ArrayList<Scored<FingerprintCandid
int confScoreApproxDist,
StructureSearchDB searchDB,
ParameterStore parameterStore,
CanopusResult topFormulaCanopusResult) {
CanopusResult topFormulaCanopusResult) throws InterruptedException {

if (requestedMergedCandidates.isEmpty() || parameterStore == null) return null;


try {
checkForInterruption();
//Start and finish MCES job for requested DBs here, since Epi and conf are dependent on the mces-condensed list
final MCESJJob mcesJJobRequested = new MCESJJob(confScoreApproxDist, requestedMergedCandidates);
submitJob(mcesJJobRequested);
int mcesIndexRequested = mcesJJobRequested.awaitResult();

checkForInterruption();


//MCES-condensed list for requested
ArrayList<Scored<FingerprintCandidate>> requestedMergedCandidatesMCESCondensed = new ArrayList<>();
requestedMergedCandidatesMCESCondensed.add(requestedMergedCandidates.get(0));
Map removedCandidatesrequested = requestedMergedCandidates.subList(1, mcesIndexRequested + 1).stream().collect(Collectors.toMap(c -> c.getCandidate().getInchiKey2D(), Scored<FingerprintCandidate>::getScore));
Map<String, Double> removedCandidatesrequested = requestedMergedCandidates.subList(1, mcesIndexRequested + 1).stream().collect(Collectors.toMap(c -> c.getCandidate().getInchiKey2D(), Scored<FingerprintCandidate>::getScore));
requestedMergedCandidatesMCESCondensed.addAll(requestedMergedCandidates.subList(mcesIndexRequested + 1, requestedMergedCandidates.size()));


/**
*
* Submit epi jobs for requested databases
*/
checkForInterruption();

//Submit epi jobs for requested databases
//epi Job for <exact, requested>
final SubstructureAnnotationJJob epiJJobExactRequested = new SubstructureAnnotationJJob(requestedMergedCandidates.size() >= 5 ? 5 : requestedMergedCandidates.size() >= 2 ? 2 : requestedMergedCandidates.size() >= 1 ? 1 : 0);
epiJJobExactRequested.setInput(fTreeCandidatesMap);
submitJob(epiJJobExactRequested);

checkForInterruption();

//epi job for <approximate, requested>. Remove candidate from ftreeCandidatesMap that are within MCES distance of approximate mode
final SubstructureAnnotationJJob epiJJobApproximateRequested = new SubstructureAnnotationJJob(requestedMergedCandidatesMCESCondensed.size() >= 5 ? 5 : requestedMergedCandidatesMCESCondensed.size() >= 2 ? 2 : requestedMergedCandidatesMCESCondensed.size() >= 1 ? 1 : 0);
HashMap<FTree, FBCandidates> fTreeCandidatesMapMCESCondensedRequested = new HashMap<>();
Expand All @@ -428,33 +443,42 @@ private ConfidenceJJob executeConfidenceStack(ArrayList<Scored<FingerprintCandid
epiJJobApproximateRequested.setInput(fTreeCandidatesMapMCESCondensedRequested);
submitJob(epiJJobApproximateRequested);

checkForInterruption();


final int specHash = Spectrums.mergeSpectra(experiment.getMs2Spectra()).hashCode();
CanopusResult canopusResultTopHit = webAPI.submitCanopusJob(
requestedMergedCandidates.get(0).getCandidate().getInchi().extractFormula(),
experiment.getPrecursorIonType().getCharge(),
requestedMergedCandidates.get(0).getCandidate().getFingerprint().asProbabilistic(),
specHash)
.awaitResult();
WebJJob<CanopusJobInput, ?, CanopusResult, ?> canopusWebJJob = webAPI.submitCanopusJob(
requestedMergedCandidates.get(0).getCandidate().getInchi().extractFormula(),
experiment.getPrecursorIonType().getCharge(),
requestedMergedCandidates.get(0).getCandidate().getFingerprint().asProbabilistic(),
specHash);
webJJobs.add(canopusWebJJob);

checkForInterruption();

CanopusResult canopusResultTopHit = canopusWebJJob.awaitResult();
webJJobs.remove(canopusWebJJob);

checkForInterruption();


//confidence job for requested
final ConfidenceJJob confidenceJJobRequested = (predictor.getConfidenceScorer() != null) && enableConfidence
? new ConfidenceJJob(predictor, experiment, allMergedCandidates, requestedMergedCandidates, requestedMergedCandidatesMCESCondensed, searchDB, parameterStore, topFormulaCanopusResult, canopusResultTopHit, mcesIndexRequested)
: null;


//we use result because it is non blocking...
confidenceJJobRequested.setEpiExact(epiJJobExactRequested::result);
confidenceJJobRequested.setEpiApprox(epiJJobApproximateRequested::result);

confidenceJJobRequested.addRequiredJob(epiJJobExactRequested);
confidenceJJobRequested.addRequiredJob(epiJJobApproximateRequested);

checkForInterruption();

return submitJob(confidenceJJobRequested);
} catch (ExecutionException | UnknownElementException | IOException e) {
e.printStackTrace();
logError("Couldn't compute confidence Job");
logError("Couldn't compute confidence Job", e);
return null;
}

Expand All @@ -463,14 +487,14 @@ private ConfidenceJJob executeConfidenceStack(ArrayList<Scored<FingerprintCandid
@Override
public void cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
// if (covtreeJobs != null)
// covtreeJobs.forEach(c -> c.cancel(mayInterruptIfRunning));
if (webJJobs != null)
webJJobs.forEach(c -> c.cancel(mayInterruptIfRunning));
}

@Override
protected void cleanup() {
super.cleanup();
// covtreeJobs = null;
webJJobs = null;
}

@Override
Expand Down

0 comments on commit c5b63bc

Please sign in to comment.