From ae37ee12b07dcdd449c51c4390e6aef0e00732a5 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Tue, 28 Jan 2025 16:54:08 +0530 Subject: [PATCH] [CDAP-21118] Task workers should communicate with Spanner Messaging Service only via App fabric --- .../DistributedWorkflowProgramRunnerTest.java | 6 +- .../guice/AppFabricServiceRuntimeModule.java | 14 +++- .../DistributedProgramContainerModule.java | 8 +-- .../app/preview/PreviewManagerModule.java | 19 ++++- .../preview/PreviewRunnerTwillRunnable.java | 4 +- .../app/worker/TaskWorkerTwillRunnable.java | 4 +- .../DatasetOpExecutorServerTwillRunnable.java | 4 +- .../runtime/main/LogSaverTwillRunnable.java | 4 +- .../data/runtime/main/MasterServiceMain.java | 4 +- .../main/MetricsProcessorTwillRunnable.java | 4 +- .../runtime/main/MetricsTwillRunnable.java | 4 +- .../main/TransactionServiceTwillRunnable.java | 4 +- .../cdap/data/tools/JobQueueDebugger.java | 6 +- .../environment/k8s/PreviewServiceMain.java | 5 +- .../k8s/TetheringAgentServiceMain.java | 4 +- .../k8s/MessagingServiceMainTest.java | 6 +- .../k8s/SparkContainerDriverLauncher.java | 4 +- .../java/io/cdap/cdap/StandaloneMain.java | 2 +- ...va => AbstractClientMessagingService.java} | 70 ++++++++----------- .../client/DefaultClientMessagingService.java | 50 +++++++++++++ .../PreviewRunnerClientMessagingService.java | 40 +++++++++++ .../TaskWorkerClientMessagingService.java | 38 ++++++++++ .../guice/MessagingServiceModule.java | 8 +-- .../DefaultMessagingClientModule.java} | 8 +-- .../PreviewRunnerMessagingClientModule.java | 49 +++++++++++++ .../TaskWorkerMessagingClientModule.java | 48 +++++++++++++ .../server/MessagingHttpServiceTest.java | 12 ++-- .../main/java/io/cdap/cdap/test/TestBase.java | 2 +- 28 files changed, 333 insertions(+), 98 deletions(-) rename cdap-tms/src/main/java/io/cdap/cdap/messaging/client/{ClientMessagingService.java => AbstractClientMessagingService.java} (91%) create mode 100644 cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultClientMessagingService.java create mode 100644 cdap-tms/src/main/java/io/cdap/cdap/messaging/client/PreviewRunnerClientMessagingService.java create mode 100644 cdap-tms/src/main/java/io/cdap/cdap/messaging/client/TaskWorkerClientMessagingService.java rename cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/{MessagingClientModule.java => client/DefaultMessagingClientModule.java} (77%) create mode 100644 cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/PreviewRunnerMessagingClientModule.java create mode 100644 cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/TaskWorkerMessagingClientModule.java diff --git a/cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunnerTest.java b/cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunnerTest.java index b5313cd3406d..997810f1546f 100644 --- a/cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunnerTest.java +++ b/cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunnerTest.java @@ -25,7 +25,6 @@ import io.cdap.cdap.app.DefaultAppConfigurer; import io.cdap.cdap.app.DefaultApplicationContext; import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule; -import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType; import io.cdap.cdap.app.guice.AuthorizationModule; import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule; import io.cdap.cdap.app.guice.TwillModule; @@ -56,7 +55,7 @@ import io.cdap.cdap.internal.app.runtime.SimpleProgramOptions; import io.cdap.cdap.internal.app.runtime.SystemArguments; import io.cdap.cdap.logging.guice.LocalLogAppenderModule; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.metrics.guice.MetricsStoreModule; import io.cdap.cdap.operations.guice.OperationalStatsModule; @@ -70,7 +69,6 @@ import io.cdap.cdap.security.guice.SecureStoreServerModule; import java.io.IOException; import java.util.Collections; -import java.util.EnumSet; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.twill.api.Configs; @@ -284,7 +282,7 @@ private static ProgramRunnerFactory createProgramRunnerFactory(CConfiguration cC new DataSetsModules().getDistributedModules(), new MetricsClientRuntimeModule().getDistributedModules(), new MetricsStoreModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new AuditModule(), CoreSecurityRuntimeModule.getDistributedModule(cConf), new AuthenticationContextModules().getNoOpModule(), diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 1c57d5ee9386..37c5d1b10f95 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -47,6 +47,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.conf.Constants.AppFabric; +import io.cdap.cdap.common.conf.Constants.MessagingSystem; import io.cdap.cdap.common.conf.Constants.Service; import io.cdap.cdap.common.encryption.guice.DataStorageAeadEncryptionModule; import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; @@ -114,11 +115,11 @@ import io.cdap.cdap.internal.app.runtime.schedule.store.TriggerMisfireLogger; import io.cdap.cdap.internal.app.runtime.workflow.BasicWorkflowStateWriter; import io.cdap.cdap.internal.app.runtime.workflow.WorkflowStateWriter; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.internal.app.services.LocalRunRecordCorrectorService; import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; import io.cdap.cdap.internal.app.services.RunRecordCorrectorService; -import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService; import io.cdap.cdap.internal.app.store.DefaultStore; import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules; @@ -149,6 +150,9 @@ import io.cdap.cdap.internal.tethering.TetheringClientHandler; import io.cdap.cdap.internal.tethering.TetheringHandler; import io.cdap.cdap.internal.tethering.TetheringServerHandler; +import io.cdap.cdap.messaging.server.FetchHandler; +import io.cdap.cdap.messaging.server.MetadataHandler; +import io.cdap.cdap.messaging.server.StoreHandler; import io.cdap.cdap.metadata.LocalPreferencesFetcherInternal; import io.cdap.cdap.metadata.PreferencesFetcher; import io.cdap.cdap.pipeline.PipelineFactory; @@ -532,6 +536,14 @@ protected void configure() { handlerBinder.addBinding().to(CredentialProviderHttpHandlerInternal.class); handlerBinder.addBinding().to(OperationHttpHandler.class); + if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) { + // Add these handlers only if messaging service endpoint doesn't exist and task workers need to + // communicate with messaging service via AppFabric. + handlerBinder.addBinding().to(MetadataHandler.class); + handlerBinder.addBinding().to(StoreHandler.class); + handlerBinder.addBinding().to(FetchHandler.class); + } + FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) { handlerBinder.addBinding().to(GcpWorkloadIdentityHttpHandler.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DistributedProgramContainerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DistributedProgramContainerModule.java index b81130d6d10f..3a2772a2126d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DistributedProgramContainerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DistributedProgramContainerModule.java @@ -64,8 +64,8 @@ import io.cdap.cdap.logging.guice.TMSLogAppenderModule; import io.cdap.cdap.master.environment.MasterEnvironments; import io.cdap.cdap.master.spi.environment.MasterEnvironment; -import io.cdap.cdap.messaging.client.ClientMessagingService; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.messaging.guice.MessagingServiceModule; import io.cdap.cdap.metadata.MetadataReaderWriterModules; import io.cdap.cdap.metadata.PreferencesFetcher; @@ -327,7 +327,7 @@ private Module getMessagingModules() { return new MessagingServiceModule(cConf); } - return new MessagingClientModule(); + return new DefaultMessagingClientModule(); } /** @@ -389,7 +389,7 @@ public ProgramStatePublisher get() { internalAuthenticator); return new MessagingProgramStatePublisher(cConf, - new ClientMessagingService(cConf, remoteClientFactory)); + new DefaultClientMessagingService(cConf, remoteClientFactory)); } } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java index 4978597a0334..c4bf1096aafb 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java @@ -21,6 +21,8 @@ import com.google.inject.multibindings.Multibinder; import com.google.inject.name.Names; import io.cdap.cdap.app.store.preview.PreviewStore; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.MessagingSystem; import io.cdap.cdap.data.runtime.DataSetsModules; import io.cdap.cdap.data2.datafabric.dataset.RemoteDatasetFramework; import io.cdap.cdap.data2.dataset2.DatasetDefinitionRegistryFactory; @@ -37,6 +39,9 @@ import io.cdap.cdap.internal.app.preview.PreviewDataCleanupService; import io.cdap.cdap.internal.app.preview.PreviewRunStopper; import io.cdap.cdap.internal.app.store.preview.DefaultPreviewStore; +import io.cdap.cdap.messaging.server.FetchHandler; +import io.cdap.cdap.messaging.server.MetadataHandler; +import io.cdap.cdap.messaging.server.StoreHandler; import io.cdap.http.HttpHandler; /** @@ -46,7 +51,10 @@ public class PreviewManagerModule extends PrivateModule { private final boolean distributedRunner; - public PreviewManagerModule(boolean distributedRunner) { + private final CConfiguration cConf; + + public PreviewManagerModule(CConfiguration cConf, boolean distributedRunner) { + this.cConf = cConf; this.distributedRunner = distributedRunner; } @@ -76,6 +84,15 @@ protected void configure() { handlerBinder.addBinding().to(PreviewHttpHandler.class); handlerBinder.addBinding().to(PreviewErrorClassificationHttpHandler.class); handlerBinder.addBinding().to(PreviewHttpHandlerInternal.class); + + if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) { + // Add these handlers only if messaging service endpoint doesn't exist and preview runners need to + // communicate with messaging service via preview manager. + handlerBinder.addBinding().to(MetadataHandler.class); + handlerBinder.addBinding().to(StoreHandler.class); + handlerBinder.addBinding().to(FetchHandler.class); + } + CommonHandlers.add(handlerBinder); bind(PreviewHttpServer.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerTwillRunnable.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerTwillRunnable.java index a8731b20c784..6c5346d8f489 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerTwillRunnable.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerTwillRunnable.java @@ -67,7 +67,7 @@ import io.cdap.cdap.master.environment.MasterEnvironments; import io.cdap.cdap.master.spi.environment.MasterEnvironment; import io.cdap.cdap.master.spi.twill.ExtendedTwillContext; -import io.cdap.cdap.messaging.guice.MessagingServiceModule; +import io.cdap.cdap.messaging.guice.client.PreviewRunnerMessagingClientModule; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule; @@ -237,7 +237,7 @@ protected void configure() { } modules.add(new PreviewRunnerManagerModule().getDistributedModules()); - modules.add(new MessagingServiceModule(cConf)); + modules.add(new PreviewRunnerMessagingClientModule(cConf)); modules.add(new SecureStoreClientModule()); // Needed for InMemoryProgramRunnerModule. We use local metadata reader/publisher to avoid conflicting with // metadata stored in AppFabric. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerTwillRunnable.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerTwillRunnable.java index 8853bdd7cbd6..6ae74058e12e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerTwillRunnable.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerTwillRunnable.java @@ -45,7 +45,7 @@ import io.cdap.cdap.logging.guice.RemoteLogAppenderModule; import io.cdap.cdap.master.environment.MasterEnvironments; import io.cdap.cdap.master.spi.environment.MasterEnvironment; -import io.cdap.cdap.messaging.guice.MessagingServiceModule; +import io.cdap.cdap.messaging.guice.client.TaskWorkerMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; @@ -95,7 +95,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf) { modules.add(new IOModule()); modules.add(new AuthenticationContextModules().getMasterWorkerModule()); modules.add(coreSecurityModule); - modules.add(new MessagingServiceModule(cConf)); + modules.add(new TaskWorkerMessagingClientModule(cConf)); modules.add(new SystemAppModule()); modules.add(new MetricsClientRuntimeModule().getDistributedModules()); modules.add(new AuditLogWriterModule(cConf).getDistributedModules()); diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/DatasetOpExecutorServerTwillRunnable.java b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/DatasetOpExecutorServerTwillRunnable.java index 92845e5edfae..c252e9bc06b0 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/DatasetOpExecutorServerTwillRunnable.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/DatasetOpExecutorServerTwillRunnable.java @@ -51,7 +51,7 @@ import io.cdap.cdap.internal.metadata.MetadataConsumerSubscriberService; import io.cdap.cdap.logging.appender.LogAppenderInitializer; import io.cdap.cdap.logging.guice.KafkaLogAppenderModule; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metadata.MetadataService; import io.cdap.cdap.metadata.MetadataServiceModule; import io.cdap.cdap.metadata.MetadataSubscriberService; @@ -111,7 +111,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, String new ZkClientModule(), new ZkDiscoveryModule(), new KafkaClientModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new MetricsClientRuntimeModule().getDistributedModules(), new DFSLocationModule(), new NamespaceQueryAdminModule(), diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/LogSaverTwillRunnable.java b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/LogSaverTwillRunnable.java index d65a421e361d..714b5f39a8d5 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/LogSaverTwillRunnable.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/LogSaverTwillRunnable.java @@ -42,7 +42,7 @@ import io.cdap.cdap.logging.guice.DistributedLogFrameworkModule; import io.cdap.cdap.logging.guice.KafkaLogAppenderModule; import io.cdap.cdap.logging.service.LogSaverStatusService; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule; @@ -118,7 +118,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, new AuditModule(), new AuthorizationEnforcementModule().getDistributedModules(), new AuthenticationContextModules().getMasterModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new NoOpAuditLogModule(), new AbstractModule() { @Override diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MasterServiceMain.java b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MasterServiceMain.java index b86819141e80..f5680f09e4a9 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MasterServiceMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MasterServiceMain.java @@ -73,7 +73,7 @@ import io.cdap.cdap.logging.appender.LogAppenderInitializer; import io.cdap.cdap.logging.guice.KafkaLogAppenderModule; import io.cdap.cdap.master.startup.ServiceResourceKeys; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.metrics.guice.MetricsStoreModule; import io.cdap.cdap.operations.OperationalStatsService; @@ -538,7 +538,7 @@ protected void configure() { new DataSetsModules().getDistributedModules(), new MetricsClientRuntimeModule().getDistributedModules(), new MetricsStoreModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new AuditModule(), new AuditLogWriterModule(cConf).getDistributedModules(), CoreSecurityRuntimeModule.getDistributedModule(cConf), diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsProcessorTwillRunnable.java b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsProcessorTwillRunnable.java index f3fb95283c56..7d4a247a7821 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsProcessorTwillRunnable.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsProcessorTwillRunnable.java @@ -45,7 +45,7 @@ import io.cdap.cdap.data2.audit.AuditModule; import io.cdap.cdap.logging.appender.LogAppenderInitializer; import io.cdap.cdap.logging.guice.KafkaLogAppenderModule; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.metrics.guice.MetricsProcessorStatusServiceModule; import io.cdap.cdap.metrics.guice.MetricsStoreModule; @@ -121,7 +121,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, S new ZkClientModule(), new ZkDiscoveryModule(), new KafkaClientModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new MetricsClientRuntimeModule().getDistributedModules(), new MetricsStoreModule(), new KafkaLogAppenderModule(), diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsTwillRunnable.java b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsTwillRunnable.java index 194f90a32ec6..49c4cf2e9e84 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsTwillRunnable.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/MetricsTwillRunnable.java @@ -45,7 +45,7 @@ import io.cdap.cdap.logging.guice.LogQueryRuntimeModule; import io.cdap.cdap.logging.guice.LogReaderRuntimeModules; import io.cdap.cdap.logging.service.LogQueryService; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.metrics.guice.MetricsHandlerModule; import io.cdap.cdap.metrics.guice.MetricsStoreModule; @@ -110,7 +110,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, new ZkClientModule(), new ZkDiscoveryModule(), new KafkaClientModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new DataFabricModules(txClientId).getDistributedModules(), new DataSetsModules().getDistributedModules(), // For the injection of DatasetDefinition of MetricsTable directly diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/TransactionServiceTwillRunnable.java b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/TransactionServiceTwillRunnable.java index aa2811716a2e..b772cddbd9c3 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/TransactionServiceTwillRunnable.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/runtime/main/TransactionServiceTwillRunnable.java @@ -44,7 +44,7 @@ import io.cdap.cdap.data2.audit.AuditModule; import io.cdap.cdap.logging.appender.LogAppenderInitializer; import io.cdap.cdap.logging.guice.KafkaLogAppenderModule; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; @@ -105,7 +105,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, new ZkClientModule(), new ZkDiscoveryModule(), new KafkaClientModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new DataFabricModules(txClientId).getDistributedModules(), new DataSetsModules().getDistributedModules(), new SystemDatasetRuntimeModule().getDistributedModules(), diff --git a/cdap-master/src/main/java/io/cdap/cdap/data/tools/JobQueueDebugger.java b/cdap-master/src/main/java/io/cdap/cdap/data/tools/JobQueueDebugger.java index b4de03eacfc3..fc5d98e3df13 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/data/tools/JobQueueDebugger.java +++ b/cdap-master/src/main/java/io/cdap/cdap/data/tools/JobQueueDebugger.java @@ -33,7 +33,6 @@ import io.cdap.cdap.api.dataset.lib.CloseableIterator; import io.cdap.cdap.api.schedule.Trigger; import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule; -import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType; import io.cdap.cdap.app.guice.AuthorizationModule; import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule; import io.cdap.cdap.app.guice.TwillModule; @@ -63,7 +62,7 @@ import io.cdap.cdap.internal.schedule.constraint.Constraint; import io.cdap.cdap.logging.guice.KafkaLogAppenderModule; import io.cdap.cdap.messaging.data.MessageId; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.metrics.guice.MetricsStoreModule; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; @@ -74,7 +73,6 @@ import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.io.IOException; -import java.util.EnumSet; import java.util.List; import javax.annotation.Nullable; import org.apache.commons.cli.BasicParser; @@ -361,7 +359,7 @@ private static Injector createInjector() throws Exception { new AuthorizationModule(), new AuthorizationEnforcementModule().getMasterModule(), new SecureStoreServerModule(), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new AbstractModule() { @Override protected void configure() { diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/PreviewServiceMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/PreviewServiceMain.java index 2f319f2b1c66..c0c3c4628641 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/PreviewServiceMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/PreviewServiceMain.java @@ -52,7 +52,6 @@ import io.cdap.cdap.security.guice.SecureStoreClientModule; import java.util.ArrayList; import java.util.Arrays; -import java.util.EnumSet; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -124,9 +123,9 @@ protected void configure() { )); if (cConf.getInt(Constants.Preview.CONTAINER_COUNT) > 0) { - modules.add(new PreviewManagerModule(true)); + modules.add(new PreviewManagerModule(cConf, true)); } else { - modules.add(new PreviewManagerModule(false)); + modules.add(new PreviewManagerModule(cConf, false)); modules.add(new PreviewRunnerManagerModule().getStandaloneModules()); } diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/TetheringAgentServiceMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/TetheringAgentServiceMain.java index 138d4ad96eac..c98c801ae40b 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/TetheringAgentServiceMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/TetheringAgentServiceMain.java @@ -39,7 +39,7 @@ import io.cdap.cdap.logging.gateway.handlers.ProgramRunRecordFetcher; import io.cdap.cdap.master.spi.environment.MasterEnvironment; import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule; import java.util.Arrays; @@ -66,7 +66,7 @@ protected List getServiceModules(MasterEnvironment masterEnv, RemoteAuthenticatorModules.getDefaultModule( TetheringAgentService.REMOTE_TETHERING_AUTHENTICATOR, Constants.Tethering.CLIENT_AUTHENTICATOR_NAME), - new MessagingClientModule(), + new DefaultMessagingClientModule(), new NamespaceQueryAdminModule(), getDataFabricModule(), // Always use local table implementations, which use LevelDB. diff --git a/cdap-master/src/test/java/io/cdap/cdap/master/environment/k8s/MessagingServiceMainTest.java b/cdap-master/src/test/java/io/cdap/cdap/master/environment/k8s/MessagingServiceMainTest.java index 8dcecbf4ef1c..661f01f02d54 100644 --- a/cdap-master/src/test/java/io/cdap/cdap/master/environment/k8s/MessagingServiceMainTest.java +++ b/cdap-master/src/test/java/io/cdap/cdap/master/environment/k8s/MessagingServiceMainTest.java @@ -21,9 +21,9 @@ import io.cdap.cdap.common.internal.remote.RemoteClientFactory; import io.cdap.cdap.messaging.DefaultMessageFetchRequest; import io.cdap.cdap.messaging.DefaultTopicMetadata; -import io.cdap.cdap.messaging.spi.MessagingService; -import io.cdap.cdap.messaging.client.ClientMessagingService; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; import io.cdap.cdap.messaging.client.StoreRequestBuilder; +import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.spi.RawMessage; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.TopicId; @@ -50,7 +50,7 @@ public void testMessagingService() throws Exception { // Use a separate TMS client to create topic, then publish and then poll some messages TopicId topicId = NamespaceId.SYSTEM.topic("test"); - MessagingService messagingService = new ClientMessagingService(remoteClientFactory, true); + MessagingService messagingService = new DefaultClientMessagingService(remoteClientFactory, true); messagingService.createTopic(new DefaultTopicMetadata(topicId)); // Publish 10 messages diff --git a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/k8s/SparkContainerDriverLauncher.java b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/k8s/SparkContainerDriverLauncher.java index fde39602bf95..d468bdc6b969 100644 --- a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/k8s/SparkContainerDriverLauncher.java +++ b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/k8s/SparkContainerDriverLauncher.java @@ -53,7 +53,7 @@ import io.cdap.cdap.master.environment.MasterEnvironments; import io.cdap.cdap.master.spi.environment.MasterEnvironment; import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext; -import io.cdap.cdap.messaging.guice.MessagingClientModule; +import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; @@ -259,7 +259,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, Master modules.add(new IOModule()); modules.add(new AuthenticationContextModules().getMasterWorkerModule()); modules.add(coreSecurityModule); - modules.add(new MessagingClientModule()); + modules.add(new DefaultMessagingClientModule()); modules.add(new MetricsClientRuntimeModule().getDistributedModules()); //Need for guice binding, but No Audit Log action required. modules.add(new NoOpAuditLogModule()); diff --git a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java index 3f52499587bb..227483df956c 100644 --- a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java +++ b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java @@ -561,7 +561,7 @@ private static List createPersistentModules(CConfiguration cConf, Config new NoOpAuditLogModule(), new AuthorizationEnforcementModule().getStandaloneModules(), new PreviewConfigModule(cConf, new Configuration(), SConfiguration.create()), - new PreviewManagerModule(false), + new PreviewManagerModule(cConf, false), new PreviewRunnerManagerModule().getStandaloneModules(), new MessagingServerRuntimeModule().getStandaloneModules(), new AppFabricServiceRuntimeModule(cConf, AppFabricServiceRuntimeModule.ALL_SERVICE_TYPES) diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/AbstractClientMessagingService.java similarity index 91% rename from cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java rename to cdap-tms/src/main/java/io/cdap/cdap/messaging/client/AbstractClientMessagingService.java index 270baa84ce13..be5aebd1ab71 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/AbstractClientMessagingService.java @@ -16,7 +16,6 @@ package io.cdap.cdap.messaging.client; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.io.ByteStreams; @@ -24,27 +23,24 @@ import com.google.common.net.HttpHeaders; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import com.google.inject.Inject; import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.api.dataset.lib.AbstractCloseableIterator; import io.cdap.cdap.api.dataset.lib.CloseableIterator; import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.api.service.ServiceUnavailableException; -import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.http.DefaultHttpRequestConfig; import io.cdap.cdap.common.internal.remote.RemoteClient; -import io.cdap.cdap.common.internal.remote.RemoteClientFactory; import io.cdap.cdap.internal.io.ExposedByteArrayOutputStream; +import io.cdap.cdap.messaging.Schemas; import io.cdap.cdap.messaging.spi.MessageFetchRequest; -import io.cdap.cdap.messaging.spi.MessagingServiceContext; import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.messaging.spi.MessagingServiceContext; +import io.cdap.cdap.messaging.spi.RawMessage; import io.cdap.cdap.messaging.spi.RollbackDetail; -import io.cdap.cdap.messaging.Schemas; import io.cdap.cdap.messaging.spi.StoreRequest; import io.cdap.cdap.messaging.spi.TopicMetadata; -import io.cdap.cdap.messaging.spi.RawMessage; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; @@ -87,14 +83,15 @@ import org.apache.tephra.TransactionCodec; /** - * The client implementation of {@link MessagingService}. This client is intended for internal - * higher level API implementation only. - * + * The abstract client implementation of {@link MessagingService}. This client is intended for + * internal higher level API implementation only. + *

* NOTE: This class shouldn't expose to end user (e.g. cdap-client module). */ -public final class ClientMessagingService implements MessagingService { +public abstract class AbstractClientMessagingService implements MessagingService { - private static final HttpRequestConfig HTTP_REQUEST_CONFIG = new DefaultHttpRequestConfig(false); + protected static final HttpRequestConfig HTTP_REQUEST_CONFIG = new DefaultHttpRequestConfig( + false); private static final TransactionCodec TRANSACTION_CODEC = new TransactionCodec(); private static final Gson GSON = new Gson(); // These types for only for Gson to use, hence using the gson TypeToken instead of guava one @@ -106,15 +103,8 @@ public final class ClientMessagingService implements MessagingService { private final RemoteClient remoteClient; private final boolean compressPayload; - @Inject - public ClientMessagingService(CConfiguration cConf, RemoteClientFactory remoteClientFactory) { - this(remoteClientFactory, cConf.getBoolean(Constants.MessagingSystem.HTTP_COMPRESS_PAYLOAD)); - } - - @VisibleForTesting - public ClientMessagingService(RemoteClientFactory remoteClientFactory, boolean compressPayload) { - this.remoteClient = remoteClientFactory.createRemoteClient( - Constants.Service.MESSAGING_SERVICE, HTTP_REQUEST_CONFIG, "/v1/namespaces/"); + public AbstractClientMessagingService(RemoteClient remoteClient, boolean compressPayload) { + this.remoteClient = remoteClient; this.compressPayload = compressPayload; } @@ -133,8 +123,7 @@ public void createTopic(TopicMetadata topicMetadata) TopicId topicId = topicMetadata.getTopicId(); HttpRequest request = remoteClient.requestBuilder(HttpMethod.PUT, createTopicPath(topicId)) - .withBody(GSON.toJson(topicMetadata.getProperties())) - .build(); + .withBody(GSON.toJson(topicMetadata.getProperties())).build(); HttpResponse response = remoteClient.execute(request); if (response.getResponseCode() == HttpURLConnection.HTTP_CONFLICT) { @@ -150,8 +139,7 @@ public void updateTopic(TopicMetadata topicMetadata) HttpRequest request = remoteClient.requestBuilder(HttpMethod.PUT, createTopicPath(topicId) + "/properties") - .withBody(GSON.toJson(topicMetadata.getProperties())) - .build(); + .withBody(GSON.toJson(topicMetadata.getProperties())).build(); HttpResponse response = remoteClient.execute(request); if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) { @@ -232,32 +220,30 @@ public void storePayload(StoreRequest request) @Override public void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException, UnauthorizedException { - ByteBuffer requestBody = (rollbackDetail instanceof ClientRollbackDetail) - ? ByteBuffer.wrap(((ClientRollbackDetail) rollbackDetail).getEncoded()) + ByteBuffer requestBody = (rollbackDetail instanceof ClientRollbackDetail) ? ByteBuffer.wrap( + ((ClientRollbackDetail) rollbackDetail).getEncoded()) : encodeRollbackDetail(rollbackDetail); HttpRequest httpRequest = remoteClient.requestBuilder(HttpMethod.POST, - createTopicPath(topicId) + "/rollback") - .addHeader(HttpHeaders.CONTENT_TYPE, "avro/binary") - .withBody(requestBody) - .build(); + createTopicPath(topicId) + "/rollback").addHeader(HttpHeaders.CONTENT_TYPE, "avro/binary") + .withBody(requestBody).build(); HttpResponse response = remoteClient.execute(httpRequest); if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic()); } - handleError(response, "Failed to rollback message in topic " + topicId - + " with rollback detail " + rollbackDetail); + handleError(response, + "Failed to rollback message in topic " + topicId + " with rollback detail " + + rollbackDetail); } - /** * Makes a request to the server for writing to the messaging system * * @param request contains information about what to write * @param publish {@code true} to make publish call, {@code false} to make store call. * @return the response from the server - * @throws IOException if failed to perform the write operation + * @throws IOException if failed to perform the write operation * @throws TopicNotFoundException if the topic to write to does not exist */ private HttpResponse performWriteRequest(StoreRequest request, @@ -343,8 +329,8 @@ private void handleError(int responseCode, Supplier responseBodySupplier } /** - * Converts the payloads carried by the given {@link StoreRequest} into a {@link List} of {@link - * ByteBuffer}, which is needed by the avro record. + * Converts the payloads carried by the given {@link StoreRequest} into a {@link List} of + * {@link ByteBuffer}, which is needed by the avro record. */ private List convertPayloads(StoreRequest request) { return StreamSupport.stream(request.spliterator(), false).map(ByteBuffer::wrap) @@ -353,9 +339,9 @@ private List convertPayloads(StoreRequest request) { /** * Encodes the given {@link RollbackDetail} as expected by the rollback call. This method is - * rarely used as the call to {@link #rollback(TopicId, RollbackDetail)} expects a {@link - * ClientRollbackDetail} which already contains the encoded bytes. - * + * rarely used as the call to {@link #rollback(TopicId, RollbackDetail)} expects a + * {@link ClientRollbackDetail} which already contains the encoded bytes. + *

* This method looks very similar to the {@code StoreHandler.encodeRollbackDetail} method, but is * intended to have them separated. This is to allow client side classes be moved to separate * module without any dependency on the server side (this can also be done with a util method in a @@ -521,8 +507,8 @@ public void close() { } /** - * Based on the given {@link HttpURLConnection} content encoding, optionally wrap the given {@link - * InputStream} with either gzip or deflate decompression. + * Based on the given {@link HttpURLConnection} content encoding, optionally wrap the given + * {@link InputStream} with either gzip or deflate decompression. */ private InputStream decompressIfNeeded(HttpURLConnection urlConn, InputStream is) throws IOException { diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultClientMessagingService.java new file mode 100644 index 000000000000..1d2b75821766 --- /dev/null +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultClientMessagingService.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.messaging.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.internal.remote.RemoteClientFactory; +import io.cdap.cdap.messaging.spi.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The client implementation of {@link MessagingService} that uses Messaging service endpoint. + */ +public class DefaultClientMessagingService extends AbstractClientMessagingService { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultClientMessagingService.class); + + @Inject + public DefaultClientMessagingService(CConfiguration cConf, + RemoteClientFactory remoteClientFactory) { + super(remoteClientFactory.createRemoteClient(Constants.Service.MESSAGING_SERVICE, + HTTP_REQUEST_CONFIG, "/v1/namespaces/"), + cConf.getBoolean(Constants.MessagingSystem.HTTP_COMPRESS_PAYLOAD)); + LOG.info("DefaultClientMessagingService initialised."); + } + + @VisibleForTesting + public DefaultClientMessagingService(RemoteClientFactory remoteClientFactory, + boolean compressPayload) { + super(remoteClientFactory.createRemoteClient(Constants.Service.MESSAGING_SERVICE, + HTTP_REQUEST_CONFIG, "/v1/namespaces/"), compressPayload); + } +} diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/PreviewRunnerClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/PreviewRunnerClientMessagingService.java new file mode 100644 index 000000000000..857f4a27a26b --- /dev/null +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/PreviewRunnerClientMessagingService.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.messaging.client; + +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.Constants.Service; +import io.cdap.cdap.common.internal.remote.RemoteClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This helps Preview runner pods to communicate with MessagingService via Preview Manager. + */ +public class PreviewRunnerClientMessagingService extends AbstractClientMessagingService { + + private static final Logger LOG = LoggerFactory.getLogger( + PreviewRunnerClientMessagingService.class); + + @Inject + public PreviewRunnerClientMessagingService(RemoteClientFactory remoteClientFactory) { + // TODO (CDAP-21118) - enable gzip compression for preview http server. + super(remoteClientFactory.createRemoteClient(Service.PREVIEW_HTTP, HTTP_REQUEST_CONFIG, + "/v1/namespaces/"), false); + LOG.info("PreviewRunnerClientMessagingService initialised"); + } +} diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/TaskWorkerClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/TaskWorkerClientMessagingService.java new file mode 100644 index 000000000000..401acc44e0e8 --- /dev/null +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/TaskWorkerClientMessagingService.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.messaging.client; + +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.Constants.Service; +import io.cdap.cdap.common.internal.remote.RemoteClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This helps Task worker pods to communicate with MessagingService via Preview Manager. + */ +public class TaskWorkerClientMessagingService extends AbstractClientMessagingService { + + private static final Logger LOG = LoggerFactory.getLogger(TaskWorkerClientMessagingService.class); + + @Inject + public TaskWorkerClientMessagingService(RemoteClientFactory remoteClientFactory) { + super(remoteClientFactory.createRemoteClient(Service.APP_FABRIC_HTTP, HTTP_REQUEST_CONFIG, + "/v1/namespaces/"), false); + LOG.info("TaskWorkerClientMessagingService initialised"); + } +} diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java index f358b202eb7e..a7705c6d4896 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java @@ -20,19 +20,19 @@ import com.google.inject.Scopes; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants.MessagingSystem; -import io.cdap.cdap.messaging.client.ClientMessagingService; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; import io.cdap.cdap.messaging.client.DelegatingMessagingService; import io.cdap.cdap.messaging.spi.MessagingService; /** * The Guice module to bind messaging service based on * {@link MessagingSystem#MESSAGING_SERVICE_NAME} if set in cConf. Otherwise, binds to - * {@link ClientMessagingService}. + * {@link DefaultClientMessagingService}. */ public class MessagingServiceModule extends AbstractModule { private static final String DEFAULT_MESSAGING_SERVICE_NAME = - ClientMessagingService.class.getSimpleName(); + DefaultClientMessagingService.class.getSimpleName(); private final String messagingService; public MessagingServiceModule(CConfiguration cConf) { @@ -44,7 +44,7 @@ public MessagingServiceModule(CConfiguration cConf) { @Override protected void configure() { if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) { - bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON); + bind(MessagingService.class).to(DefaultClientMessagingService.class).in(Scopes.SINGLETON); } else { bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON); } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingClientModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/DefaultMessagingClientModule.java similarity index 77% rename from cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingClientModule.java rename to cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/DefaultMessagingClientModule.java index e034d32f0ac8..be0a98433aeb 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingClientModule.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/DefaultMessagingClientModule.java @@ -14,21 +14,21 @@ * the License. */ -package io.cdap.cdap.messaging.guice; +package io.cdap.cdap.messaging.guice.client; import com.google.inject.AbstractModule; import com.google.inject.Scopes; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; import io.cdap.cdap.messaging.spi.MessagingService; -import io.cdap.cdap.messaging.client.ClientMessagingService; /** * The Guice module to provide binding for messaging system client. This module should only be used * in containers in distributed mode. */ -public class MessagingClientModule extends AbstractModule { +public class DefaultMessagingClientModule extends AbstractModule { @Override protected void configure() { - bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON); + bind(MessagingService.class).to(DefaultClientMessagingService.class).in(Scopes.SINGLETON); } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/PreviewRunnerMessagingClientModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/PreviewRunnerMessagingClientModule.java new file mode 100644 index 000000000000..8d4c2d073750 --- /dev/null +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/PreviewRunnerMessagingClientModule.java @@ -0,0 +1,49 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.messaging.guice.client; + +import com.google.inject.AbstractModule; +import com.google.inject.Scopes; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.MessagingSystem; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; +import io.cdap.cdap.messaging.client.PreviewRunnerClientMessagingService; +import io.cdap.cdap.messaging.spi.MessagingService; + +/** + * The Guice module to provide binding for messaging system client in preview runners and is based + * on {@link MessagingSystem#MESSAGING_SERVICE_ENABLED}. If set in cConf it binds to + * {@link DefaultClientMessagingService} to prevent redundant network calls. + */ +public class PreviewRunnerMessagingClientModule extends AbstractModule { + + private final boolean messagingServiceEnabled; + + public PreviewRunnerMessagingClientModule(CConfiguration cConf) { + this.messagingServiceEnabled = cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED); + } + + @Override + protected void configure() { + if (messagingServiceEnabled) { + bind(MessagingService.class).to(DefaultClientMessagingService.class).in(Scopes.SINGLETON); + } else { + bind(MessagingService.class).to(PreviewRunnerClientMessagingService.class) + .in(Scopes.SINGLETON); + } + } +} diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/TaskWorkerMessagingClientModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/TaskWorkerMessagingClientModule.java new file mode 100644 index 000000000000..3593631109a5 --- /dev/null +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/client/TaskWorkerMessagingClientModule.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.messaging.guice.client; + +import com.google.inject.AbstractModule; +import com.google.inject.Scopes; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.MessagingSystem; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; +import io.cdap.cdap.messaging.client.TaskWorkerClientMessagingService; +import io.cdap.cdap.messaging.spi.MessagingService; + +/** + * The Guice module to provide binding for messaging system client in task workers and is based on + * {@link MessagingSystem#MESSAGING_SERVICE_ENABLED}. If set in cConf it binds to + * {@link DefaultClientMessagingService} to prevent redundant network calls. + */ +public class TaskWorkerMessagingClientModule extends AbstractModule { + + private final boolean messagingServiceEnabled; + + public TaskWorkerMessagingClientModule(CConfiguration cConf) { + this.messagingServiceEnabled = cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED); + } + + @Override + protected void configure() { + if (messagingServiceEnabled) { + bind(MessagingService.class).to(DefaultClientMessagingService.class).in(Scopes.SINGLETON); + } else { + bind(MessagingService.class).to(TaskWorkerClientMessagingService.class).in(Scopes.SINGLETON); + } + } +} diff --git a/cdap-tms/src/test/java/io/cdap/cdap/messaging/server/MessagingHttpServiceTest.java b/cdap-tms/src/test/java/io/cdap/cdap/messaging/server/MessagingHttpServiceTest.java index c9f4681571ce..6feb251479bd 100644 --- a/cdap-tms/src/test/java/io/cdap/cdap/messaging/server/MessagingHttpServiceTest.java +++ b/cdap-tms/src/test/java/io/cdap/cdap/messaging/server/MessagingHttpServiceTest.java @@ -37,16 +37,16 @@ import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; import io.cdap.cdap.messaging.DefaultMessageFetchRequest; import io.cdap.cdap.messaging.DefaultTopicMetadata; +import io.cdap.cdap.messaging.client.DefaultClientMessagingService; +import io.cdap.cdap.messaging.client.StoreRequestBuilder; +import io.cdap.cdap.messaging.data.MessageId; +import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule; import io.cdap.cdap.messaging.spi.MessageFetchRequest; import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.messaging.spi.RawMessage; import io.cdap.cdap.messaging.spi.RollbackDetail; import io.cdap.cdap.messaging.spi.StoreRequest; import io.cdap.cdap.messaging.spi.TopicMetadata; -import io.cdap.cdap.messaging.client.ClientMessagingService; -import io.cdap.cdap.messaging.client.StoreRequestBuilder; -import io.cdap.cdap.messaging.data.MessageId; -import io.cdap.cdap.messaging.spi.RawMessage; -import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; @@ -129,7 +129,7 @@ protected void configure() { httpService = injector.getInstance(MessagingHttpService.class); httpService.startAndWait(); - client = new ClientMessagingService(injector.getInstance(RemoteClientFactory.class), compressPayload); + client = new DefaultClientMessagingService(injector.getInstance(RemoteClientFactory.class), compressPayload); } @After diff --git a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java index 12dd6555a760..14ed0443de61 100644 --- a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java +++ b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java @@ -322,7 +322,7 @@ protected void configure() { new LogReaderRuntimeModules().getInMemoryModules(), new MessagingServerRuntimeModule().getInMemoryModules(), new PreviewConfigModule(cConf, new Configuration(), SConfiguration.create()), - new PreviewManagerModule(false), + new PreviewManagerModule(cConf, false), new PreviewRunnerManagerModule().getInMemoryModules(), new MockProvisionerModule(), new NoOpAuditLogModule(),