Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-21096] Remove in-memory launching queue from RunRecordMonitorService #15800

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@
import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.store.DefaultStore;

Check warning on line 121 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'io.cdap.cdap.internal.app.services.FlowControlService' import. Should be before 'io.cdap.cdap.internal.app.services.RunRecordCorrectorService'.
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
import io.cdap.cdap.internal.credential.guice.MasterCredentialProviderModule;
Expand Down Expand Up @@ -188,7 +188,7 @@

private final CConfiguration cConf;

@Inject

Check warning on line 191 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocTypeCheck

Missing a Javadoc comment.
public AppFabricServiceRuntimeModule(CConfiguration cConf) {
this.cConf = cConf;
}
Expand Down Expand Up @@ -420,7 +420,7 @@

bind(ArtifactStore.class).in(Scopes.SINGLETON);
bind(ProfileService.class).in(Scopes.SINGLETON);
bind(RunRecordMonitorService.class).in(Scopes.SINGLETON);
bind(FlowControlService.class).in(Scopes.SINGLETON);
bind(ProgramLifecycleService.class).in(Scopes.SINGLETON);
bind(SystemAppManagementService.class).in(Scopes.SINGLETON);
bind(OwnerAdmin.class).to(DefaultOwnerAdmin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryService;
import org.slf4j.Logger;
Expand All @@ -75,7 +74,7 @@ public class AppFabricProcessorService extends AbstractIdleService {
private final RunRecordCorrectorService runRecordCorrectorService;
private final RunDataTimeToLiveService runDataTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
private final RunRecordMonitorService runRecordCounterService;
private final FlowControlService runRecordCounterService;
private final CoreSchedulerService coreSchedulerService;
private final ProvisioningService provisioningService;
private final BootstrapService bootstrapService;
Expand Down Expand Up @@ -111,7 +110,7 @@ public AppFabricProcessorService(CConfiguration cConf,
ProvisioningService provisioningService,
BootstrapService bootstrapService,
SystemAppManagementService systemAppManagementService,
RunRecordMonitorService runRecordCounterService,
FlowControlService runRecordCounterService,
RunDataTimeToLiveService runDataTimeToLiveService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
ScheduleNotificationSubscriberService scheduleNotificationSubscriberService) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.app.runtime.ProgramOptions;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Maintains and provides the total number of launching and running run records. This class is used
* by the flow-control mechanism for launch requests.
*/
public class FlowControlService extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(FlowControlService.class);

private final MetricsCollectionService metricsCollectionService;
private final TransactionRunner transactionRunner;

/**
* Creates the flow-control service with the collaborators it uses.
*
* @param metricsCollectionService service from which the metrics context is obtained when the
*     flow-control gauges are published
* @param transactionRunner runner used to execute the {@link AppMetadataStore} reads and writes
*     performed by this service
*/
@Inject
public FlowControlService(
MetricsCollectionService metricsCollectionService,
TransactionRunner transactionRunner) {
this.metricsCollectionService = metricsCollectionService;
this.transactionRunner = transactionRunner;
}

/**
* Logs that the service has started; this method performs no other start-up work.
*/
@Override
protected void startUp() throws Exception {
LOG.info("FlowControlService started.");
}

/**
* Logs that the service has shut down; this method releases no resources itself.
*/
@Override
protected void shutDown() throws Exception {
LOG.info("FlowControlService successfully shut down.");
}

/**
* Add a new in-flight launch request and return total number of launching and running programs.
*
* @param programRunId run id associated with the launch request
* @return total number of launching and running program runs.
*/
public Counter addRequestAndGetCounter(ProgramRunId programRunId, ProgramOptions programOptions,
ProgramDescriptor programDescriptor) throws Exception {
if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) {
throw new Exception("None time-based UUIDs are not supported");
}

Counter counter = TransactionRunners.run(transactionRunner, context -> {
AppMetadataStore store = AppMetadataStore.create(context);
store.recordProgramPending(programRunId,
programOptions.getArguments().asMap(),
programOptions.getUserArguments().asMap(),
programDescriptor.getArtifactId().toApiArtifactId());
int launchingCount = store.getFlowControlLaunchingCount();
int runningCount = store.getFlowControlRunningCount();
return new Counter(launchingCount, runningCount);
});
LOG.info("Added request with runId {}.", programRunId);

Check notice

Code scanning / SonarCloud

Logging should not be vulnerable to injection attacks Low

Change this code to not log user-controlled data. See more on SonarQube Cloud
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, counter.getLaunchingCount());

LOG.info(
"Counter has {} concurrent launching and {} running programs.",
counter.getLaunchingCount(),
counter.getRunningCount());
return counter;
}

/**
 * Reads the current launching and running counts from the metadata store.
 *
 * @return a {@link Counter} holding the launching and running counts reported by the store
 */
