Skip to content

Commit

Permalink
Fix RunRecordCorrectorService and AuditLogSubscriberService
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Jan 3, 2025
1 parent 7b0a32b commit 90e213d
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
import io.cdap.cdap.common.runtime.RuntimeModule;
Expand Down Expand Up @@ -220,6 +221,11 @@ protected void configure() {
Names.named("appfabric.services.names"));
servicesNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

Multibinder<String> processorNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Names.named("appfabric.processor.services.names"));
processorNamesBinder.addBinding().toInstance(Service.APP_FABRIC_PROCESSOR);

// TODO: Uncomment after CDAP-7688 is resolved
// servicesNamesBinder.addBinding().toInstance(Constants.Service.MESSAGING_SERVICE);

Expand Down Expand Up @@ -264,6 +270,11 @@ protected void configure() {
Names.named("appfabric.services.names"));
servicesNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

Multibinder<String> processorNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Names.named("appfabric.processor.services.names"));
processorNamesBinder.addBinding().toInstance(Service.APP_FABRIC_PROCESSOR);

// for PingHandler
servicesNamesBinder.addBinding().toInstance(Constants.Service.METRICS_PROCESSOR);
servicesNamesBinder.addBinding().toInstance(Constants.Service.LOGSAVER);
Expand Down Expand Up @@ -325,6 +336,11 @@ protected void configure() {
servicesNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);
servicesNamesBinder.addBinding().toInstance(Constants.Service.SECURE_STORE_SERVICE);

Multibinder<String> processorNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Names.named("appfabric.processor.services.names"));
processorNamesBinder.addBinding().toInstance(Service.APP_FABRIC_PROCESSOR);

Multibinder<String> handlerHookNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Names.named("appfabric.handler.hooks"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import io.cdap.cdap.data2.dataset2.lib.kv.LevelDBKVTableDefinition;
import io.cdap.cdap.gateway.handlers.DatasetServiceStore;
import io.cdap.cdap.gateway.handlers.MonitorHandler;
import io.cdap.cdap.internal.app.runtime.distributed.AppFabricProcessorManager;
import io.cdap.cdap.internal.app.runtime.distributed.AppFabricServiceManager;
import io.cdap.cdap.internal.app.runtime.distributed.TransactionServiceManager;
import io.cdap.cdap.internal.app.runtime.monitor.NonHadoopAppFabricProcessorManager;
import io.cdap.cdap.internal.app.runtime.monitor.NonHadoopAppFabricServiceManager;
import io.cdap.cdap.internal.app.runtime.monitor.RuntimeServiceManager;
import io.cdap.cdap.internal.app.services.AppFabricServer;
Expand Down Expand Up @@ -117,6 +119,9 @@ private void addNonHadoopBindings(Binder binder) {
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP)
.toProvider(
new NonHadoopMasterServiceManagerProvider(NonHadoopAppFabricServiceManager.class));
mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR)
.toProvider(
new NonHadoopMasterServiceManagerProvider(NonHadoopAppFabricProcessorManager.class));
mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR)
.toProvider(new NonHadoopMasterServiceManagerProvider(DatasetExecutorServiceManager.class));
mapBinder.addBinding(Constants.Service.METADATA_SERVICE)
Expand Down Expand Up @@ -144,6 +149,7 @@ private void addHadoopBindings(Binder binder) {
.to(MetricsProcessorStatusServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS).to(MetricsServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP).to(AppFabricServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR).to(AppFabricProcessorManager.class);
mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR)
.to(DatasetExecutorServiceManager.class);
mapBinder.addBinding(Constants.Service.METADATA_SERVICE).to(MetadataServiceManager.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright © 2015 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.runtime.distributed;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.twill.AbstractMasterServiceManager;
import io.cdap.cdap.common.twill.MasterServiceManager;
import io.cdap.cdap.common.zookeeper.election.LeaderElectionInfoService;
import io.cdap.cdap.proto.Containers;
import io.cdap.cdap.proto.SystemServiceLiveInfo;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* App Fabric Processor Service Management in Distributed Mode.
*/
public class AppFabricProcessorManager extends AbstractMasterServiceManager {

@Inject
AppFabricProcessorManager(CConfiguration cConf, TwillRunner twillRunner,
DiscoveryServiceClient discoveryClient) {
super(cConf, discoveryClient, Constants.Service.APP_FABRIC_PROCESSOR, twillRunner);
}

@Override
public String getDescription() {
return AppFabric.PROCESSOR_DESCRIPTION;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.runtime.monitor;

import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.twill.AbstractMasterServiceManager;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.discovery.DiscoveryServiceClient;

/**
* Service for managing app fabric processor service.
*/
public class NonHadoopAppFabricProcessorManager extends AbstractMasterServiceManager {

@Inject
NonHadoopAppFabricProcessorManager(CConfiguration cConf, DiscoveryServiceClient discoveryClient,
TwillRunner twillRunner) {
super(cConf, discoveryClient, Constants.Service.APP_FABRIC_PROCESSOR, twillRunner);
}

@Override
public String getDescription() {
return Constants.AppFabric.PROCESSOR_DESCRIPTION;
}
}
Loading

0 comments on commit 90e213d

Please sign in to comment.