Skip to content

Commit

Permalink
Use ClientUtil so that we have a consistent place to control index ac…
Browse files Browse the repository at this point in the history
…cess. (opendistro-for-elasticsearch#32)

Testing done:
* all tests passed
* run end-to-end testing to verify basic workflow works including creating detector and generate anomaly score
  • Loading branch information
kaituo authored and wnbts committed Jan 21, 2020
1 parent 5d61f08 commit caebb97
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 23 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

buildscript {
ext {
es_mv = '7.1'
es_mv = '7.2'
es_group = "org.elasticsearch"
es_version = '7.2.1'
es_distribution = 'oss-zip'
Expand Down Expand Up @@ -311,6 +311,8 @@ dependencies {
checkstyle "com.puppycrawl.tools:checkstyle:${project.checkstyle.toolVersion}"
}

compileJava.options.compilerArgs << "-Xlint:-deprecation,-rawtypes,-serial,-try,-unchecked"

apply plugin: 'nebula.ospackage'

// This is afterEvaluate because the bundlePlugin ZIP task is updated afterEvaluate and changes the ZIP name to match the plugin name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import com.amazon.opendistroforelasticsearch.ad.cluster.ADClusterEventListener;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData.ADMetaDataDiff;
import com.amazon.opendistroforelasticsearch.ad.cluster.DailyCron;
import com.amazon.opendistroforelasticsearch.ad.cluster.DeleteDetector;
import com.amazon.opendistroforelasticsearch.ad.cluster.HashRing;
import com.amazon.opendistroforelasticsearch.ad.cluster.HourlyCron;
import com.amazon.opendistroforelasticsearch.ad.cluster.MasterEventListener;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.dataprocessor.Interpolator;
Expand Down Expand Up @@ -202,7 +200,7 @@ public Collection<Object> createComponents(
NamedWriteableRegistry namedWriteableRegistry
) {
Settings settings = environment.settings();
ClientUtil clientUtil = new ClientUtil(settings);
ClientUtil clientUtil = new ClientUtil(settings, client);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
Expand Down Expand Up @@ -267,8 +265,6 @@ public Collection<Object> createComponents(
anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager);

DeleteDetector deleteUtil = new DeleteDetector(clusterService, clock);
DailyCron dailyCron = new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL);
HourlyCron hourlyCron = new HourlyCron(clusterService, client);

Map<String, ADStat<?>> stats = ImmutableMap
.<String, ADStat<?>>builder()
Expand Down Expand Up @@ -313,11 +309,9 @@ public Collection<Object> createComponents(
runner,
new ADClusterEventListener(clusterService, hashRing, modelManager),
deleteUtil,
dailyCron,
hourlyCron,
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock)
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
Expand All @@ -41,11 +43,13 @@ public class DailyCron implements Runnable {
private final Clock clock;
private final Client client;
private final Duration checkpointTtl;
private final ClientUtil clientUtil;

public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl) {
public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil) {
this.deleteUtil = deleteUtil;
this.clock = clock;
this.client = client;
this.clientUtil = clientUtil;
this.checkpointTtl = checkpointTtl;
}

Expand All @@ -63,7 +67,7 @@ public void run() {
)
)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
client
clientUtil
.execute(
DeleteByQueryAction.INSTANCE,
deleteRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.threadpool.Scheduler.Cancellable;

import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;

import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -37,20 +38,23 @@ public class MasterEventListener implements LocalNodeMasterListener {
private DeleteDetector deleteUtil;
private Client client;
private Clock clock;
private ClientUtil clientUtil;

public MasterEventListener(
ClusterService clusterService,
ThreadPool threadPool,
DeleteDetector deleteUtil,
Client client,
Clock clock
Clock clock,
ClientUtil clientUtil
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.deleteUtil = deleteUtil;
this.client = client;
this.clusterService.addLocalNodeMasterListener(this);
this.clock = clock;
this.clientUtil = clientUtil;
}

@Override
Expand All @@ -70,7 +74,7 @@ public void beforeStop() {
if (dailyCron == null) {
dailyCron = threadPool
.scheduleWithFixedDelay(
new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL),
new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil),
TimeValue.timeValueHours(24),
executorName()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -37,14 +41,16 @@

public class ClientUtil {
private volatile TimeValue requestTimeout;
private Client client;

@Inject
public ClientUtil(Settings setting) {
public ClientUtil(Settings setting, Client client) {
this.requestTimeout = REQUEST_TIMEOUT.get(setting);
this.client = client;
}

/**
* Generates a nonblocking request with a timeout. Blocking is not allowed in a
* Send a nonblocking request with a timeout and return response. Blocking is not allowed in a
* transport call context. See BaseFuture.blockingAllowed
* @param request request like index/search/get
* @param LOG log
Expand Down Expand Up @@ -86,4 +92,64 @@ public <Request extends ActionRequest, Response extends ActionResponse> Optional
throw new IllegalStateException(e1);
}
}

/**
* Send an asynchronous request and handle response with the provided listener.
* @param <Request> ActionRequest
* @param <Response> ActionResponse
* @param request request body
* @param consumer request method, functional interface to operate as a client request like client::get
* @param listener needed to handle response
*/
public <Request extends ActionRequest, Response extends ActionResponse> void asyncRequest(
Request request,
BiConsumer<Request, ActionListener<Response>> consumer,
ActionListener<Response> listener
) {
consumer
.accept(
request,
ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { listener.onFailure(exception); })
);
}

/**
* Execute a transport action and handle response with the provided listener.
* @param <Request> ActionRequest
* @param <Response> ActionResponse
* @param action transport action
* @param request request body
* @param listener needed to handle response
*/
public <Request extends ActionRequest, Response extends ActionResponse> void execute(
Action<Response> action,
Request request,
ActionListener<Response> listener
) {
client
.execute(
action,
request,
ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { listener.onFailure(exception); })
);
}

/**
* Send an synchronous request and handle response with the provided listener.
*
* @deprecated use asyncRequest with listener instead.
*
* @param <Request> ActionRequest
* @param <Response> ActionResponse
* @param request request body
* @param function request method, functional interface to operate as a client request like client::get
* @return the response
*/
@Deprecated
public <Request extends ActionRequest, Response extends ActionResponse> Response syncRequest(
Request request,
Function<Request, ActionFuture<Response>> function
) {
return function.apply(request).actionGet(requestTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;

import com.amazon.opendistroforelasticsearch.ad.AbstractADTest;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -59,7 +60,8 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) {
DeleteDetector deleteUtil = mock(DeleteDetector.class);
Clock clock = mock(Clock.class);
Client client = mock(Client.class);
DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24));
ClientUtil clientUtil = mock(ClientUtil.class);
DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil);

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Expand All @@ -79,7 +81,7 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) {
}

return null;
}).when(client).execute(eq(DeleteByQueryAction.INSTANCE), any(), any());
}).when(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any());

