Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-21096] Split ProgramLifecycleService for Appfabric service and processor #15824

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.cdap.cdap.gateway.handlers.ProfileHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramLifecycleHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramLifecycleHttpHandlerInternal;
import io.cdap.cdap.gateway.handlers.ProgramRuntimeHttpHandler;
import io.cdap.cdap.gateway.handlers.ProvisionerHttpHandler;
import io.cdap.cdap.gateway.handlers.SourceControlManagementHttpHandler;
import io.cdap.cdap.gateway.handlers.TransactionHttpHandler;
Expand Down Expand Up @@ -114,11 +115,11 @@
import io.cdap.cdap.internal.app.runtime.schedule.store.TriggerMisfireLogger;
import io.cdap.cdap.internal.app.runtime.workflow.BasicWorkflowStateWriter;
import io.cdap.cdap.internal.app.runtime.workflow.WorkflowStateWriter;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.LocalRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
Expand Down Expand Up @@ -249,17 +250,11 @@ protected void configure() {
Names.named("appfabric.handler.hooks"));
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

// TODO (CDAP-21112): Remove the additional handler binding for in-memory and use the binding from
// AppFabricServiceModule, after fixing in-memory cache issue in ProgramRuntimeService and
// RunRecordMonitorService.
// For ProgramLifecycleHttpHandlerTest the ProgramRuntimeHttpHandler needs to be present
// in the appfabric service.
Multibinder<HttpHandler> handlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(Constants.AppFabric.SERVER_HANDLERS_BINDING));
handlerBinder.addBinding().to(BootstrapHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandler.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);
handlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);

