Skip to content

Commit

Permalink
Consolidate Metrics Query and Metrics Proccessor Services
Browse files Browse the repository at this point in the history
  • Loading branch information
bhardwaj-priyanshu committed Feb 13, 2025
1 parent 7f29f87 commit cd4bef4
Show file tree
Hide file tree
Showing 16 changed files with 14 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ protected void configure() {
servicesNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

// for PingHandler
servicesNamesBinder.addBinding().toInstance(Constants.Service.METRICS_PROCESSOR);
servicesNamesBinder.addBinding().toInstance(Constants.Service.LOGSAVER);
servicesNamesBinder.addBinding().toInstance(Constants.Service.TRANSACTION_HTTP);
servicesNamesBinder.addBinding().toInstance(Constants.Service.RUNTIME);
Expand All @@ -331,7 +330,6 @@ protected void configure() {
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

// for PingHandler
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.METRICS_PROCESSOR);
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.LOGSAVER);
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.TRANSACTION_HTTP);
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.RUNTIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.logging.run.LogSaverServiceManager;
import io.cdap.cdap.messaging.distributed.MessagingServiceManager;
import io.cdap.cdap.metrics.runtime.MetricsProcessorStatusServiceManager;
import io.cdap.cdap.metrics.runtime.MetricsServiceManager;
import io.cdap.http.HttpHandler;
import java.util.Map;
Expand Down Expand Up @@ -115,9 +114,6 @@ private void addNonHadoopBindings(Binder binder) {
.toProvider(new NonHadoopMasterServiceManagerProvider(LogSaverServiceManager.class));
mapBinder.addBinding(Constants.Service.TRANSACTION)
.toProvider(new NonHadoopMasterServiceManagerProvider(TransactionServiceManager.class));
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR)
.toProvider(
new NonHadoopMasterServiceManagerProvider(MetricsProcessorStatusServiceManager.class));
mapBinder.addBinding(Constants.Service.METRICS)
.toProvider(new NonHadoopMasterServiceManagerProvider(MetricsServiceManager.class));
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP)
Expand Down Expand Up @@ -152,11 +148,10 @@ private void addHadoopBindings(Binder binder) {
MasterServiceManager.class);
mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverServiceManager.class);
mapBinder.addBinding(Constants.Service.TRANSACTION).to(TransactionServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR)
.to(MetricsProcessorStatusServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS).to(MetricsServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP).to(AppFabricServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR).to(AppFabricProcessorManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR)
.to(AppFabricProcessorManager.class);
mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR)
.to(DatasetExecutorServiceManager.class);
mapBinder.addBinding(Constants.Service.METADATA_SERVICE).to(MetadataServiceManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testSystemServices() throws Exception {
List<SystemServiceMeta> actual = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()),
Charsets.UTF_8), token);

Assert.assertEquals(10, actual.size());
Assert.assertEquals(9, actual.size());
urlConn.disconnect();
}

Expand All @@ -81,7 +81,7 @@ public void testSystemServicesStatus() throws Exception {

Map<String, String> result = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()),
Charsets.UTF_8), token);
Assert.assertEquals(10, result.size());
Assert.assertEquals(9, result.size());
urlConn.disconnect();
Assert.assertEquals("OK", result.get(Constants.Service.APP_FABRIC_HTTP));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ public static final class Metrics {
ImmutableMap.of(Constants.Metrics.Tag.NAMESPACE,
NamespaceId.SYSTEM.getNamespace(),
Constants.Metrics.Tag.COMPONENT,
Constants.Service.METRICS_PROCESSOR);
Constants.Service.METRICS);

