diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 9001716c94cda..a3e7f0c8a5833 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -95,11 +95,13 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final AtomicReference state; // last applied state + private final String nodeName; + private NodeConnectionsService nodeConnectionsService; private Supplier stateBuilderSupplier; - public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier stateBuilderSupplier) { + public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, + Supplier stateBuilderSupplier) { super(settings); this.clusterSettings = clusterSettings; this.threadPool = threadPool; @@ -107,6 +109,7 @@ public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool); this.stateBuilderSupplier = stateBuilderSupplier; + this.nodeName = nodeName; } public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { @@ -133,7 +136,7 @@ protected synchronized void doStart() { Objects.requireNonNull(state.get(), "please set initial state before starting"); addListener(localNodeMasterListeners); threadPoolExecutor = EsExecutors.newSinglePrioritizing( - nodeName() + "/" + CLUSTER_UPDATE_THREAD_NAME, + nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME, daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler()); diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index f9af23b374c8d..de66cbe6cb6a0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; @@ -60,12 +61,16 @@ public class ClusterService extends AbstractLifecycleComponent { private final OperationRouting operationRouting; private final ClusterSettings clusterSettings; + + private final String nodeName; + private final Map> initialClusterStateCustoms; public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Map> initialClusterStateCustoms) { super(settings); - this.masterService = new MasterService(settings, threadPool); + this.nodeName = Node.NODE_NAME_SETTING.get(settings); + this.masterService = new MasterService(nodeName, settings, threadPool); this.operationRouting = new OperationRouting(settings, clusterSettings); this.clusterSettings = clusterSettings; this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); @@ -74,7 +79,8 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread // Add a no-op update consumer so changes are logged this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {}); this.initialClusterStateCustoms = initialClusterStateCustoms; - this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder); + this.clusterApplierService = new ClusterApplierService(nodeName, settings, clusterSettings, + threadPool, this::newClusterStateBuilder); } /** @@ -214,6 +220,13 @@ public Settings getSettings() { return settings; } + /** + * The name of this node. + */ + public final String getNodeName() { + return nodeName; + } + /** * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched. diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index d720e9d603fe9..86fc3b54ca6f4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -70,6 +70,8 @@ public class MasterService extends AbstractLifecycleComponent { public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask"; + private final String nodeName; + private BiConsumer clusterStatePublisher; private java.util.function.Supplier clusterStateSupplier; @@ -81,8 +83,9 @@ public class MasterService extends AbstractLifecycleComponent { private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor; private volatile Batcher taskBatcher; - public MasterService(Settings settings, ThreadPool threadPool) { + public MasterService(String nodeName, Settings settings, ThreadPool threadPool) { super(settings); + this.nodeName = nodeName; // TODO: introduce a dedicated setting for master service this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); this.threadPool = threadPool; @@ -105,7 +108,7 @@ protected synchronized void doStart() { Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); threadPoolExecutor = EsExecutors.newSinglePrioritizing( - nodeName() + "/" + MASTER_UPDATE_THREAD_NAME, + nodeName + "/" + MASTER_UPDATE_THREAD_NAME, daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler()); diff --git a/server/src/main/java/org/elasticsearch/common/component/AbstractComponent.java b/server/src/main/java/org/elasticsearch/common/component/AbstractComponent.java index 97a8053c1d912..f8fc84e97ec60 100644 --- a/server/src/main/java/org/elasticsearch/common/component/AbstractComponent.java +++ b/server/src/main/java/org/elasticsearch/common/component/AbstractComponent.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; public abstract class AbstractComponent { @@ -36,11 +35,4 @@ public AbstractComponent(Settings settings) { this.deprecationLogger = new DeprecationLogger(logger); this.settings = settings; } - - /** - * Returns the nodes name from the settings or the empty string if not set. - */ - public final String nodeName() { - return Node.NODE_NAME_SETTING.get(settings); - } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 9c86fa17e9b06..bba58de4f7e91 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.KeyedLock; +import org.elasticsearch.node.Node; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; @@ -116,6 +117,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final TimeValue resolveTimeout; + private final String nodeName; + private volatile boolean closed = false; public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -130,6 +133,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); + nodeName = Node.NODE_NAME_SETTING.get(settings); logger.debug( "using concurrent_connects [{}], resolve_timeout [{}]", concurrentConnects, @@ -140,7 +144,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); unicastZenPingExecutorService = EsExecutors.newScaling( - nodeName() + "/" + "unicast_connect", + nodeName + "/" + "unicast_connect", 0, concurrentConnects, 60, diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 71264b3cb44e2..5abbbdb990237 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -64,6 +64,7 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.node.Node; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; @@ -206,6 +207,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private volatile Map> requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); private final BytesReference pingMessage; + private final String nodeName; public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, @@ -219,6 +221,7 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings); this.networkService = networkService; this.transportName = transportName; + this.nodeName = Node.NODE_NAME_SETTING.get(settings); final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings); if (defaultFeatures == null) { this.features = new String[0]; @@ -921,7 +924,7 @@ public void sendErrorResponse( stream.setVersion(nodeVersion); stream.setFeatures(features); RemoteTransportException tx = new RemoteTransportException( - nodeName(), new TransportAddress(channel.getLocalAddress()), action, error); + nodeName, new TransportAddress(channel.getLocalAddress()), action, error); threadPool.getThreadContext().writeTo(stream); stream.writeException(tx); byte status = 0; diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index cbf8a7eda2b3e..5baf502d30362 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -412,7 +412,8 @@ static class TimedClusterApplierService extends ClusterApplierService { public volatile Long currentTimeOverride = null; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { - super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))); + super("test_node", settings, clusterSettings, threadPool, + () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 1168e1034fe6c..b95d7b301d8f8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -906,7 +906,7 @@ static class TimedMasterService extends MasterService { public volatile Long currentTimeOverride = null; TimedMasterService(Settings settings, ThreadPool threadPool) { - super(settings, threadPool); + super("test_node", settings, threadPool); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 8c4076e327d70..efc553354e9e7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -50,7 +50,7 @@ public class ClusterServiceUtils { public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { - MasterService masterService = new MasterService(Settings.EMPTY, threadPool); + MasterService masterService = new MasterService("test_master_node", Settings.EMPTY, threadPool); AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); masterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state())); masterService.setClusterStateSupplier(clusterStateRef::get); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 5297aa77dcd9c..1cf0e1f32a4f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -368,7 +368,7 @@ public Collection createComponents(Client client, ClusterService cluster return emptyList(); } - Auditor auditor = new Auditor(client, clusterService.nodeName()); + Auditor auditor = new Auditor(client, clusterService.getNodeName()); JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool); JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index fc4e9133ec2a3..71dbda3d492c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -54,7 +54,7 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener } private void deleteExpiredData(ActionListener listener) { - Auditor auditor = new Auditor(client, clusterService.nodeName()); + Auditor auditor = new Auditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( new ExpiredResultsRemover(client, clusterService, auditor), new ExpiredForecastsRemover(client, threadPool), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 6b871c074619e..c88e4f3d4408e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -112,7 +112,7 @@ void executeProcessUpdates(Iterator updatesIterator) { if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) { assert clusterService.localNode().isMasterNode(); - LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job [" + LOGGER.error("Job update was submitted to non-master node [" + clusterService.getNodeName() + "]; update for job [" + update.getJobId() + "] will be ignored"); executeProcessUpdates(updatesIterator); return;