Skip to content

Commit

Permalink
Remove in-memory launching queue in RunRecordMonitorService
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Jan 15, 2025
1 parent 9dc64ce commit dc068c3
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public RunId runInternal(ProgramId programId, Map<String, String> userArgs,

ProgramRunId programRunId = programId.run(runId);
RunRecordMonitorService.Counter counter = runRecordMonitorService.addRequestAndGetCount(
programRunId);
programRunId, programOptions, programDescriptor);

boolean done = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,25 @@ private void restoreActiveRuns() {
}
try {
LOG.info("Found active run: {}", runRecordDetail.getProgramRunId());
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
ProgramDescriptor programDescriptor = this.store.loadProgram(
runRecordDetail.getProgramRunId().getParent());
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId(), programOptions, programDescriptor);
} else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId(), programOptions, programDescriptor);
// It is unknown what is the state of program runs in STARTING state.
// A STARTING message is published again to retry STARTING logic.
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId());
programStateWriter.start(
runRecordDetail.getProgramRunId(),
programOptions,
null,
this.store.loadProgram(runRecordDetail.getProgramRunId().getParent()));
programDescriptor);
}
} catch (Exception e) {
ProgramRunId programRunId = runRecordDetail.getProgramRunId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,25 @@

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.app.runtime.ProgramOptions;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,45 +43,30 @@
* flow-control mechanism for launch requests. It also has a cleanup mechanism to automatically
* remove old (i.e., configurable) entries from the counter as a safe-guard mechanism.
*/
public class RunRecordMonitorService extends AbstractScheduledService {
public class RunRecordMonitorService extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(RunRecordMonitorService.class);

/**
* Contains ProgramRunIds of runs that have been accepted, but have not been added to metadata
* store plus all run records with {@link ProgramRunStatus#PENDING} or {@link
* ProgramRunStatus#STARTING} status.
*/
private final BlockingQueue<ProgramRunId> launchingQueue;
private static final Set<ProgramType> CONTROL_FLOW_PROGRAM_TYPES = EnumSet.of(ProgramType.MAPREDUCE,
ProgramType.WORKFLOW,
ProgramType.SPARK,
ProgramType.WORKER);

private final ProgramRuntimeService runtimeService;
private final long ageThresholdSec;
private final CConfiguration cConf;
private final MetricsCollectionService metricsCollectionService;
private final int maxConcurrentRuns;
private ScheduledExecutorService executor;
private final TransactionRunner transactionRunner;

/**
* Tracks the program runs.
*
* @param cConf configuration
* @param runtimeService service to get info on programs
* @param metricsCollectionService collect metrics
*/
@Inject
public RunRecordMonitorService(
CConfiguration cConf,
MetricsCollectionService metricsCollectionService,
ProgramRuntimeService runtimeService,
MetricsCollectionService metricsCollectionService) {
this.cConf = cConf;
this.runtimeService = runtimeService;
TransactionRunner transactionRunner) {
this.metricsCollectionService = metricsCollectionService;

this.launchingQueue =
new PriorityBlockingQueue<>(
128, Comparator.comparingLong(o -> RunIds.getTime(o.getRun(), TimeUnit.MILLISECONDS)));
this.ageThresholdSec = cConf.getLong(Constants.AppFabric.MONITOR_RECORD_AGE_THRESHOLD_SECONDS);
this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS);
this.transactionRunner = transactionRunner;
}

@Override
Expand All @@ -92,43 +76,22 @@ protected void startUp() throws Exception {

@Override
protected void shutDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
LOG.info("RunRecordMonitorService successfully shut down.");
}

@Override
protected void runOneIteration() throws Exception {
cleanupQueue();
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(
0, cConf.getInt(Constants.AppFabric.MONITOR_CLEANUP_INTERVAL_SECONDS), TimeUnit.SECONDS);
}

@Override
protected final ScheduledExecutorService executor() {
executor =
Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("run-record-monitor-service-cleanup-scheduler"));
return executor;
}

/**
* Add a new in-flight launch request and return total number of launching and running programs.
*
* @param programRunId run id associated with the launch request
* @return total number of launching and running program runs.
*/
public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception {
public Counter addRequestAndGetCount(ProgramRunId programRunId, ProgramOptions programOptions,
ProgramDescriptor programDescriptor) throws Exception {
if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) {
throw new Exception("None time-based UUIDs are not supported");
}

int launchingCount = addRequest(programRunId);
int launchingCount = addRequest(programRunId, programOptions, programDescriptor);
int runningCount = getProgramsRunningCount();

LOG.info(
Expand All @@ -144,26 +107,34 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception
* @return total number of launching and running program runs.
*/
public Counter getCount() {
int launchingCount = launchingQueue.size();
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
return store.countLaunching(CONTROL_FLOW_PROGRAM_TYPES, null);
});
int runningCount = getProgramsRunningCount();

return new Counter(launchingCount, runningCount);
}