public Counter getCounter() {
  return TransactionRunners.run(transactionRunner, context -> {
    AppMetadataStore metadataStore = AppMetadataStore.create(context);
    // Read launching first, then running, before building the result.
    int launching = metadataStore.getFlowControlLaunchingCount();
    int running = metadataStore.getFlowControlRunningCount();
    return new Counter(launching, running);
  });
}

public void emitFlowControlMetrics() {

Check warning on line 114 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck

Missing a Javadoc comment.
Counter counter = getCounter();
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, counter.getLaunchingCount());
emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, counter.getRunningCount());
}

/**
* Sets the gauge named {@code metricName} to {@code value} on the metrics context obtained by
* passing an empty map to {@code getContext}, after logging the update at trace level.
*
* @param metricName name of the gauge to set
* @param value value to report for the gauge
*/
private void emitMetrics(String metricName, long value) {
LOG.trace("Setting metric {} to value {}", metricName, value);
metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value);
}

/**
 * Immutable pair of counts describing the concurrent program runs: how many are launching and how
 * many are running.
 */
public class Counter {

  /**
   * Total number of run records with {@link ProgramRunStatus#RUNNING} status + total number of
   * run records with {@link ProgramRunStatus#SUSPENDED} status + total number of run records with
   * {@link ProgramRunStatus#RESUMING} status.
   */
  private final int runningCount;

  /**
   * Total number of launch requests that have been accepted but are still missing in the metadata
   * store + total number of run records with {@link ProgramRunStatus#PENDING} status + total
   * number of run records with {@link ProgramRunStatus#STARTING} status.
   *
   * <p>NOTE(review): the "accepted but still missing in the metadata store" term may be stale if
   * every launch request is recorded in the store before the counts are read; confirm against
   * the store's launching-count query.
   */
  private final int launchingCount;

  Counter(int launchingCount, int runningCount) {
    this.runningCount = runningCount;
    this.launchingCount = launchingCount;
  }

  /** Returns the running count this counter was created with. */
  public int getRunningCount() {
    return this.runningCount;
  }

  /** Returns the launching count this counter was created with. */
  public int getLaunchingCount() {
    return this.launchingCount;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
private final int defaultStopTimeoutSecs;
private final int batchSize;
private final ArtifactRepository artifactRepository;
private final RunRecordMonitorService runRecordMonitorService;
private final FlowControlService flowControlService;
private final boolean userProgramLaunchDisabled;

@Inject
Expand All @@ -161,7 +161,7 @@
ProvisionerNotifier provisionerNotifier, ProvisioningService provisioningService,
ProgramStateWriter programStateWriter, CapabilityReader capabilityReader,
ArtifactRepository artifactRepository,
RunRecordMonitorService runRecordMonitorService) {
FlowControlService flowControlService) {
this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS);
this.maxConcurrentLaunching = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_LAUNCHING);
this.defaultStopTimeoutSecs = cConf.getInt(Constants.AppFabric.PROGRAM_MAX_STOP_SECONDS);
Expand All @@ -180,7 +180,7 @@
this.programStateWriter = programStateWriter;
this.capabilityReader = capabilityReader;
this.artifactRepository = artifactRepository;
this.runRecordMonitorService = runRecordMonitorService;
this.flowControlService = flowControlService;
}

/**
Expand Down Expand Up @@ -730,8 +730,8 @@
checkCapability(programDescriptor);

ProgramRunId programRunId = programId.run(runId);
RunRecordMonitorService.Counter counter = runRecordMonitorService.addRequestAndGetCount(
programRunId);
FlowControlService.Counter counter = flowControlService.addRequestAndGetCounter(
programRunId, programOptions, programDescriptor);

boolean done = false;
try {
Expand Down Expand Up @@ -765,7 +765,7 @@
done = true;
} finally {
if (!done) {
runRecordMonitorService.removeRequest(programRunId, false);
flowControlService.emitFlowControlMetrics();
}
}

Expand Down Expand Up @@ -1545,7 +1545,7 @@
public void addAppCdapVersion(ProgramId programId, Map<String, String> systemArgs) {
ApplicationSpecification appSpec = store.getApplication(programId.getParent());
if (appSpec != null) {
String appCDAPVersion = appSpec.getAppCDAPVersion();

Check warning on line 1548 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'appCDAPVersion' must contain no more than '1' consecutive capital letters.
if (appCDAPVersion != null) {
systemArgs.put(Constants.APP_CDAP_VERSION, appCDAPVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

Check warning on line 98 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.SummaryJavadocCheck

First sentence of Javadoc is missing an ending period.
* Service that creates children services, each to handle a single partition of program status
* events topic.
*/
Expand All @@ -112,7 +112,7 @@
private final ProgramStateWriter programStateWriter;
private final TransactionRunner transactionRunner;
private final Store store;
private final RunRecordMonitorService runRecordMonitorService;
private final FlowControlService flowControlService;
private Service delegate;
private Set<ProgramCompletionNotifier> programCompletionNotifiers;

