Skip to content

Commit

Permalink
Replace inheritance with composition
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Jan 29, 2025
1 parent 4b9b796 commit 4ded2d9
Show file tree
Hide file tree
Showing 23 changed files with 917 additions and 848 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import io.cdap.cdap.gateway.handlers.ProfileHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramLifecycleHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramLifecycleHttpHandlerInternal;
import io.cdap.cdap.gateway.handlers.ProgramRuntimeLifecycleHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramRuntimeHttpHandler;
import io.cdap.cdap.gateway.handlers.ProvisionerHttpHandler;
import io.cdap.cdap.gateway.handlers.SourceControlManagementHttpHandler;
import io.cdap.cdap.gateway.handlers.TransactionHttpHandler;
Expand All @@ -87,6 +87,7 @@
import io.cdap.cdap.gateway.handlers.WorkflowHttpHandler;
import io.cdap.cdap.gateway.handlers.WorkflowStatsSLAHttpHandler;
import io.cdap.cdap.gateway.handlers.meta.RemotePrivilegesHandler;
import io.cdap.cdap.gateway.handlers.util.ProgramUtil;
import io.cdap.cdap.internal.app.deploy.ConfiguratorFactory;
import io.cdap.cdap.internal.app.deploy.ConfiguratorFactoryProvider;
import io.cdap.cdap.internal.app.deploy.InMemoryConfigurator;
Expand Down Expand Up @@ -118,7 +119,6 @@
import io.cdap.cdap.internal.app.services.LocalRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.ProgramRuntimeLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
Expand Down Expand Up @@ -251,10 +251,6 @@ protected void configure() {
Names.named("appfabric.handler.hooks"));
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

Multibinder<HttpHandler> handlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(Constants.AppFabric.SERVER_HANDLERS_BINDING));
handlerBinder.addBinding().to(ProgramRuntimeLifecycleHttpHandler.class);

