Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Issue 1787 fixing connector endpoint returns index not found #1885

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ integTest {
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}

// Set this to true this if you want to see the logs in the terminal test output.
// note: if left false the log output will still show in your IDE
testLogging.showStandardStreams = true
}

testClusters.integTest {
Expand All @@ -197,7 +201,7 @@ testClusters.integTest {
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
// i.e. we have to use a custom property to flag when we want to debug elasticsearch JVM
// since we also support multi node integration tests we increase debugPort per node
if (System.getProperty("opensearch.debug") != null) {
if (System.getProperty("cluster.debug") != null) {
def debugPort = 5005
nodes.forEach { node ->
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,31 @@
import java.util.Optional;
import java.util.stream.Collectors;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.ml.common.CommonValue;
import org.opensearch.ml.common.connector.HttpConnector;
import org.opensearch.ml.common.transport.connector.MLConnectorSearchAction;
import org.opensearch.ml.helper.ConnectorAccessControlHelper;
import org.opensearch.ml.utils.RestActionUtils;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import com.google.common.annotations.VisibleForTesting;

import lombok.extern.log4j.Log4j2;

@Log4j2
Expand Down Expand Up @@ -60,7 +66,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
private void search(SearchRequest request, ActionListener<SearchResponse> actionListener) {
User user = RestActionUtils.getUserContext(client);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
ActionListener<SearchResponse> wrappedListener = ActionListener.runBefore(actionListener, () -> context.restore());
ActionListener<SearchResponse> wrappedListener = ActionListener.runBefore(actionListener, context::restore);
List<String> excludes = Optional
.ofNullable(request.source())
.map(SearchSourceBuilder::fetchSource)
Expand All @@ -78,16 +84,43 @@ private void search(SearchRequest request, ActionListener<SearchResponse> action
excludes.toArray(new String[0])
);
request.source().fetchSource(rebuiltFetchSourceContext);

ActionListener<SearchResponse> doubleWrappedListener = ActionListener
.wrap(wrappedListener::onResponse, e -> wrapListenerToHandleConnectorIndexNotFound(e, actionListener));

if (connectorAccessControlHelper.skipConnectorAccessControl(user)) {
client.search(request, wrappedListener);
client.search(request, doubleWrappedListener);
} else {
SearchSourceBuilder sourceBuilder = connectorAccessControlHelper.addUserBackendRolesFilter(user, request.source());
request.source(sourceBuilder);
client.search(request, wrappedListener);
client.search(request, doubleWrappedListener);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
actionListener.onFailure(e);
}
}

@VisibleForTesting
public static void wrapListenerToHandleConnectorIndexNotFound(Exception e, ActionListener<SearchResponse> listener) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
log.debug("Connectors index not created yet, therefore we will swallow the exception and return an empty search result");
final InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
final SearchResponse emptySearchResponse = new SearchResponse(
internalSearchResponse,
null,
0,
0,
0,
0,
null,
new ShardSearchFailure[] {},
SearchResponse.Clusters.EMPTY,
null
);
listener.onResponse(emptySearchResponse);
} else {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ public void testDeleteConnector() throws IOException {
assertEquals("deleted", (String) responseMap.get("result"));
}

public void testSearchConnectors() throws IOException {
public void testSearchConnectors_beforeConnectorCreation() throws IOException {
String searchEntity = "{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " },\n" + " \"size\": 1000\n" + "}";
Response response = TestHelper
.makeRequest(client(), "GET", "/_plugins/_ml/connectors/_search", null, TestHelper.toHttpEntity(searchEntity), null);
Map responseMap = parseResponseToMap(response);
assertEquals((Double) 0.0, (Double) ((Map) ((Map) responseMap.get("hits")).get("total")).get("value"));
}

public void testSearchConnectors_afterConnectorCreation() throws IOException {
createConnector(completionModelConnectorEntity);
String searchEntity = "{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " },\n" + " \"size\": 1000\n" + "}";
Response response = TestHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import org.apache.lucene.search.TotalHits;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.OpenSearchWrapperException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchResponseSections;
Expand All @@ -35,6 +37,8 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.ml.action.connector.SearchConnectorTransportAction;
import org.opensearch.ml.common.transport.connector.MLConnectorSearchAction;
import org.opensearch.ml.utils.TestHelper;
import org.opensearch.rest.RestChannel;
Expand Down Expand Up @@ -190,4 +194,51 @@ public void testPrepareRequest_timeout() throws Exception {
RestResponse restResponse = responseCaptor.getValue();
assertEquals(RestStatus.REQUEST_TIMEOUT, restResponse.status());
}

public void testDoubleWrapper_handleIndexNotFound() {
final IndexNotFoundException indexNotFoundException = new IndexNotFoundException("Index not found", ML_CONNECTOR_INDEX);
final DummyActionListener actionListener = new DummyActionListener();

SearchConnectorTransportAction.wrapListenerToHandleConnectorIndexNotFound(indexNotFoundException, actionListener);
Assert.assertTrue(actionListener.success);
}

public void testDoubleWrapper_handleIndexNotFoundWrappedException() {
final WrappedException wrappedException = new WrappedException();
final DummyActionListener actionListener = new DummyActionListener();

SearchConnectorTransportAction.wrapListenerToHandleConnectorIndexNotFound(wrappedException, actionListener);
Assert.assertTrue(actionListener.success);
}

public void testDoubleWrapper_notRelatedException() {
final RuntimeException exception = new RuntimeException("some random exception");
final DummyActionListener actionListener = new DummyActionListener();

SearchConnectorTransportAction.wrapListenerToHandleConnectorIndexNotFound(exception, actionListener);
Assert.assertFalse(actionListener.success);
}

public class DummyActionListener implements ActionListener<SearchResponse> {
public boolean success = false;

@Override
public void onResponse(SearchResponse searchResponse) {
logger.info("success");
this.success = true;
}

@Override
public void onFailure(Exception e) {
logger.error("failure", e);
this.success = false;
}
}

public static class WrappedException extends Exception implements OpenSearchWrapperException {
@Override
public synchronized Throwable getCause() {
return new IndexNotFoundException("Index not found", ML_CONNECTOR_INDEX);
}
}
}
Loading