Skip to content

Commit

Permalink
Core: Drop nodeName from AbstractComponent (#34487)
Browse files Browse the repository at this point in the history
`AbstractComponent` is trouble because its name implies that
*everything* should extend from it. It *is* useful, but maybe too
broadly useful. The things it offers access too, the `Settings` instance
for the entire server and a logger are nice to have around, but not
really needed *everywhere*. The `Settings` instance especially adds a
fair bit of ceremony to testing without any value.

This removes the `nodeName` method from `AbstractComponent` so it is
more clear where we actually need the node name.
  • Loading branch information
nik9000 committed Oct 27, 2018
1 parent f1011e1 commit a893697
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,21 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final AtomicReference<ClusterState> state; // last applied state

private final String nodeName;

private NodeConnectionsService nodeConnectionsService;
private Supplier<ClusterState.Builder> stateBuilderSupplier;

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
.Builder> stateBuilderSupplier) {
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Supplier<ClusterState.Builder> stateBuilderSupplier) {
super(settings);
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.stateBuilderSupplier = stateBuilderSupplier;
this.nodeName = nodeName;
}

public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand All @@ -133,7 +136,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(state.get(), "please set initial state before starting");
addListener(localNodeMasterListeners);
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName() + "/" + CLUSTER_UPDATE_THREAD_NAME,
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
Expand All @@ -60,12 +61,16 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;

private final ClusterSettings clusterSettings;

private final String nodeName;

private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.masterService = new MasterService(nodeName, settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand All @@ -74,7 +79,8 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
// Add a no-op update consumer so changes are logged
this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {});
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
this.clusterApplierService = new ClusterApplierService(nodeName, settings, clusterSettings,
threadPool, this::newClusterStateBuilder);
}

/**
Expand Down Expand Up @@ -214,6 +220,13 @@ public Settings getSettings() {
return settings;
}

/**
* The name of this node.
*/
public final String getNodeName() {
return nodeName;
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class MasterService extends AbstractLifecycleComponent {

public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

private final String nodeName;

private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;

private java.util.function.Supplier<ClusterState> clusterStateSupplier;
Expand All @@ -81,8 +83,9 @@ public class MasterService extends AbstractLifecycleComponent {
private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;

public MasterService(Settings settings, ThreadPool threadPool) {
public MasterService(String nodeName, Settings settings, ThreadPool threadPool) {
super(settings);
this.nodeName = nodeName;
// TODO: introduce a dedicated setting for master service
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.threadPool = threadPool;
Expand All @@ -105,7 +108,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName() + "/" + MASTER_UPDATE_THREAD_NAME,
nodeName + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;

public abstract class AbstractComponent {

Expand All @@ -36,11 +35,4 @@ public AbstractComponent(Settings settings) {
this.deprecationLogger = new DeprecationLogger(logger);
this.settings = settings;
}

/**
* Returns the nodes name from the settings or the empty string if not set.
*/
public final String nodeName() {
return Node.NODE_NAME_SETTING.get(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
Expand Down Expand Up @@ -116,6 +117,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {

private final TimeValue resolveTimeout;

private final String nodeName;

private volatile boolean closed = false;

public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
Expand All @@ -130,6 +133,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);

resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
nodeName = Node.NODE_NAME_SETTING.get(settings);
logger.debug(
"using concurrent_connects [{}], resolve_timeout [{}]",
concurrentConnects,
Expand All @@ -140,7 +144,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService

final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastZenPingExecutorService = EsExecutors.newScaling(
nodeName() + "/" + "unicast_connect",
nodeName + "/" + "unicast_connect",
0,
concurrentConnects,
60,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -206,6 +207,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private final BytesReference pingMessage;
private final String nodeName;

public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
Expand All @@ -219,6 +221,7 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
this.features = new String[0];
Expand Down Expand Up @@ -921,7 +924,7 @@ public void sendErrorResponse(
stream.setVersion(nodeVersion);
stream.setFeatures(features);
RemoteTransportException tx = new RemoteTransportException(
nodeName(), new TransportAddress(channel.getLocalAddress()), action, error);
nodeName, new TransportAddress(channel.getLocalAddress()), action, error);
threadPool.getThreadContext().writeTo(stream);
stream.writeException(tx);
byte status = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ static class TimedClusterApplierService extends ClusterApplierService {
public volatile Long currentTimeOverride = null;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
super("test_node", settings, clusterSettings, threadPool,
() -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ static class TimedMasterService extends MasterService {
public volatile Long currentTimeOverride = null;

TimedMasterService(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
super("test_node", settings, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
public class ClusterServiceUtils {

public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) {
MasterService masterService = new MasterService(Settings.EMPTY, threadPool);
MasterService masterService = new MasterService("test_master_node", Settings.EMPTY, threadPool);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state()));
masterService.setClusterStateSupplier(clusterStateRef::get);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
return emptyList();
}

Auditor auditor = new Auditor(client, clusterService.nodeName());
Auditor auditor = new Auditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener
}

private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService.nodeName());
Auditor auditor = new Auditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client, threadPool),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {

if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
assert clusterService.localNode().isMasterNode();
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
LOGGER.error("Job update was submitted to non-master node [" + clusterService.getNodeName() + "]; update for job ["
+ update.getJobId() + "] will be ignored");
executeProcessUpdates(updatesIterator);
return;
Expand Down

0 comments on commit a893697

Please sign in to comment.