[CDAP-21046][CDAP-21048] Fixing flow control metrics on startup (#15737)

Merged · 1 commit · Nov 21, 2024
@@ -89,7 +89,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.twill.internal.CompositeService;
@@ -101,6 +100,9 @@
* events topic
*/
public class ProgramNotificationSubscriberService extends AbstractIdleService {

private static final Logger LOG =
LoggerFactory.getLogger(ProgramNotificationSubscriberService.class);
private final MessagingService messagingService;
private final CConfiguration cConf;
private final MetricsCollectionService metricsCollectionService;
@@ -145,6 +147,8 @@ protected void startUp() throws Exception {
List<Service> children = new ArrayList<>();
String topicPrefix = cConf.get(Constants.AppFabric.PROGRAM_STATUS_EVENT_TOPIC);
int numPartitions = cConf.getInt(Constants.AppFabric.PROGRAM_STATUS_EVENT_NUM_PARTITIONS);
// Active runs should be restored only once, not for every shard that is created.
restoreActiveRuns();
// Add bare one - we always listen to it
children.add(createChildService("program.status", topicPrefix));
// If number of partitions is more than 1 - create partitioned services
@@ -153,8 +157,60 @@ protected void startUp() throws Exception {
.forEach(i -> children.add(createChildService("program.status." + i, topicPrefix + i)));
}
delegate = new CompositeService(children);

delegate.startAndWait();
// Explicitly emit both launching and running counts on startup.
emitFlowControlMetrics();
}

private void emitFlowControlMetrics() {
runRecordMonitorService.emitLaunchingMetrics();
runRecordMonitorService.emitRunningMetrics();
}

private void restoreActiveRuns() {
LOG.info("Restoring active runs");
int batchSize = cConf.getInt(Constants.RuntimeMonitor.INIT_BATCH_SIZE);
RetryStrategy retryStrategy =
RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);
long startTs = System.currentTimeMillis();

Retries.runWithRetries(
() ->
store.scanActiveRuns(
batchSize,
(runRecordDetail) -> {
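// Runs that started after this scan began are skipped (presumably picked up by the live subscribers instead).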
if (runRecordDetail.getStartTs() > startTs) {
return;
}
try {
LOG.info("Found active run: {}", runRecordDetail.getProgramRunId());
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
} else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
// The state of program runs in the STARTING state is unknown,
// so a STARTING message is published again to retry the STARTING logic.
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId());
programStateWriter.start(
runRecordDetail.getProgramRunId(),
programOptions,
null,
this.store.loadProgram(runRecordDetail.getProgramRunId().getParent()));
}
} catch (Exception e) {
ProgramRunId programRunId = runRecordDetail.getProgramRunId();
LOG.warn(
"Retrying to start run {} failed. Marking it as failed.", programRunId, e);
programStateWriter.error(programRunId, e);
}
}),
retryStrategy,
e -> true);
}

@Override
@@ -178,7 +234,6 @@ private ProgramNotificationSingleTopicSubscriberService createChildService(
provisioningService,
programStateWriter,
transactionRunner,
store,
runRecordMonitorService,
name,
topicName,
@@ -195,7 +250,7 @@ class ProgramNotificationSingleTopicSubscriberService
extends AbstractNotificationSubscriberService {

private static final Logger LOG =
LoggerFactory.getLogger(ProgramNotificationSubscriberService.class);
LoggerFactory.getLogger(ProgramNotificationSingleTopicSubscriberService.class);

private static final Gson GSON =
ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create();
@@ -220,8 +275,6 @@ class ProgramNotificationSingleTopicSubscriberService
private final Queue<Runnable> tasks;
private final MetricsCollectionService metricsCollectionService;
private Set<ProgramCompletionNotifier> programCompletionNotifiers;
private final CConfiguration cConf;
private final Store store;
private final RunRecordMonitorService runRecordMonitorService;
private final boolean checkTxSeparation;

@@ -234,7 +287,6 @@ class ProgramNotificationSingleTopicSubscriberService
ProvisioningService provisioningService,
ProgramStateWriter programStateWriter,
TransactionRunner transactionRunner,
Store store,
RunRecordMonitorService runRecordMonitorService,
String name,
String topicName,
@@ -259,8 +311,6 @@ class ProgramNotificationSingleTopicSubscriberService
this.metricsCollectionService = metricsCollectionService;
this.programCompletionNotifiers = programCompletionNotifiers;
this.runRecordMonitorService = runRecordMonitorService;
this.cConf = cConf;
this.store = store;

// If number of partitions equals 1, DB deadlock cannot happen as a result of concurrent
// modifications to
@@ -273,55 +323,6 @@ class ProgramNotificationSingleTopicSubscriberService
@Override
protected void doStartUp() throws Exception {
super.doStartUp();

int batchSize = cConf.getInt(Constants.RuntimeMonitor.INIT_BATCH_SIZE);
RetryStrategy retryStrategy =
RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);
long startTs = System.currentTimeMillis();

AtomicBoolean launching = new AtomicBoolean(false);
Retries.runWithRetries(
() ->
store.scanActiveRuns(
batchSize,
(runRecordDetail) -> {
if (runRecordDetail.getStartTs() > startTs) {
return;
}
try {
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
launching.set(true);
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
} else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
launching.set(true);
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
// It is unknown what is the state of program runs in STARTING state.
// A STARTING message is published again to retry STARTING logic.
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId());
programStateWriter.start(
runRecordDetail.getProgramRunId(),
programOptions,
null,
this.store.loadProgram(runRecordDetail.getProgramRunId().getParent()));
}
} catch (Exception e) {
ProgramRunId programRunId = runRecordDetail.getProgramRunId();
LOG.warn(
"Retrying to start run {} failed. Marking it as failed.", programRunId, e);
programStateWriter.error(programRunId, e);
}
}),
retryStrategy,
e -> true);
if (!launching.get()) {
// there is no launching pipeline
runRecordMonitorService.emitLaunchingMetrics(0);
}
}

@Nullable
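Net effect of the hunks above, as a condensed sketch rather than the literal method: active runs are now restored exactly once by the parent service before the per-topic subscribers start, and both flow-control gauges are emitted explicitly afterwards, instead of each topic subscriber repeating the scan in its own doStartUp.

protected void startUp() throws Exception {
  List<Service> children = new ArrayList<>();
  restoreActiveRuns();            // single scan over active runs, shared by all shards
  // ... add one subscriber per program-status topic partition ...
  delegate = new CompositeService(children);
  delegate.startAndWait();
  emitFlowControlMetrics();       // gauges both LAUNCHING_COUNT and RUNNING_COUNT once, up front
}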
@@ -23,6 +23,7 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramRunId;
@@ -184,15 +185,31 @@ public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange)
}

if (emitRunningChange) {
emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, getProgramsRunningCount());
emitRunningMetrics();
}
}

public void emitLaunchingMetrics(long value) {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value);
}

/**
* Emit the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric for runs.
*/
public void emitLaunchingMetrics() {
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
}


/**
* Emit the {@link Constants.Metrics.FlowControl#RUNNING_COUNT} metric for runs.
*/
public void emitRunningMetrics() {
emitMetrics(FlowControl.RUNNING_COUNT, getProgramsRunningCount());
}

private void emitMetrics(String metricName, long value) {
LOG.debug("Setting metric {} to value {}", metricName, value);
metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value);
}

@@ -208,11 +225,11 @@ private void cleanupQueue() {
// Queue head might have already been removed. So instead of calling poll, we call remove.
if (launchingQueue.remove(programRunId)) {
LOG.info("Removing request with runId {} due to expired retention time.", programRunId);
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
}
}

emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, getProgramsRunningCount());
// Always emit both metrics after cleanup.
emitLaunchingMetrics();
emitRunningMetrics();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -131,6 +131,15 @@ protected void configure() {
});
}

/**
 * Returns the first registered service assignable to {@code clazz}, or {@code null} if no such
 * service has been registered.
 */
@SuppressWarnings("unchecked")
public static <T extends Service> T getService(Class<T> clazz) {
for (Service service : services) {
if (clazz.isAssignableFrom(service.getClass())) {
return (T) service;
}
}
return null;
}

public static Injector getInjector(CConfiguration cConf, Module overrides) {
return getInjector(cConf, null, overrides);
}
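The getService lookup added above returns the first registered service assignable to the requested type, or null if none is registered. A minimal usage sketch, mirroring how the new restart test below obtains and bounces the subscriber service:

ProgramNotificationSubscriberService notificationService =
    AppFabricTestHelper.getService(ProgramNotificationSubscriberService.class);
// Bounce it directly (not via stopAndWait()) so the enclosing test services keep running.
notificationService.shutDown();
notificationService.startUp();   // re-runs restoreActiveRuns() and re-emits both flow-control gauges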
@@ -33,6 +33,7 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl;
import io.cdap.cdap.common.id.Id;
import io.cdap.cdap.common.utils.ProjectInfo;
import io.cdap.cdap.common.utils.Tasks;
@@ -308,6 +309,67 @@ public void testMetricsEmit() throws Exception {
metricStore.deleteAll();
}

@Test
public void testLaunchingCountMetricsOnRestart() throws Exception {
AppFabricTestHelper.deployApplication(Id.Namespace.DEFAULT, ProgramStateWorkflowApp.class, null,
cConf);
ApplicationDetail appDetail = AppFabricTestHelper.getAppInfo(Id.Namespace.DEFAULT,
ProgramStateWorkflowApp.class.getSimpleName(), cConf);

ProgramRunId workflowRunId = NamespaceId.DEFAULT
.app(ProgramStateWorkflowApp.class.getSimpleName(), appDetail.getAppVersion())
.workflow(ProgramStateWorkflowApp.ProgramStateWorkflow.class.getSimpleName())
.run(RunIds.generate());

ApplicationSpecification appSpec = TransactionRunners.run(transactionRunner, context -> {
return AppMetadataStore.create(context).getApplication(workflowRunId.getParent().getParent())
.getSpec();
});

ProgramDescriptor programDescriptor = new ProgramDescriptor(workflowRunId.getParent(), appSpec);

// Start and run the workflow
Map<String, String> systemArgs = new HashMap<>();
systemArgs.put(ProgramOptionConstants.SKIP_PROVISIONING, Boolean.TRUE.toString());
systemArgs.put(SystemArguments.PROFILE_NAME, ProfileId.NATIVE.getScopedName());
TransactionRunners.run(transactionRunner, context -> {
programStateWriter.start(workflowRunId, new SimpleProgramOptions(workflowRunId.getParent(),
new BasicArguments(systemArgs),
new BasicArguments()), null, programDescriptor);
});
checkProgramStatus(appSpec.getArtifactId(), workflowRunId, ProgramRunStatus.STARTING);

ProgramNotificationSubscriberService notificationService = AppFabricTestHelper.getService(
ProgramNotificationSubscriberService.class);
// Restart the Notification service. We do not use stopAndWait() because we don't want to
// terminate the main service.
notificationService.shutDown();
notificationService.startUp();

MetricStore metricStore = injector.getInstance(MetricStore.class);
// Wait for metrics to be written.
Tasks.waitFor(1L, () -> queryMetrics(metricStore,
SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>()), 10, TimeUnit.SECONDS);
Assert.assertEquals(0L, queryMetrics(metricStore,
SYSTEM_METRIC_PREFIX + FlowControl.RUNNING_COUNT, new HashMap<>()));

TransactionRunners.run(transactionRunner, context -> {
programStateWriter.running(workflowRunId, null);
});
checkProgramStatus(appSpec.getArtifactId(), workflowRunId, ProgramRunStatus.RUNNING);
// Restart the Notification service. We do not use stopAndWait() because we don't want to
// terminate the main service.
notificationService.shutDown();
notificationService.startUp();
// Running counts are not based on the metadata store in RunRecordMonitorService, so they are
// not asserted here.
Tasks.waitFor(0L, () -> queryMetrics(metricStore,
SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>()), 10, TimeUnit.SECONDS);

// Cleanup metrics.
metricStore.deleteAll();
}

private Map<String, String> getAdditionalTagsForProgramMetrics(ProgramRunStatus existingStatus, String provisioner,
ProgramRunClusterStatus clusterStatus) {
Map<String, String> additionalTags = new HashMap<>();
@@ -460,8 +522,13 @@ private long getMetric(MetricStore metricStore, ProgramRunId programRunId, Profi
.put(Constants.Metrics.Tag.PROGRAM, programRunId.getProgram())
.putAll(additionalTags)
.build();
return queryMetrics(metricStore, metricName, tags);
}

private long queryMetrics(MetricStore metricStore, String metricName,
Map<String, String> tags) {
MetricDataQuery query = new MetricDataQuery(0, 0, Integer.MAX_VALUE, metricName, AggregationFunction.SUM,
tags, new ArrayList<>());
Collection<MetricTimeSeries> result = metricStore.query(query);
if (result.isEmpty()) {
return 0;
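The extracted queryMetrics helper (its tail is truncated above) is what the new test uses to read the flow-control gauges without per-program tags. A minimal usage sketch with the same arguments as in the test:

long launching = queryMetrics(metricStore,
    SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>());
long running = queryMetrics(metricStore,
    SYSTEM_METRIC_PREFIX + FlowControl.RUNNING_COUNT, new HashMap<>());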