// TODO: Uncomment after CDAP-7688 is resolved
// servicesNamesBinder.addBinding().toInstance(Constants.Service.MESSAGING_SERVICE);
// handlerHookNamesBinder.addBinding().toInstance(Constants.Service.MESSAGING_SERVICE);
Expand Down Expand Up @@ -449,9 +445,7 @@ protected void configure() {
bind(ProfileService.class).in(Scopes.SINGLETON);
bind(FlowControlService.class).in(Scopes.SINGLETON);
bind(ProgramLifecycleService.class).in(Scopes.SINGLETON);
if (serviceTypes.contains(ServiceType.PROCESSOR)) {
bind(ProgramRuntimeLifecycleService.class).in(Scopes.SINGLETON);
}
bind(ProgramUtil.class).in(Scopes.SINGLETON);
bind(SystemAppManagementService.class).in(Scopes.SINGLETON);
bind(OwnerAdmin.class).to(DefaultOwnerAdmin.class);
bind(CoreSchedulerService.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -547,7 +541,7 @@ protected void configure() {
Multibinder<HttpHandler> processorHandlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(AppFabric.PROCESSOR_HANDLERS_BINDING));
CommonHandlers.add(processorHandlerBinder);
processorHandlerBinder.addBinding().to(ProgramRuntimeLifecycleHttpHandler.class);
processorHandlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);
processorHandlerBinder.addBinding().to(BootstrapHttpHandler.class);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.io.Closeables;
Expand All @@ -26,12 +27,14 @@
import com.google.inject.Inject;
import io.cdap.cdap.app.deploy.ProgramRunDispatcherContext;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.twill.TwillAppNames;
import io.cdap.cdap.internal.app.deploy.ProgramRunDispatcherFactory;
import io.cdap.cdap.internal.app.runtime.AbstractListener;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.runtime.service.SimpleRuntimeInfo;
import io.cdap.cdap.proto.InMemoryProgramLiveInfo;
import io.cdap.cdap.proto.NotRunningProgramLiveInfo;
Expand All @@ -46,6 +49,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -59,6 +64,7 @@
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,11 +127,18 @@ void setRemoteTwillRunnerService(
@Override
public final RuntimeInfo run(ProgramDescriptor programDescriptor, ProgramOptions options,
RunId runId) {
ProgramId programId = programDescriptor.getProgramId();
ProgramRunId programRunId = programId.run(runId);
synchronized (this) {
RuntimeInfo runtimeInfo = lookup(programRunId.getParent(), runId);
if (runtimeInfo != null) {
return runtimeInfo;
}
}

ProgramRunDispatcherContext dispatcherContext = new ProgramRunDispatcherContext(
programDescriptor, options, runId,
isDistributed());
ProgramId programId = programDescriptor.getProgramId();
ProgramRunId programRunId = programId.run(runId);
DelayedProgramController controller = new DelayedProgramController(programRunId);
RuntimeInfo runtimeInfo = createRuntimeInfo(controller, programId,
dispatcherContext::executeCleanupTasks);
Expand Down Expand Up @@ -239,6 +252,28 @@ public List<RuntimeInfo> listAll(ProgramType... types) {
return runningPrograms;
}

@Override
public void resetProgramLogLevels(ProgramId programId, Set<String> loggerNames,
@Nullable String runId) throws Exception {
if (!EnumSet.of(ProgramType.SERVICE, ProgramType.WORKER).contains(programId.getType())) {
throw new BadRequestException(
String.format("Resetting log levels for program type %s is not supported",
programId.getType().getPrettyName()));
}
resetLogLevels(programId, loggerNames, runId);
}

@Override
public void updateProgramLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels,
@Nullable String runId) throws Exception {
if (!EnumSet.of(ProgramType.SERVICE, ProgramType.WORKER).contains(programId.getType())) {
throw new BadRequestException(
String.format("Updating log levels for program type %s is not supported",
programId.getType().getPrettyName()));
}
updateLogLevels(programId, logLevels, runId);
}

@Override
protected void startUp() throws Exception {
// Limits to at max poolSize number of concurrent program launch.
Expand Down Expand Up @@ -436,4 +471,79 @@ private void cleanupRuntimeInfo(@Nullable RuntimeInfo info) {
Closeables.closeQuietly((Closeable) info);
}
}

/**
* Helper method to get the {@link LogLevelUpdater} for the program.
*/
private LogLevelUpdater getLogLevelUpdater(RuntimeInfo runtimeInfo) throws Exception {
ProgramController programController = runtimeInfo.getController();
if (!(programController instanceof LogLevelUpdater)) {
throw new BadRequestException(
"Update log levels at runtime is only supported in distributed mode");
}
return ((LogLevelUpdater) programController);
}

/**
* Helper method to update log levels for Worker or Service.
*/
private void updateLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels,
@Nullable String runId) throws Exception {
ProgramRuntimeService.RuntimeInfo runtimeInfo = findRuntimeInfo(programId, runId).values()
.stream()
.findFirst().orElse(null);
if (runtimeInfo != null) {
LogLevelUpdater logLevelUpdater = getLogLevelUpdater(runtimeInfo);
logLevelUpdater.updateLogLevels(logLevels, null);
}
}

/**
* Helper method to reset log levels for Worker or Service.
*/
private void resetLogLevels(ProgramId programId, Set<String> loggerNames, @Nullable String runId)
throws Exception {
ProgramRuntimeService.RuntimeInfo runtimeInfo = findRuntimeInfo(programId, runId).values()
.stream()
.findFirst().orElse(null);
if (runtimeInfo != null) {
LogLevelUpdater logLevelUpdater = getLogLevelUpdater(runtimeInfo);
logLevelUpdater.resetLogLevels(loggerNames, null);
}
}

@Override
public void setInstances(ProgramId programId, int instances, int oldInstances)
throws ExecutionException, InterruptedException, BadRequestException {
ProgramRuntimeService.RuntimeInfo runtimeInfo = findRuntimeInfo(programId);
if (runtimeInfo != null) {
runtimeInfo.getController().command(ProgramOptionConstants.INSTANCES,
ImmutableMap.of("runnable", programId.getProgram(),
"newInstances", String.valueOf(instances),
"oldInstances", String.valueOf(oldInstances))).get();
}
}

private Map<RunId, ProgramRuntimeService.RuntimeInfo> findRuntimeInfo(
ProgramId programId, @Nullable String runId) throws BadRequestException {

if (runId != null) {
RunId run;
try {
run = RunIds.fromString(runId);
} catch (IllegalArgumentException e) {
throw new BadRequestException("Error parsing run-id.", e);
}
ProgramRuntimeService.RuntimeInfo runtimeInfo = lookup(programId, run);
return runtimeInfo == null ? Collections.emptyMap()
: Collections.singletonMap(run, runtimeInfo);
}
return new HashMap<>(list(programId));
}

@Nullable
protected ProgramRuntimeService.RuntimeInfo findRuntimeInfo(ProgramId programId)
throws BadRequestException {
return findRuntimeInfo(programId, null).values().stream().findFirst().orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import com.google.common.util.concurrent.Service;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.proto.ProgramLiveInfo;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.apache.twill.api.logging.LogEntry;

/**
* Service for interacting with the runtime system.
Expand All @@ -47,7 +53,7 @@ interface RuntimeInfo {
}

/**
* Starts the given program and return a {@link RuntimeInfo} about the running program.
* Runs the given program and return a {@link RuntimeInfo} about the running program.
*
* @param programDescriptor describing the program to run
* @param options {@link ProgramOptions} that are needed by the program.
Expand Down Expand Up @@ -97,4 +103,60 @@ interface RuntimeInfo {
* @param types Types of program to check returns List of info about running programs.
*/
List<RuntimeInfo> listAll(ProgramType... types);

/**
* Reset log levels for the given program. Only supported program types for this action are {@link
* ProgramType#SERVICE} and {@link ProgramType#WORKER}.
*
* @param programId the {@link ProgramId} of the program for which log levels are to be
* reset.
* @param loggerNames the {@link String} set of the logger names to be updated, empty means
* reset for all loggers.
* @param runId the run id of the program.
* @throws InterruptedException if there is an error while asynchronously resetting log
* levels.
* @throws ExecutionException if there is an error while asynchronously resetting log levels.
* @throws UnauthorizedException if the user does not have privileges to reset log levels for
* the specified program. To reset log levels for a program, a user needs {@link
* StandardPermission#UPDATE} on the program.
*/
void resetProgramLogLevels(ProgramId programId, Set<String> loggerNames, @Nullable String runId) throws Exception;

/**
* Update log levels for the given program. Only supported program types for this action are
* {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
*
* @param programId the {@link ProgramId} of the program for which log levels are to be
* updated
* @param logLevels the {@link Map} of the log levels to be updated.
* @param runId the run id of the program.
* @throws InterruptedException if there is an error while asynchronously updating log
* levels.
* @throws ExecutionException if there is an error while asynchronously updating log levels.
* @throws BadRequestException if the log level is not valid or the program type is not
* supported.
* @throws UnauthorizedException if the user does not have privileges to update log levels for
* the specified program. To update log levels for a program, a user needs {@link
* StandardPermission#UPDATE} on the program.
*/
void updateProgramLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels, @Nullable String runId)
throws Exception;

/**
* Set instances for the given program. Only supported program types for this action are {@link
* ProgramType#SERVICE} and {@link ProgramType#WORKER}.
*
* @param programId the {@link ProgramId} of the program for which instances are to be
* updated
* @param instances the number of instances to be updated.
* @param instances the previous number of instances.
*
* @throws InterruptedException if there is an error while asynchronously updating instances
* @throws ExecutionException if there is an error while asynchronously updating instances
* @throws BadRequestException if the number of instances specified is less than 0
* @throws UnauthorizedException if the user does not have privileges to set instances for the
* specified program. To set instances for a program, a user needs {@link
* StandardPermission#UPDATE} on the program.
*/
void setInstances(ProgramId programId, int instances, int oldInstances) throws Exception;
}
28 changes: 28 additions & 0 deletions cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cdap.cdap.app.program.Program;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ConflictException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.ProgramNotFoundException;
Expand Down Expand Up @@ -381,6 +382,23 @@ default int getProgramActiveRunsCount(ProgramReference programRef) {
@Nullable
RunRecordDetail getRun(ProgramReference programRef, String runId);

/**
* Fetches the run record for particular run of a program without version.
*
* @param namespace namespace id
* @param appName application name
* @param type program type
* @param programName program name
* @param runId the run id
*
* @return run record for the specified program and runRef
*
* @throws BadRequestException for invalid request
* @throws NotFoundException if run record is not found
*/
RunRecordDetail getRunRecordDetailFromId(String namespace, String appName,
String type, String programName, String runId) throws BadRequestException, NotFoundException;

/**
* Creates new application if it doesn't exist. Updates existing one otherwise.
* Always marks the added application as latest.
Expand Down Expand Up @@ -464,6 +482,16 @@ void updateApplicationSourceControlMeta(Map<ApplicationId, SourceControlMeta> up
@Nullable
ApplicationMeta getLatest(ApplicationReference appRef);

/**
* Gets latest app version.
*
* @param namespaceId namespace Id
* @param appId app Id
*
* @return latest app version
*/
String getLatestAppVersion(String namespaceId, String appId) throws ApplicationNotFoundException;

/**
* Scans for the latest applications across all namespaces.
*
Expand Down
Loading

0 comments on commit 4ded2d9

Please sign in to comment.