// TODO: Uncomment after CDAP-7688 is resolved
// servicesNamesBinder.addBinding().toInstance(Constants.Service.MESSAGING_SERVICE);
Expand Down Expand Up @@ -529,6 +524,11 @@ protected void configure() {
handlerBinder.addBinding().to(CredentialProviderHttpHandler.class);
handlerBinder.addBinding().to(CredentialProviderHttpHandlerInternal.class);
handlerBinder.addBinding().to(OperationHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandler.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
Expand All @@ -545,14 +545,8 @@ protected void configure() {
Multibinder<HttpHandler> processorHandlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(AppFabric.PROCESSOR_HANDLERS_BINDING));
CommonHandlers.add(processorHandlerBinder);
// TODO (CDAP-21112): Move HTTP handler from Appfabric processor to server after fixing
// ProgramRuntimeService and RunRecordMonitorService.
processorHandlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);
processorHandlerBinder.addBinding().to(BootstrapHttpHandler.class);
processorHandlerBinder.addBinding().to(AppLifecycleHttpHandler.class);
processorHandlerBinder.addBinding().to(AppLifecycleHttpHandlerInternal.class);
processorHandlerBinder.addBinding().to(ProgramLifecycleHttpHandler.class);
processorHandlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
processorHandlerBinder.addBinding().to(WorkflowHttpHandler.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.io.Closeables;
Expand All @@ -26,12 +27,14 @@
import com.google.inject.Inject;
import io.cdap.cdap.app.deploy.ProgramRunDispatcherContext;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.twill.TwillAppNames;
import io.cdap.cdap.internal.app.deploy.ProgramRunDispatcherFactory;
import io.cdap.cdap.internal.app.runtime.AbstractListener;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.runtime.service.SimpleRuntimeInfo;
import io.cdap.cdap.proto.InMemoryProgramLiveInfo;
import io.cdap.cdap.proto.NotRunningProgramLiveInfo;
Expand All @@ -46,6 +49,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -59,6 +64,7 @@
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,27 +127,38 @@ void setRemoteTwillRunnerService(
@Override
public final RuntimeInfo run(ProgramDescriptor programDescriptor, ProgramOptions options,
RunId runId) {
ProgramRunDispatcherContext dispatcherContext = new ProgramRunDispatcherContext(
programDescriptor, options, runId,
isDistributed());
ProgramId programId = programDescriptor.getProgramId();
ProgramRunId programRunId = programId.run(runId);
DelayedProgramController controller = new DelayedProgramController(programRunId);
RuntimeInfo runtimeInfo = createRuntimeInfo(controller, programId,
dispatcherContext::executeCleanupTasks);
updateRuntimeInfo(runtimeInfo);
executor.execute(() -> {
try {
controller.setProgramController(
programRunDispatcherFactory.getProgramRunDispatcher(programId.getType())
.dispatchProgram(dispatcherContext));
} catch (Exception e) {
controller.failed(e);
programStateWriter.error(programRunId, e);
LOG.error("Exception while trying to run program run {}", programRunId, e);
Lock lock = runtimeInfosLock.writeLock();
lock.lock();
try {
RuntimeInfo runtimeInfo = lookup(programRunId.getParent(), runId);
if (runtimeInfo != null) {
return runtimeInfo;
}
});
return runtimeInfo;

ProgramRunDispatcherContext dispatcherContext = new ProgramRunDispatcherContext(
programDescriptor, options, runId,
isDistributed());
DelayedProgramController controller = new DelayedProgramController(programRunId);
runtimeInfo = createRuntimeInfo(controller, programId,
dispatcherContext::executeCleanupTasks);
updateRuntimeInfo(runtimeInfo);
executor.execute(() -> {
try {
controller.setProgramController(
programRunDispatcherFactory.getProgramRunDispatcher(programId.getType())
.dispatchProgram(dispatcherContext));
} catch (Exception e) {
controller.failed(e);
programStateWriter.error(programRunId, e);
LOG.error("Exception while trying to run program run {}", programRunId, e);
}
});
return runtimeInfo;
} finally {
lock.unlock();
}
}

@Override
Expand Down Expand Up @@ -239,6 +256,28 @@ public List<RuntimeInfo> listAll(ProgramType... types) {
return runningPrograms;
}

/**
 * Resets the log levels of a running program after checking that its type supports it.
 *
 * @param programId the program whose log levels are reset
 * @param loggerNames the logger names to reset
 * @param runId the run to target, may be {@code null}
 * @throws BadRequestException if the program type is neither {@link ProgramType#SERVICE} nor
 *     {@link ProgramType#WORKER}
 */
@Override
public void resetProgramLogLevels(ProgramId programId, Set<String> loggerNames,
    @Nullable String runId) throws Exception {
  ProgramType type = programId.getType();
  boolean supported = type == ProgramType.SERVICE || type == ProgramType.WORKER;
  if (supported) {
    resetLogLevels(programId, loggerNames, runId);
    return;
  }
  String message = String.format(
      "Resetting log levels for program type %s is not supported", type.getPrettyName());
  throw new BadRequestException(message);
}

/**
 * Updates the log levels of a running program after checking that its type supports it.
 *
 * @param programId the program whose log levels are updated
 * @param logLevels logger name to {@link LogEntry.Level} mapping to apply
 * @param runId the run to target, may be {@code null}
 * @throws BadRequestException if the program type is neither {@link ProgramType#SERVICE} nor
 *     {@link ProgramType#WORKER}
 */
@Override
public void updateProgramLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels,
    @Nullable String runId) throws Exception {
  ProgramType type = programId.getType();
  boolean supported = type == ProgramType.SERVICE || type == ProgramType.WORKER;
  if (supported) {
    updateLogLevels(programId, logLevels, runId);
    return;
  }
  String message = String.format(
      "Updating log levels for program type %s is not supported", type.getPrettyName());
  throw new BadRequestException(message);
}

@Override
protected void startUp() throws Exception {
// Limits to at max poolSize number of concurrent program launch.
Expand Down Expand Up @@ -436,4 +475,79 @@ private void cleanupRuntimeInfo(@Nullable RuntimeInfo info) {
Closeables.closeQuietly((Closeable) info);
}
}

/**
 * Returns the controller of the given runtime info viewed as a {@link LogLevelUpdater}.
 *
 * @param runtimeInfo the runtime info whose controller is inspected
 * @return the controller, cast to {@link LogLevelUpdater}
 * @throws BadRequestException if the controller does not implement {@link LogLevelUpdater}
 */
private LogLevelUpdater getLogLevelUpdater(RuntimeInfo runtimeInfo) throws Exception {
  ProgramController controller = runtimeInfo.getController();
  if (controller instanceof LogLevelUpdater) {
    return (LogLevelUpdater) controller;
  }
  throw new BadRequestException(
      "Update log levels at runtime is only supported in distributed mode");
}

/**
 * Applies the given log levels to the first runtime info found for the program (and run id, when
 * one is given). Does nothing when no matching runtime info exists.
 *
 * @throws BadRequestException if the run id cannot be parsed or the controller does not support
 *     log level updates
 */
private void updateLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels,
    @Nullable String runId) throws Exception {
  Map<RunId, ProgramRuntimeService.RuntimeInfo> candidates = findRuntimeInfo(programId, runId);
  ProgramRuntimeService.RuntimeInfo target =
      candidates.values().stream().findFirst().orElse(null);
  if (target == null) {
    // Nothing is running for this program/run: silently skip.
    return;
  }
  // NOTE(review): the null second argument presumably means "no specific component" - confirm.
  getLogLevelUpdater(target).updateLogLevels(logLevels, null);
}

/**
 * Resets the given loggers on the first runtime info found for the program (and run id, when one
 * is given). Does nothing when no matching runtime info exists.
 *
 * @throws BadRequestException if the run id cannot be parsed or the controller does not support
 *     log level updates
 */
private void resetLogLevels(ProgramId programId, Set<String> loggerNames, @Nullable String runId)
    throws Exception {
  Map<RunId, ProgramRuntimeService.RuntimeInfo> candidates = findRuntimeInfo(programId, runId);
  ProgramRuntimeService.RuntimeInfo target =
      candidates.values().stream().findFirst().orElse(null);
  if (target == null) {
    // Nothing is running for this program/run: silently skip.
    return;
  }
  // NOTE(review): the null second argument presumably means "no specific component" - confirm.
  getLogLevelUpdater(target).resetLogLevels(loggerNames, null);
}

/**
 * Sends an {@link ProgramOptionConstants#INSTANCES} command to the controller of a running
 * program and waits for the command to complete. Does nothing when the program has no runtime
 * info.
 *
 * @param programId the program whose instance count is changed
 * @param instances the new number of instances
 * @param oldInstances the previous number of instances
 */
@Override
public void setInstances(ProgramId programId, int instances, int oldInstances)
    throws ExecutionException, InterruptedException, BadRequestException {
  ProgramRuntimeService.RuntimeInfo running = findRuntimeInfo(programId);
  if (running == null) {
    return;
  }
  ProgramController controller = running.getController();
  ImmutableMap<String, String> commandArgs = ImmutableMap.of(
      "runnable", programId.getProgram(),
      "newInstances", String.valueOf(instances),
      "oldInstances", String.valueOf(oldInstances));
  // Block until the controller has processed the command.
  controller.command(ProgramOptionConstants.INSTANCES, commandArgs).get();
}

/**
 * Finds runtime infos for a program.
 *
 * @param programId the program to look up
 * @param runId when non-null, restricts the result to this run; when {@code null}, all entries
 *     returned by {@code list(programId)} are included
 * @return a map from run id to runtime info; empty when a specific run was requested but not
 *     found
 * @throws BadRequestException if {@code runId} is not a parseable run id
 */
private Map<RunId, ProgramRuntimeService.RuntimeInfo> findRuntimeInfo(
    ProgramId programId, @Nullable String runId) throws BadRequestException {

  if (runId == null) {
    return new HashMap<>(list(programId));
  }

  RunId parsedRunId;
  try {
    parsedRunId = RunIds.fromString(runId);
  } catch (IllegalArgumentException e) {
    throw new BadRequestException("Error parsing run-id.", e);
  }

  ProgramRuntimeService.RuntimeInfo match = lookup(programId, parsedRunId);
  if (match == null) {
    return Collections.emptyMap();
  }
  return Collections.singletonMap(parsedRunId, match);
}

/**
 * Returns one runtime info of the given program, or {@code null} when there is none. When several
 * runs exist, the one returned is the first in the iteration order of the map built by the
 * two-argument overload.
 */
@Nullable
protected ProgramRuntimeService.RuntimeInfo findRuntimeInfo(ProgramId programId)
    throws BadRequestException {
  Map<RunId, ProgramRuntimeService.RuntimeInfo> allRuns = findRuntimeInfo(programId, null);
  return allRuns.values()
      .stream()
      .findFirst()
      .orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import com.google.common.util.concurrent.Service;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.proto.ProgramLiveInfo;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.apache.twill.api.logging.LogEntry;

/**
* Service for interacting with the runtime system.
Expand All @@ -47,7 +53,9 @@ interface RuntimeInfo {
}

/**
* Runs the given program and returns a {@link RuntimeInfo} about the running program. The program
* is run if it is not already running, otherwise the {@link RuntimeInfo} of the already running
* program is returned.
*
* @param programDescriptor describing the program to run
* @param options {@link ProgramOptions} that are needed by the program.
Expand Down Expand Up @@ -97,4 +105,60 @@ interface RuntimeInfo {
* @param types Types of program to check returns List of info about running programs.
*/
List<RuntimeInfo> listAll(ProgramType... types);

/**
 * Resets log levels for the given program. The only program types supported for this action are
 * {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
 *
 * @param programId the {@link ProgramId} of the program for which log levels are to be reset.
 * @param loggerNames the set of logger names to be reset; empty means reset for all loggers.
 * @param runId the run id of the program; may be {@code null}.
 * @throws InterruptedException if there is an error while asynchronously resetting log
 *     levels.
 * @throws ExecutionException if there is an error while asynchronously resetting log levels.
 * @throws BadRequestException if the program type is not supported.
 * @throws UnauthorizedException if the user does not have privileges to reset log levels for
 *     the specified program. To reset log levels for a program, a user needs {@link
 *     StandardPermission#UPDATE} on the program.
 */
void resetProgramLogLevels(ProgramId programId, Set<String> loggerNames, @Nullable String runId) throws Exception;

/**
 * Updates log levels for the given program. The only program types supported for this action are
 * {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
 *
 * @param programId the {@link ProgramId} of the program for which log levels are to be updated.
 * @param logLevels the {@link Map} from logger name to the {@link LogEntry.Level} to apply.
 * @param runId the run id of the program; may be {@code null}.
 * @throws InterruptedException if there is an error while asynchronously updating log
 *     levels.
 * @throws ExecutionException if there is an error while asynchronously updating log levels.
 * @throws BadRequestException if the log level is not valid or the program type is not
 *     supported.
 * @throws UnauthorizedException if the user does not have privileges to update log levels for
 *     the specified program. To update log levels for a program, a user needs {@link
 *     StandardPermission#UPDATE} on the program.
 */
void updateProgramLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels, @Nullable String runId)
throws Exception;

/**
 * Sets instances for the given program. The only program types supported for this action are
 * {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
 *
 * @param programId the {@link ProgramId} of the program for which instances are to be updated.
 * @param instances the new number of instances.
 * @param oldInstances the previous number of instances.
 *
 * @throws InterruptedException if there is an error while asynchronously updating instances
 * @throws ExecutionException if there is an error while asynchronously updating instances
 * @throws BadRequestException if the number of instances specified is less than 0
 * @throws UnauthorizedException if the user does not have privileges to set instances for the
 *     specified program. To set instances for a program, a user needs {@link
 *     StandardPermission#UPDATE} on the program.
 */
void setInstances(ProgramId programId, int instances, int oldInstances) throws Exception;
}
11 changes: 11 additions & 0 deletions cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cdap.cdap.app.program.Program;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ConflictException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.ProgramNotFoundException;
Expand Down Expand Up @@ -464,6 +465,16 @@ void updateApplicationSourceControlMeta(Map<ApplicationId, SourceControlMeta> up
@Nullable
ApplicationMeta getLatest(ApplicationReference appRef);

/**
 * Gets the {@link ApplicationId} of the latest version for the given
 * {@link ApplicationReference}.
 *
 * @param appRef the versionless reference to the application
 * @return the {@link ApplicationId} for the latest version
 *
 * @throws ApplicationNotFoundException if the app was not found for the given application
 *     reference.
 */
ApplicationId getLatestApp(ApplicationReference appRef) throws ApplicationNotFoundException;
vsethi09 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Scans for the latest applications across all namespaces.
*
Expand Down
Loading
Loading