Skip to content

Commit

Permalink
[ML] Introduce a setting for the process connect timeout (#43234)
Browse files Browse the repository at this point in the history
This change introduces a new setting,
xpack.ml.process_connect_timeout, to enable
the timeout for one of the external ML processes
to connect to the ES JVM to be increased.

The timeout may need to be increased if many
processes are being started simultaneously on
the same machine. This is unlikely in clusters
with many ML nodes, as we balance the processes
across the ML nodes, but can happen in clusters
with a single ML node and a high value for
xpack.ml.node_concurrent_job_allocations.
  • Loading branch information
droberts195 committed Jun 26, 2019
1 parent 69a1d49 commit a9b9441
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 8 deletions.
8 changes: 8 additions & 0 deletions docs/reference/settings/ml-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node.
IMPORTANT: This setting assumes some external process is capable of adding ML nodes
to the cluster. This setting is only useful when used in conjunction with
such an external process.

`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>)::
The connection timeout for {ml} processes that run separately from the {es} JVM.
Defaults to `10s`. Some {ml} processing is done by processes that run separately
to the {es} JVM. When such processes are started they must connect to the {es}
JVM. If such a process does not connect within the time period specified by this
setting then the process is assumed to have failed. Defaults to `10s`. The minimum
value for this setting is `5s`.
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);

public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope);

// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
Expand Down Expand Up @@ -332,6 +336,7 @@ public static boolean isMlNode(DiscoveryNode node) {
public List<Setting<?>> getSettings() {
return Collections.unmodifiableList(
Arrays.asList(MachineLearningField.AUTODETECT_PROCESS,
PROCESS_CONNECT_TIMEOUT,
ML_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
Expand Down Expand Up @@ -448,7 +453,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
nativeController,
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
} catch (IOException e) {
// The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
// only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
Expand All @@ -36,13 +37,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory

private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);

private final Client client;
private final Environment env;
private final Settings settings;
private final NativeController nativeController;
private final ClusterService clusterService;
private volatile Duration processConnectTimeout;

public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
ClusterService clusterService) {
Expand All @@ -51,6 +52,13 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native
this.nativeController = Objects.requireNonNull(nativeController);
this.client = client;
this.clusterService = clusterService;
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}

void setProcessConnectTimeout(TimeValue processConnectTimeout) {
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
}

@Override
Expand Down Expand Up @@ -87,8 +95,8 @@ public AutodetectProcess createAutodetectProcess(Job job,
}
}

private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
try {

Settings updatedSettings = Settings.builder()
Expand All @@ -108,7 +116,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
autodetectBuilder.quantiles(autodetectParams.quantiles());
}
autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch autodetect for job " + job.getId();
LOGGER.error(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
Expand All @@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory

private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);

private final Environment env;
private final NativeController nativeController;
private volatile Duration processConnectTimeout;

public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}

void setProcessConnectTimeout(TimeValue processConnectTimeout) {
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
}

@Override
Expand Down Expand Up @@ -64,7 +74,7 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
processPipes.addArgs(command);
nativeController.startProcess(command);
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch normalizer for job " + jobId;
LOGGER.error(msg);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class NativeAutodetectProcessFactoryTests extends ESTestCase {

public void testSetProcessConnectTimeout() throws IOException {

int timeoutSeconds = randomIntBetween(5, 100);

Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
Environment env = TestEnvironment.newEnvironment(settings);
NativeController nativeController = mock(NativeController.class);
Client client = mock(Client.class);
ClusterSettings clusterSettings = new ClusterSettings(settings,
Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Job job = mock(Job.class);
when(job.getId()).thenReturn("set_process_connect_test_job");
AutodetectParams autodetectParams = mock(AutodetectParams.class);
ProcessPipes processPipes = mock(ProcessPipes.class);

NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());

verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
}
}

0 comments on commit a9b9441

Please sign in to comment.