Skip to content

Commit

Permalink
Add cdap system service for appfabric processor
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Jan 6, 2025
1 parent 90e213d commit 227670b
Show file tree
Hide file tree
Showing 13 changed files with 26 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ protected void configure() {
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Multibinder<HttpHandler> handlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(Constants.AppFabric.HANDLERS_BINDING));
binder(), HttpHandler.class, Names.named(Constants.AppFabric.SERVER_HANDLERS_BINDING));

CommonHandlers.add(handlerBinder);
handlerBinder.addBinding().to(ConfigHandler.class);
Expand Down Expand Up @@ -499,6 +499,10 @@ protected void configure() {
for (Class<? extends HttpHandler> handlerClass : handlerClasses) {
handlerBinder.addBinding().to(handlerClass);
}

Multibinder<HttpHandler> procesorHandlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(AppFabric.PROCESSOR_HANDLERS_BINDING));
CommonHandlers.add(procesorHandlerBinder);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected void configure() {
});

Multibinder<HttpHandler> handlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(Constants.AppFabric.HANDLERS_BINDING));
binder(), HttpHandler.class, Names.named(Constants.AppFabric.SERVER_HANDLERS_BINDING));

handlerBinder.addBinding().to(MonitorHandler.class);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2015 Cask Data, Inc.
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand All @@ -16,34 +16,13 @@

package io.cdap.cdap.internal.app.runtime.distributed;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.twill.AbstractMasterServiceManager;
import io.cdap.cdap.common.twill.MasterServiceManager;
import io.cdap.cdap.common.zookeeper.election.LeaderElectionInfoService;
import io.cdap.cdap.proto.Containers;
import io.cdap.cdap.proto.SystemServiceLiveInfo;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* App Fabric Processor Service Management in Distributed Mode.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Cask Data, Inc.
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2014-2024 Cask Data, Inc.
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -46,6 +46,7 @@
import io.cdap.cdap.security.auth.AuditLogSubscriberService;
import io.cdap.cdap.sourcecontrol.RepositoryCleanupService;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.http.HttpHandler;
import io.cdap.http.NettyHttpService;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class AppFabricProcessorService extends AbstractIdleService {
private final boolean sslEnabled;
private CommonNettyHttpServiceFactory commonNettyHttpServiceFactory;
private Cancellable cancelHttpService;
private Set<HttpHandler> handlers;

/**
* Construct the AppFabricProcessorService with service factory and cConf coming from guice injection.
Expand All @@ -98,6 +100,7 @@ public AppFabricProcessorService(CConfiguration cConf,
SConfiguration sConf,
DiscoveryService discoveryService,
@Named(Constants.Service.MASTER_SERVICES_BIND_ADDRESS) InetAddress hostname,
@Named(Constants.AppFabric.SERVER_HANDLERS_BINDING) Set<HttpHandler> handlers,
ProgramRuntimeService programRuntimeService,
RunRecordCorrectorService runRecordCorrectorService,
ProgramRunStatusMonitorService programRunStatusMonitorService,
Expand All @@ -119,6 +122,7 @@ public AppFabricProcessorService(CConfiguration cConf,
ScheduleNotificationSubscriberService scheduleNotificationSubscriberService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
this.cConf = cConf;
this.sConf = sConf;
this.sslEnabled = cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED);
Expand Down Expand Up @@ -188,6 +192,7 @@ protected void startUp() throws Exception {
NettyHttpService.Builder httpServiceBuilder = commonNettyHttpServiceFactory
.builder(Constants.Service.APP_FABRIC_HTTP)
.setHost(hostname.getCanonicalHostName())
.setHttpHandlers(handlers)
.setConnectionBacklog(cConf.getInt(Constants.AppFabric.BACKLOG_CONNECTIONS,
Constants.AppFabric.DEFAULT_BACKLOG))
.setExecThreadPoolSize(cConf.getInt(Constants.AppFabric.EXEC_THREADS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class AppFabricServer extends AbstractIdleService {
public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
DiscoveryService discoveryService,
@Named(Constants.Service.MASTER_SERVICES_BIND_ADDRESS) InetAddress hostname,
@Named(Constants.AppFabric.HANDLERS_BINDING) Set<HttpHandler> handlers,
@Named(Constants.AppFabric.SERVER_HANDLERS_BINDING) Set<HttpHandler> handlers,
@Nullable MetricsCollectionService metricsCollectionService,
ProgramRuntimeService programRuntimeService,
ProgramRunStatusMonitorService programRunStatusMonitorService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2014-2018 Cask Data, Inc.
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testSystemServices() throws Exception {
List<SystemServiceMeta> actual = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()),
Charsets.UTF_8), token);

Assert.assertEquals(9, actual.size());
Assert.assertEquals(10, actual.size());
urlConn.disconnect();
}

Expand All @@ -81,7 +81,7 @@ public void testSystemServicesStatus() throws Exception {

Map<String, String> result = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()),
Charsets.UTF_8), token);
Assert.assertEquals(9, result.size());
Assert.assertEquals(10, result.size());
urlConn.disconnect();
Assert.assertEquals("OK", result.get(Constants.Service.APP_FABRIC_HTTP));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,8 @@ public static final class AppFabric {
/**
* Guice named bindings.
*/
public static final String HANDLERS_BINDING = "appfabric.http.handler";
public static final String SERVER_HANDLERS_BINDING = "appfabric.http.handler";
public static final String PROCESSOR_HANDLERS_BINDING = "appfabric.processor.http.handler";

/**
* Defaults.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ The status of these CDAP system services can be checked:
* - ``App Fabric``
- ``appfabric``
- Service that handles application fabric requests
* - ``App Fabric Processor``
- ``appfabric.processor``
- Service that handles application lifecycle
* - ``Log Saver``
- ``log.saver``
- Service that aggregates all system and application logs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2024 Cask Data, Inc.
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,5 @@ protected LoggingContext getLoggingContext(EnvironmentOptions options) {
Constants.Logging.COMPONENT_NAME,
Constants.Service.APP_FABRIC_HTTP);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testSupportBundleSystemLogTask() throws Exception {
List<File> systemLogFiles = DirUtils.listFiles(systemLogFolder,
file -> !file.isHidden() && !file.getParentFile().isHidden());

Assert.assertEquals(9, systemLogFiles.size());
Assert.assertEquals(10, systemLogFiles.size());
}

@Before
Expand Down

0 comments on commit 227670b

Please sign in to comment.