diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java index fd629ca879f5..150bfd9e46e2 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java @@ -40,6 +40,7 @@ import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.scheduler.CoreSchedulerService; import io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService; +import io.cdap.cdap.security.auth.AuditLogSubscriberService; import io.cdap.cdap.sourcecontrol.RepositoryCleanupService; import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunner; @@ -65,6 +66,7 @@ public class AppFabricProcessorService extends AbstractIdleService { private final ApplicationLifecycleService applicationLifecycleService; private final ProgramNotificationSubscriberService programNotificationSubscriberService; private final ProgramStopSubscriberService programStopSubscriberService; + private final AuditLogSubscriberService auditLogSubscriberService; private final RunRecordCorrectorService runRecordCorrectorService; private final RunDataTimeToLiveService runDataTimeToLiveService; private final ProgramRunStatusMonitorService programRunStatusMonitorService; @@ -95,6 +97,7 @@ public AppFabricProcessorService(CConfiguration cConf, ApplicationLifecycleService applicationLifecycleService, ProgramNotificationSubscriberService programNotificationSubscriberService, ProgramStopSubscriberService programStopSubscriberService, + AuditLogSubscriberService auditLogSubscriberService, CoreSchedulerService coreSchedulerService, CredentialProviderService credentialProviderService, NamespaceCredentialProviderService namespaceCredentialProviderService, @@ -114,6 +117,7 @@ public AppFabricProcessorService(CConfiguration cConf, this.applicationLifecycleService = applicationLifecycleService; this.programNotificationSubscriberService = programNotificationSubscriberService; this.programStopSubscriberService = programStopSubscriberService; + this.auditLogSubscriberService = auditLogSubscriberService; this.runRecordCorrectorService = runRecordCorrectorService; this.programRunStatusMonitorService = programRunStatusMonitorService; this.coreSchedulerService = coreSchedulerService; @@ -145,6 +149,11 @@ protected void startUp() throws Exception { if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) { futuresList.add(namespaceCredentialProviderService.start()); } + // Only for RBAC instances + if (Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider) + && cConf.getBoolean(Constants.Security.ENABLED)) { + futuresList.add(auditLogSubscriberService.start()); + } futuresList.addAll(ImmutableList.of( provisioningService.start(), applicationLifecycleService.start(), @@ -198,5 +207,6 @@ protected void shutDown() throws Exception { credentialProviderService.stopAndWait(); namespaceCredentialProviderService.stopAndWait(); operationNotificationSubscriberService.stopAndWait(); + auditLogSubscriberService.stopAndWait(); } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java index 5c5494dfa8a6..aed253be7c45 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java @@ -81,8 +81,6 @@ public class AppFabricServer extends AbstractIdleService { private final ApplicationLifecycleService applicationLifecycleService; private final Set servicesNames; private final Set handlerHookNames; - private final RunRecordCorrectorService runRecordCorrectorService; - private final RunDataTimeToLiveService runDataTimeToLiveService; private final ProgramRunStatusMonitorService programRunStatusMonitorService; private final RunRecordMonitorService runRecordCounterService; private final CoreSchedulerService coreSchedulerService; @@ -114,7 +112,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, @Named(Constants.AppFabric.HANDLERS_BINDING) Set handlers, @Nullable MetricsCollectionService metricsCollectionService, ProgramRuntimeService programRuntimeService, - RunRecordCorrectorService runRecordCorrectorService, ProgramRunStatusMonitorService programRunStatusMonitorService, ApplicationLifecycleService applicationLifecycleService, @Named("appfabric.services.names") Set servicesNames, @@ -128,7 +125,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, TransactionRunner transactionRunner, RunRecordMonitorService runRecordCounterService, CommonNettyHttpServiceFactory commonNettyHttpServiceFactory, - RunDataTimeToLiveService runDataTimeToLiveService, SourceControlOperationRunner sourceControlOperationRunner, RepositoryCleanupService repositoryCleanupService) { this.hostname = hostname; @@ -141,7 +137,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, this.servicesNames = servicesNames; this.handlerHookNames = handlerHookNames; this.applicationLifecycleService = applicationLifecycleService; - this.runRecordCorrectorService = runRecordCorrectorService; this.programRunStatusMonitorService = programRunStatusMonitorService; this.sslEnabled = cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED); this.coreSchedulerService = coreSchedulerService; @@ -152,7 +147,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, this.systemAppManagementService = systemAppManagementService; this.transactionRunner = transactionRunner; this.runRecordCounterService = runRecordCounterService; - this.runDataTimeToLiveService = runDataTimeToLiveService; this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory; this.sourceControlOperationRunner = sourceControlOperationRunner; this.repositoryCleanupService = repositoryCleanupService; @@ -177,12 +171,10 @@ protected void startUp() throws Exception { applicationLifecycleService.start(), bootstrapService.start(), programRuntimeService.start(), - runRecordCorrectorService.start(), programRunStatusMonitorService.start(), coreSchedulerService.start(), credentialProviderService.start(), runRecordCounterService.start(), - runDataTimeToLiveService.start(), sourceControlOperationRunner.start(), repositoryCleanupService.start() )); @@ -241,11 +233,9 @@ protected void shutDown() throws Exception { cancelHttpService.cancel(); programRuntimeService.stopAndWait(); applicationLifecycleService.stopAndWait(); - runRecordCorrectorService.stopAndWait(); programRunStatusMonitorService.stopAndWait(); provisioningService.stopAndWait(); runRecordCounterService.stopAndWait(); - runDataTimeToLiveService.stopAndWait(); sourceControlOperationRunner.stopAndWait(); repositoryCleanupService.stopAndWait(); credentialProviderService.stopAndWait(); diff --git a/cdap-gateway/src/test/java/io/cdap/cdap/gateway/GatewayTestBase.java b/cdap-gateway/src/test/java/io/cdap/cdap/gateway/GatewayTestBase.java index 42be4597f016..b23aaca4a0ea 100644 --- a/cdap-gateway/src/test/java/io/cdap/cdap/gateway/GatewayTestBase.java +++ b/cdap-gateway/src/test/java/io/cdap/cdap/gateway/GatewayTestBase.java @@ -38,6 +38,7 @@ import io.cdap.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutorService; import io.cdap.cdap.gateway.handlers.log.MockLogReader; import io.cdap.cdap.gateway.router.NettyRouter; +import io.cdap.cdap.internal.app.services.AppFabricProcessorService; import io.cdap.cdap.internal.app.services.AppFabricServer; import io.cdap.cdap.internal.guice.AppFabricTestModule; import io.cdap.cdap.logging.read.LogReader; @@ -106,6 +107,7 @@ public abstract class GatewayTestBase { private static Injector injector; private static AppFabricServer appFabricServer; + private static AppFabricProcessorService appFabricProcessorService; private static NettyRouter router; private static LogQueryService logQueryService; private static MetricsQueryService metricsQueryService; @@ -194,6 +196,8 @@ protected void configure() { datasetService.startAndWait(); appFabricServer = injector.getInstance(AppFabricServer.class); appFabricServer.startAndWait(); + appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class); + appFabricProcessorService.startAndWait(); logQueryService = injector.getInstance(LogQueryService.class); logQueryService.startAndWait(); metricsQueryService = injector.getInstance(MetricsQueryService.class); @@ -217,6 +221,7 @@ public static void stopGateway(CConfiguration conf) throws Exception { namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE2)); namespaceAdmin.delete(NamespaceId.DEFAULT); appFabricServer.stopAndWait(); + appFabricProcessorService.stopAndWait(); metricsCollectionService.stopAndWait(); metricsQueryService.stopAndWait(); logQueryService.stopAndWait(); diff --git a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java index 9958a5aeab85..30a5031647ff 100644 --- a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java +++ b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java @@ -86,6 +86,7 @@ import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService; import io.cdap.cdap.gateway.handlers.AuthorizationHandler; import io.cdap.cdap.internal.app.runtime.AppStateStoreProvider; +import io.cdap.cdap.internal.app.services.AppFabricProcessorService; import io.cdap.cdap.internal.app.services.AppFabricServer; import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService; import io.cdap.cdap.internal.capability.CapabilityConfig; @@ -231,6 +232,7 @@ public class TestBase { private static FieldLineageAdmin fieldLineageAdmin; private static LineageAdmin lineageAdmin; private static AppFabricServer appFabricServer; + private static AppFabricProcessorService appFabricProcessorService; private static PreferencesService preferencesService; private static ArtifactLocalizerService artifactLocalizerService; private static AppStateStoreProvider appStateStoreProvider; @@ -424,6 +426,8 @@ protected void configure() { previewRunnerManager.startAndWait(); appFabricServer = injector.getInstance(AppFabricServer.class); appFabricServer.startAndWait(); + appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class); + appFabricProcessorService.startAndWait(); preferencesService = injector.getInstance(PreferencesService.class); scheduler = injector.getInstance(Scheduler.class); @@ -646,6 +650,7 @@ public static void finish() throws Exception { ((Service) messagingService).stopAndWait(); } appFabricServer.stopAndWait(); + appFabricProcessorService.stopAndWait(); } protected MetricsManager getMetricsManager() {