/**
* Add a new in-flight launch request.
*
* @param programRunId run id associated with the launch request
*
* @return Returns the count of launching programs.
*/
public int addRequest(ProgramRunId programRunId) {
int result;
synchronized (launchingQueue) {
launchingQueue.add(programRunId);
result = launchingQueue.size();
}
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, result);
public int addRequest(ProgramRunId programRunId, ProgramOptions programOptions,
ProgramDescriptor programDescriptor) {
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
store.recordProgramPending(programRunId,
programOptions.getArguments().asMap(),
programOptions.getUserArguments().asMap(),
programDescriptor.getArtifactId().toApiArtifactId());
return store.countLaunching(CONTROL_FLOW_PROGRAM_TYPES, null);
});
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount);
LOG.info("Added request with runId {}.", programRunId);
return result;
return launchingCount;
}

/**
Expand All @@ -176,28 +147,25 @@ public int addRequest(ProgramRunId programRunId) {
* Constants.Metrics.FlowControl#RUNNING_COUNT}
*/
public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange) {
if (launchingQueue.remove(programRunId)) {
LOG.info(
"Removed request with runId {}. Counter has {} concurrent launching requests.",
programRunId,
launchingQueue.size());
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
}

// TODO: See if this func can be refactored as it only emits metrics. Merge it with other
// functions if needed.
// TransactionRunners.run(transactionRunner, context -> {
// return AppMetadataStore.create(context).recordProgramRejected(programRunId);
// });
emitLaunchingMetrics();
if (emitRunningChange) {
emitRunningMetrics();
}
}

public void emitLaunchingMetrics(long value) {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value);
}

/**
* Emit the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric for runs.
*/
public void emitLaunchingMetrics() {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
int launchingCount = TransactionRunners.run(transactionRunner, context -> {
return AppMetadataStore.create(context).countLaunching(CONTROL_FLOW_PROGRAM_TYPES,null);
});
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingCount);
}


Expand All @@ -213,59 +181,21 @@ private void emitMetrics(String metricName, long value) {
metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value);
}

private void cleanupQueue() {
while (true) {
ProgramRunId programRunId = launchingQueue.peek();
if (programRunId == null
|| RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) + (ageThresholdSec * 1000)
>= System.currentTimeMillis()) {
// Queue is empty or queue head has not expired yet.
break;
}
// Queue head might have already been removed. So instead of calling poll, we call remove.
if (launchingQueue.remove(programRunId)) {
LOG.info("Removing request with runId {} due to expired retention time.", programRunId);
}
}
// Always emit both metrics after cleanup.
emitLaunchingMetrics();
emitRunningMetrics();
}

/**
* Returns the total number of programs in running state. The count includes batch (i.e., {@link
* ProgramType#WORKFLOW}), streaming (i.e., {@link ProgramType#SPARK}) with no parent and
* replication (i.e., {@link ProgramType#WORKER}) jobs.
*/
private int getProgramsRunningCount() {
List<ProgramRuntimeService.RuntimeInfo> list =
runtimeService.listAll(
ProgramType.WORKFLOW, ProgramType.WORKER, ProgramType.SPARK, ProgramType.MAPREDUCE);

int launchingCount = launchingQueue.size();

// We use program controllers (instead of querying metadata store) to count the total number of
// programs in running state.
// A program controller is created when a launch request is in the middle of starting state.
// Therefore, the returning running count is NOT precise.
int impreciseRunningCount =
(int) list.stream()
.filter(r -> isRunning(r.getController().getState().getRunStatus()))
.count();

if (maxConcurrentRuns < 0 || (launchingCount + impreciseRunningCount < maxConcurrentRuns)) {
// It is safe to return the imprecise value since either flow control for runs is disabled
// (i.e., -1) or flow control will not reject an incoming request yet.
return impreciseRunningCount;
}

// Flow control is at the threshold. We return the precise count.
return (int) list.stream()
.filter(
r ->
isRunning(r.getController().getState().getRunStatus())
&& !launchingQueue.contains(r.getController().getProgramRunId()))
.count();
// List<ProgramRuntimeService.RuntimeInfo> list =
// runtimeService.listAll(
// ProgramType.WORKFLOW, ProgramType.WORKER, ProgramType.SPARK, ProgramType.MAPREDUCE);
// return (int) list.stream()
// .filter(r -> isRunning(r.getController().getState().getRunStatus()))
// .count();
return TransactionRunners.run(transactionRunner, context -> {
return AppMetadataStore.create(context).countRunning(CONTROL_FLOW_PROGRAM_TYPES,null);
});
}

private boolean isRunning(ProgramRunStatus status) {
Expand All @@ -278,6 +208,15 @@ private boolean isRunning(ProgramRunStatus status) {
return false;
}


// private Counter getFlowControlMetrics(boolean launching, boolean running) {
// return TransactionRunners.run(transactionRunner, context -> {
// AppMetadataStore store = AppMetadataStore.create(context);
// int launchingCount = store.countLaunchingWorkflows(null);
// int runningCount = store.count
// });
// }

/**
* Counts the concurrent program runs.
*/
Expand Down
Loading

0 comments on commit dc068c3

Please sign in to comment.