Skip to content

Commit

Permalink
Fix RunRecordCorrectorService and AuditLogSubscriberService
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Dec 19, 2024
1 parent 8806de4 commit 6874823
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService;
import io.cdap.cdap.security.auth.AuditLogSubscriberService;
import io.cdap.cdap.sourcecontrol.RepositoryCleanupService;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
Expand All @@ -65,6 +66,7 @@ public class AppFabricProcessorService extends AbstractIdleService {
private final ApplicationLifecycleService applicationLifecycleService;
private final ProgramNotificationSubscriberService programNotificationSubscriberService;
private final ProgramStopSubscriberService programStopSubscriberService;
private final AuditLogSubscriberService auditLogSubscriberService;
private final RunRecordCorrectorService runRecordCorrectorService;
private final RunDataTimeToLiveService runDataTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
Expand Down Expand Up @@ -95,6 +97,7 @@ public AppFabricProcessorService(CConfiguration cConf,
ApplicationLifecycleService applicationLifecycleService,
ProgramNotificationSubscriberService programNotificationSubscriberService,
ProgramStopSubscriberService programStopSubscriberService,
AuditLogSubscriberService auditLogSubscriberService,
CoreSchedulerService coreSchedulerService,
CredentialProviderService credentialProviderService,
NamespaceCredentialProviderService namespaceCredentialProviderService,
Expand All @@ -114,6 +117,7 @@ public AppFabricProcessorService(CConfiguration cConf,
this.applicationLifecycleService = applicationLifecycleService;
this.programNotificationSubscriberService = programNotificationSubscriberService;
this.programStopSubscriberService = programStopSubscriberService;
this.auditLogSubscriberService = auditLogSubscriberService;
this.runRecordCorrectorService = runRecordCorrectorService;
this.programRunStatusMonitorService = programRunStatusMonitorService;
this.coreSchedulerService = coreSchedulerService;
Expand Down Expand Up @@ -145,6 +149,11 @@ protected void startUp() throws Exception {
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
futuresList.add(namespaceCredentialProviderService.start());
}
// Only for RBAC instances
if (Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider)
&& cConf.getBoolean(Constants.Security.ENABLED)) {
futuresList.add(auditLogSubscriberService.start());
}
futuresList.addAll(ImmutableList.of(
provisioningService.start(),
applicationLifecycleService.start(),
Expand Down Expand Up @@ -198,5 +207,6 @@ protected void shutDown() throws Exception {
credentialProviderService.stopAndWait();
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
auditLogSubscriberService.stopAndWait();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public class AppFabricServer extends AbstractIdleService {
private final ApplicationLifecycleService applicationLifecycleService;
private final Set<String> servicesNames;
private final Set<String> handlerHookNames;
private final RunRecordCorrectorService runRecordCorrectorService;
private final RunDataTimeToLiveService runDataTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
private final RunRecordMonitorService runRecordCounterService;
private final CoreSchedulerService coreSchedulerService;
Expand Down Expand Up @@ -114,7 +112,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
@Named(Constants.AppFabric.HANDLERS_BINDING) Set<HttpHandler> handlers,
@Nullable MetricsCollectionService metricsCollectionService,
ProgramRuntimeService programRuntimeService,
RunRecordCorrectorService runRecordCorrectorService,
ProgramRunStatusMonitorService programRunStatusMonitorService,
ApplicationLifecycleService applicationLifecycleService,
@Named("appfabric.services.names") Set<String> servicesNames,
Expand All @@ -128,7 +125,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
TransactionRunner transactionRunner,
RunRecordMonitorService runRecordCounterService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
RunDataTimeToLiveService runDataTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService) {
this.hostname = hostname;
Expand All @@ -141,7 +137,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.servicesNames = servicesNames;
this.handlerHookNames = handlerHookNames;
this.applicationLifecycleService = applicationLifecycleService;
this.runRecordCorrectorService = runRecordCorrectorService;
this.programRunStatusMonitorService = programRunStatusMonitorService;
this.sslEnabled = cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED);
this.coreSchedulerService = coreSchedulerService;
Expand All @@ -152,7 +147,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.systemAppManagementService = systemAppManagementService;
this.transactionRunner = transactionRunner;
this.runRecordCounterService = runRecordCounterService;
this.runDataTimeToLiveService = runDataTimeToLiveService;
this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory;
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.repositoryCleanupService = repositoryCleanupService;
Expand All @@ -177,12 +171,10 @@ protected void startUp() throws Exception {
applicationLifecycleService.start(),
bootstrapService.start(),
programRuntimeService.start(),
runRecordCorrectorService.start(),
programRunStatusMonitorService.start(),
coreSchedulerService.start(),
credentialProviderService.start(),
runRecordCounterService.start(),
runDataTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start()
));
Expand Down Expand Up @@ -241,11 +233,9 @@ protected void shutDown() throws Exception {
cancelHttpService.cancel();
programRuntimeService.stopAndWait();
applicationLifecycleService.stopAndWait();
runRecordCorrectorService.stopAndWait();
programRunStatusMonitorService.stopAndWait();
provisioningService.stopAndWait();
runRecordCounterService.stopAndWait();
runDataTimeToLiveService.stopAndWait();
sourceControlOperationRunner.stopAndWait();
repositoryCleanupService.stopAndWait();
credentialProviderService.stopAndWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutorService;
import io.cdap.cdap.gateway.handlers.log.MockLogReader;
import io.cdap.cdap.gateway.router.NettyRouter;
import io.cdap.cdap.internal.app.services.AppFabricProcessorService;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.guice.AppFabricTestModule;
import io.cdap.cdap.logging.read.LogReader;
Expand Down Expand Up @@ -106,6 +107,7 @@ public abstract class GatewayTestBase {

private static Injector injector;
private static AppFabricServer appFabricServer;
private static AppFabricProcessorService appFabricProcessorService;
private static NettyRouter router;
private static LogQueryService logQueryService;
private static MetricsQueryService metricsQueryService;
Expand Down Expand Up @@ -194,6 +196,8 @@ protected void configure() {
datasetService.startAndWait();
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
metricsQueryService = injector.getInstance(MetricsQueryService.class);
Expand All @@ -217,6 +221,7 @@ public static void stopGateway(CConfiguration conf) throws Exception {
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE2));
namespaceAdmin.delete(NamespaceId.DEFAULT);
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
metricsCollectionService.stopAndWait();
metricsQueryService.stopAndWait();
logQueryService.stopAndWait();
Expand Down
5 changes: 5 additions & 0 deletions cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService;
import io.cdap.cdap.gateway.handlers.AuthorizationHandler;
import io.cdap.cdap.internal.app.runtime.AppStateStoreProvider;
import io.cdap.cdap.internal.app.services.AppFabricProcessorService;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.capability.CapabilityConfig;
Expand Down Expand Up @@ -231,6 +232,7 @@ public class TestBase {
private static FieldLineageAdmin fieldLineageAdmin;
private static LineageAdmin lineageAdmin;
private static AppFabricServer appFabricServer;
private static AppFabricProcessorService appFabricProcessorService;
private static PreferencesService preferencesService;
private static ArtifactLocalizerService artifactLocalizerService;
private static AppStateStoreProvider appStateStoreProvider;
Expand Down Expand Up @@ -424,6 +426,8 @@ protected void configure() {
previewRunnerManager.startAndWait();
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
preferencesService = injector.getInstance(PreferencesService.class);

scheduler = injector.getInstance(Scheduler.class);
Expand Down Expand Up @@ -646,6 +650,7 @@ public static void finish() throws Exception {
((Service) messagingService).stopAndWait();
}
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
}

protected MetricsManager getMetricsManager() {
Expand Down

0 comments on commit 6874823

Please sign in to comment.