doNothing().when(deleteUtil).deleteDetectorResult(eq(client));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;

import com.amazon.opendistroforelasticsearch.ad.AbstractADTest;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
Expand All @@ -44,6 +45,7 @@ public class MasterEventListenerTests extends AbstractADTest {
private Cancellable hourlyCancellable;
private Cancellable dailyCancellable;
private MasterEventListener masterService;
private ClientUtil clientUtil;

@Override
@Before
Expand All @@ -59,7 +61,8 @@ public void setUp() throws Exception {
deleteUtil = mock(DeleteDetector.class);
client = mock(Client.class);
clock = mock(Clock.class);
masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock);
clientUtil = mock(ClientUtil.class);
masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil);
}

public void testOnOffMaster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistroforelasticsearch.ad.indices;

import static org.mockito.Mockito.mock;

import com.amazon.opendistroforelasticsearch.ad.TestHelpers;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
Expand All @@ -24,6 +26,7 @@
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -46,6 +49,7 @@ public class AnomalyDetectionIndicesTests extends ESIntegTestCase {
private ClientUtil requestUtil;
private Settings settings;
private ClusterService clusterService;
private Client client;

@Before
public void setup() {
Expand All @@ -65,7 +69,8 @@ public void setup() {
clusterSettings.add(AnomalyDetectorSettings.REQUEST_TIMEOUT);
clusterSetting = new ClusterSettings(settings, clusterSettings);
clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting);
requestUtil = new ClientUtil(settings);
client = mock(Client.class);
requestUtil = new ClientUtil(settings, client);
indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,15 @@ public void setUp() throws Exception {
.build();
clock = mock(Clock.class);
duration = Duration.ofHours(1);
stateManager = new ADStateManager(client, xContentRegistry(), modelManager, settings, new ClientUtil(settings), clock, duration);
stateManager = new ADStateManager(
client,
xContentRegistry(),
modelManager,
settings,
new ClientUtil(settings, client),
clock,
duration
);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -53,7 +54,8 @@ public class ADStatsTransportActionTests extends ESIntegTestCase {
public void setUp() throws Exception {
super.setUp();

IndexUtils indexUtils = new IndexUtils(client(), new ClientUtil(Settings.EMPTY), clusterService());
Client client = client();
IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client), clusterService());
ModelManager modelManager = mock(ModelManager.class);

clusterStatName1 = "clusterStat1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void setUp() throws Exception {

indexNameResolver = new IndexNameExpressionResolver();

ClientUtil clientUtil = new ClientUtil(Settings.EMPTY);
ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);

Map<String, ADStat<?>> statsMap = new HashMap<String, ADStat<?>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.util;

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;
Expand All @@ -27,7 +28,8 @@ public class IndexUtilsTests extends ESIntegTestCase {

@Before
public void setup() {
clientUtil = new ClientUtil(Settings.EMPTY);
Client client = client();
clientUtil = new ClientUtil(Settings.EMPTY, client);
}

@Test
Expand Down

0 comments on commit caebb97

Please sign in to comment.