Expand All @@ -127,7 +127,7 @@
ProgramStateWriter programStateWriter,
TransactionRunner transactionRunner,
Store store,
RunRecordMonitorService runRecordMonitorService) {
FlowControlService flowControlService) {

this.messagingService = messagingService;
this.cConf = cConf;
Expand All @@ -138,7 +138,7 @@
this.programStateWriter = programStateWriter;
this.transactionRunner = transactionRunner;
this.store = store;
this.runRecordMonitorService = runRecordMonitorService;
this.flowControlService = flowControlService;
this.programCompletionNotifiers = Collections.emptySet();
}

Expand All @@ -163,8 +163,7 @@
}

private void emitFlowControlMetrics() {
runRecordMonitorService.emitLaunchingMetrics();
runRecordMonitorService.emitRunningMetrics();
flowControlService.emitFlowControlMetrics();
}

private void restoreActiveRuns() {
Expand All @@ -184,23 +183,22 @@
}
try {
LOG.info("Found active run: {}", runRecordDetail.getProgramRunId());
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
} else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
ProgramDescriptor programDescriptor = this.store.loadProgram(
runRecordDetail.getProgramRunId().getParent());
if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
// It is unknown what is the state of program runs in STARTING state.
// A STARTING message is published again to retry STARTING logic.
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId());
programStateWriter.start(
runRecordDetail.getProgramRunId(),
programOptions,
null,
this.store.loadProgram(runRecordDetail.getProgramRunId().getParent()));
programDescriptor);
}
} catch (Exception e) {
ProgramRunId programRunId = runRecordDetail.getProgramRunId();
Expand Down Expand Up @@ -234,7 +232,7 @@
provisioningService,
programStateWriter,
transactionRunner,
runRecordMonitorService,
flowControlService,
name,
topicName,
programCompletionNotifiers);
Expand All @@ -246,7 +244,7 @@
* No transactions should be started in any of the overridden methods since they are already wrapped
* in a transaction.
*/
class ProgramNotificationSingleTopicSubscriberService

Check warning on line 247 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.design.OneTopLevelClassCheck

Top-level class ProgramNotificationSingleTopicSubscriberService has to reside in its own source file.
extends AbstractNotificationSubscriberService {

private static final Logger LOG =
Expand Down Expand Up @@ -275,7 +273,7 @@
private final Queue<Runnable> tasks;
private final MetricsCollectionService metricsCollectionService;
private Set<ProgramCompletionNotifier> programCompletionNotifiers;
private final RunRecordMonitorService runRecordMonitorService;
private final FlowControlService flowControlService;
private final boolean checkTxSeparation;

ProgramNotificationSingleTopicSubscriberService(
Expand All @@ -287,7 +285,7 @@
ProvisioningService provisioningService,
ProgramStateWriter programStateWriter,
TransactionRunner transactionRunner,
RunRecordMonitorService runRecordMonitorService,
FlowControlService flowControlService,
String name,
String topicName,
Set<ProgramCompletionNotifier> programCompletionNotifiers) {
Expand All @@ -310,7 +308,7 @@
this.tasks = new LinkedList<>();
this.metricsCollectionService = metricsCollectionService;
this.programCompletionNotifiers = programCompletionNotifiers;
this.runRecordMonitorService = runRecordMonitorService;
this.flowControlService = flowControlService;

// If number of partitions equals 1, DB deadlock cannot happen as a result of concurrent
// modifications to
Expand Down Expand Up @@ -582,7 +580,7 @@
appMetadataStore.recordProgramRunning(
programRunId, logicalStartTimeSecs, twillRunId, messageIdBytes);
writeToHeartBeatTable(recordedRunRecord, logicalStartTimeSecs, programHeartbeatTable);
runRecordMonitorService.removeRequest(programRunId, true);
flowControlService.emitFlowControlMetrics();
long startDelayTime =
logicalStartTimeSecs - RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS);
emitStartingTimeMetric(programRunId, startDelayTime, recordedRunRecord);
Expand Down Expand Up @@ -660,7 +658,7 @@
Constants.Metrics.Program.PROGRAM_REJECTED_RUNS,
null)
.ifPresent(runnables::add);
runRecordMonitorService.removeRequest(programRunId, true);
flowControlService.emitFlowControlMetrics();
break;
default:
// This should not happen
Expand Down Expand Up @@ -774,7 +772,7 @@
programCompletionNotifiers.forEach(
notifier ->
notifier.onProgramCompleted(programRunId, recordedRunRecord.getStatus()));
runRecordMonitorService.removeRequest(programRunId, true);
flowControlService.emitFlowControlMetrics();
});
}
return recordedRunRecord;
Expand Down Expand Up @@ -917,7 +915,7 @@
ProgramDescriptor programDescriptor =
GSON.fromJson(
properties.get(ProgramOptionConstants.PROGRAM_DESCRIPTOR), ProgramDescriptor.class);
switch (clusterStatus) {

Check warning on line 918 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.MissingSwitchDefaultCheck

switch without "default" clause.
case PROVISIONING:
appMetadataStore.recordProgramProvisioning(
programRunId,
Expand Down
Loading
Loading