public static final Map<String, String> TRANSACTION_MANAGER_CONTEXT =
ImmutableMap.of(Constants.Metrics.Tag.NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,8 @@ private RouteDestination getV3RoutingService(String[] uriParts, AllowedMethod re
return LOG_SAVER;
case Constants.Service.TRANSACTION:
return TRANSACTION;
case Constants.Service.METRICS_PROCESSOR:
return METRICS_PROCESSOR;
case Constants.Service.METRICS:
case Constants.Service.METRICS_PROCESSOR:
return METRICS;
case Constants.Service.APP_FABRIC_HTTP:
return APP_FABRIC_HTTP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void testSystemServiceStatusPaths() {
assertRouting(String.format("/v3/system/services/%s/status", Constants.Service.TRANSACTION),
RouterPathLookup.TRANSACTION);
assertRouting(String.format("/v3/system/services/%s/status", Constants.Service.METRICS_PROCESSOR),
RouterPathLookup.METRICS_PROCESSOR);
RouterPathLookup.METRICS);
assertRouting(String.format("/v3/system/services/%s/status", Constants.Service.METRICS),
RouterPathLookup.METRICS);
assertRouting(String.format("/v3/system/services/%s/status", Constants.Service.APP_FABRIC_HTTP),
Expand All @@ -394,7 +394,7 @@ public void testSystemServiceStacksPaths() {
assertRouting(String.format("/v3/system/services/%s/stacks", Constants.Service.TRANSACTION),
RouterPathLookup.TRANSACTION);
assertRouting(String.format("/v3/system/services/%s/stacks", Constants.Service.METRICS_PROCESSOR),
RouterPathLookup.METRICS_PROCESSOR);
RouterPathLookup.METRICS);
assertRouting(String.format("/v3/system/services/%s/stacks", Constants.Service.METRICS),
RouterPathLookup.METRICS);
assertRouting(String.format("/v3/system/services/%s/stacks", Constants.Service.APP_FABRIC_HTTP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public static Set<ServiceResourceKeys> createSystemServicesResourceKeysSet(CConf
Constants.Metrics.NUM_CORES,
Constants.Metrics.NUM_INSTANCES,
Constants.Metrics.MAX_INSTANCES))
.add(new ServiceResourceKeys(cConf,
Constants.Service.METRICS_PROCESSOR,
Constants.MetricsProcessor.MEMORY_MB,
Constants.MetricsProcessor.NUM_CORES,
Constants.MetricsProcessor.NUM_INSTANCES,
Constants.MetricsProcessor.MAX_INSTANCES))
.add(new ServiceResourceKeys(cConf,
Constants.Service.LOGSAVER,
Constants.LogSaver.MEMORY_MB,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected Injector doInit(TwillContext context) {
LoggingContextAccessor.setLoggingContext(
new ServiceLoggingContext(NamespaceId.SYSTEM.getNamespace(),
Constants.Logging.COMPONENT_NAME,
Constants.Service.METRICS_PROCESSOR));
Constants.Service.METRICS));
return injector;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.metrics.guice.MetricsHandlerModule;
import io.cdap.cdap.metrics.guice.MetricsProcessorStatusServiceModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.metrics.process.MessagingMetricsProcessorServiceFactory;
import io.cdap.cdap.metrics.process.MetricsAdminSubscriberService;
import io.cdap.cdap.metrics.process.MetricsProcessorStatusService;
import io.cdap.cdap.metrics.process.loader.MetricsWriterModule;
import io.cdap.cdap.metrics.query.MetricsQueryService;
import io.cdap.cdap.metrics.store.MetricsCleanUpService;
Expand Down Expand Up @@ -75,7 +73,6 @@ protected List<Module> getServiceModules(MasterEnvironment masterEnv,
new SystemDatasetRuntimeModule().getStandaloneModules(),
new MetricsStoreModule(),
new FactoryModuleBuilder().build(MessagingMetricsProcessorServiceFactory.class),
new MetricsProcessorStatusServiceModule(),
new MetricsHandlerModule(),
new DFSLocationModule(),
new MetricsWriterModule()
Expand All @@ -98,7 +95,6 @@ protected void addServices(Injector injector, List<? super Service> services,

services.add(injector.getInstance(MessagingMetricsProcessorServiceFactory.class)
.create(topicNumbers, metricsContext, 0));
services.add(injector.getInstance(MetricsProcessorStatusService.class));
services.add(injector.getInstance(MetricsQueryService.class));
services.add(injector.getInstance(MetricsAdminSubscriberService.class));
services.add(injector.getInstance(MetricsCleanUpService.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ private static List<Module> createPersistentModules(CConfiguration cConf, Config
cConf.set(Constants.Transaction.Container.ADDRESS, localhost);
cConf.set(Constants.Dataset.Executor.ADDRESS, localhost);
cConf.set(Constants.Metrics.ADDRESS, localhost);
cConf.set(Constants.MetricsProcessor.BIND_ADDRESS, localhost);
cConf.set(Constants.LogSaver.ADDRESS, localhost);
cConf.set(Constants.LogQuery.ADDRESS, localhost);
cConf.set(Constants.Metadata.SERVICE_BIND_ADDRESS, localhost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testSupportBundleSystemLogTask() throws Exception {
List<File> systemLogFiles = DirUtils.listFiles(systemLogFolder,
file -> !file.isHidden() && !file.getParentFile().isHidden());

Assert.assertEquals(10, systemLogFiles.size());
Assert.assertEquals(9, systemLogFiles.size());
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ private static CConfiguration createCconf(File localDataDir,
cConf.set(Constants.Transaction.Container.ADDRESS, localhost);
cConf.set(Constants.Dataset.Executor.ADDRESS, localhost);
cConf.set(Constants.Metrics.ADDRESS, localhost);
cConf.set(Constants.MetricsProcessor.BIND_ADDRESS, localhost);
cConf.set(Constants.LogSaver.ADDRESS, localhost);
cConf.set(Constants.Security.AUTH_SERVER_BIND_ADDRESS, localhost);
cConf.set(Constants.Metadata.SERVICE_BIND_ADDRESS, localhost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void append(ILoggingEvent eventObject) {
// Don't increment metrics for logs from MetricsProcessor to avoid possibility of infinite loop
if (!(metricsTags.containsKey(Constants.Metrics.Tag.COMPONENT)
&& metricsTags.get(Constants.Metrics.Tag.COMPONENT)
.equals(Constants.Service.METRICS_PROCESSOR))) {
.equals(Constants.Service.METRICS))) {
// todo this is inefficient as childContext implementation creates new map should use metricsCollectionService
MetricsContext childContext = metricsContext.childContext(metricsTags);
childContext.increment(metricName, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public final class LocalMetricsCollectionService extends AggregatedMetricsCollec

private static final ImmutableMap<String, String> METRICS_PROCESSOR_CONTEXT =
ImmutableMap.of(Constants.Metrics.Tag.NAMESPACE, Id.Namespace.SYSTEM.getId(),
Constants.Metrics.Tag.COMPONENT, Constants.Service.METRICS_PROCESSOR);
Constants.Metrics.Tag.COMPONENT, Constants.Service.METRICS);

private final CConfiguration cConf;
private final MetricStore metricStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public MetricsProcessorStatusService(CConfiguration cConf, SConfiguration sConf,
protected void startUp() throws Exception {
LoggingContextAccessor.setLoggingContext(new ServiceLoggingContext(Id.Namespace.SYSTEM.getId(),
Constants.Logging.COMPONENT_NAME,
Constants.Service.METRICS_PROCESSOR));
Constants.Service.METRICS));
LOG.info("Starting MetricsProcessor Status Service...");

httpService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class MetricsProcessorStatusServiceManager extends AbstractMasterServiceM
@Inject
MetricsProcessorStatusServiceManager(CConfiguration cConf, TwillRunner twillRunner,
DiscoveryServiceClient discoveryClient) {
super(cConf, discoveryClient, Constants.Service.METRICS_PROCESSOR, twillRunner);
super(cConf, discoveryClient, Constants.Service.METRICS, twillRunner);
}

@Override
Expand Down

0 comments on commit cd4bef4

Please sign in to comment.