Skip to content

Commit

Permalink
sync bug fixes from core to the plugin repo
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jul 12, 2024
1 parent 84e6b75 commit c39a307
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque
private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
if (clusterService.getTaskResourceTrackingService() != null) {
clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask);
}
tasksResourceUsages.add(
new TaskResourceInfo(
searchTask.getAction(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
/**
* Service responsible for gathering and storing top N queries
* with high latency or resource usage
*
* @opensearch.internal
*/
public class TopQueriesService {
/**
Expand Down Expand Up @@ -95,9 +97,9 @@ public class TopQueriesService {
private QueryInsightsExporter exporter;

TopQueriesService(
final MetricType metricType,
final ThreadPool threadPool,
final QueryInsightsExporterFactory queryInsightsExporterFactory
final MetricType metricType,
final ThreadPool threadPool,
final QueryInsightsExporterFactory queryInsightsExporterFactory
) {
this.enabled = false;
this.metricType = metricType;
Expand Down Expand Up @@ -136,18 +138,16 @@ public int getTopNSize() {
* @param size the wanted top N size
*/
public void validateTopNSize(final int size) {
if (size > QueryInsightsSettings.MAX_N_SIZE) {
if (size < 1 || size > QueryInsightsSettings.MAX_N_SIZE) {
throw new IllegalArgumentException(
"Top N size setting for ["
+ metricType
+ "]"
+ " should be smaller than max top N size ["
+ QueryInsightsSettings.MAX_N_SIZE
+ "was ("
+ size
+ " > "
+ QueryInsightsSettings.MAX_N_SIZE
+ ")"
"Top N size setting for ["
+ metricType
+ "]"
+ " should be between 1 and "
+ QueryInsightsSettings.MAX_N_SIZE
+ ", was ("
+ size
+ ")"
);
}
}
Expand Down Expand Up @@ -178,31 +178,31 @@ public void setWindowSize(final TimeValue windowSize) {
*/
public void validateWindowSize(final TimeValue windowSize) {
if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0
|| windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) {
|| windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) {
throw new IllegalArgumentException(
"Window size setting for ["
+ metricType
+ "]"
+ " should be between ["
+ QueryInsightsSettings.MIN_WINDOW_SIZE
+ ","
+ QueryInsightsSettings.MAX_WINDOW_SIZE
+ "]"
+ "was ("
+ windowSize
+ ")"
"Window size setting for ["
+ metricType
+ "]"
+ " should be between ["
+ QueryInsightsSettings.MIN_WINDOW_SIZE
+ ","
+ QueryInsightsSettings.MAX_WINDOW_SIZE
+ "]"
+ "was ("
+ windowSize
+ ")"
);
}
if (!(QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES.contains(windowSize) || windowSize.getMinutes() % 60 == 0)) {
throw new IllegalArgumentException(
"Window size setting for ["
+ metricType
+ "]"
+ " should be multiple of 1 hour, or one of "
+ QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES
+ ", was ("
+ windowSize
+ ")"
"Window size setting for ["
+ metricType
+ "]"
+ " should be multiple of 1 hour, or one of "
+ QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES
+ ", was ("
+ windowSize
+ ")"
);
}
}
Expand All @@ -224,8 +224,8 @@ public void setExporter(final Settings settings) {
logger.error("Fail to close the current exporter when updating exporter, error: ", e);
}
this.exporter = queryInsightsExporterFactory.createExporter(
SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)),
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN)
SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)),
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN)
);
}
} else {
Expand Down Expand Up @@ -261,7 +261,7 @@ public void validateExporterConfig(Settings settings) {
public List<SearchQueryRecord> getTopQueriesRecords(final boolean includeLastWindow) throws IllegalArgumentException {
if (!enabled) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString())
String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString())
);
}
// read from window snapshots
Expand All @@ -270,9 +270,9 @@ public List<SearchQueryRecord> getTopQueriesRecords(final boolean includeLastWin
queries.addAll(topQueriesHistorySnapshot.get());
}
return Stream.of(queries)
.flatMap(Collection::stream)
.sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1)
.collect(Collectors.toList());
.flatMap(Collection::stream)
.sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1)
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -369,4 +369,4 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {
public void close() throws IOException {
queryInsightsExporterFactory.closeExporter(this.exporter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@

package org.opensearch.plugin.insights.rules.model;

import org.apache.lucene.util.ArrayUtil;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
* Valid attributes for a search query record
Expand Down Expand Up @@ -73,8 +80,71 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO
out.writeString(attribute.toString());
}

/**
* Write Attribute value to a StreamOutput
* @param out the StreamOutput to write
* @param attributeValue the Attribute value to write
*/
@SuppressWarnings("unchecked")
public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException {
if (attributeValue instanceof List) {
out.writeList((List<? extends Writeable>) attributeValue);
} else {
out.writeGenericValue(attributeValue);
}
}

/**
* Read attribute value from the input stream given the Attribute type
*
* @param in the {@link StreamInput} input to read
* @param attribute attribute type to differentiate between Source and others
* @return parse value
* @throws IOException IOException
*/
public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException {
if (attribute == Attribute.TASK_RESOURCE_USAGES) {
return in.readList(TaskResourceInfo::readFromStream);
} else {
return in.readGenericValue();
}
}

/**
* Read attribute map from the input stream
*
* @param in the {@link StreamInput} to read
* @return parsed attribute map
* @throws IOException IOException
*/
public static Map<Attribute, Object> readAttributeMap(StreamInput in) throws IOException {
int size = readArraySize(in);
if (size == 0) {
return Collections.emptyMap();
}
Map<Attribute, Object> map = new HashMap<>(size);

for (int i = 0; i < size; i++) {
Attribute key = readFromStream(in);
Object value = readAttributeValue(in, key);
map.put(key, value);
}
return map;
}

private static int readArraySize(StreamInput in) throws IOException {
final int arraySize = in.readVInt();
if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) {
throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize);
}
if (arraySize < 0) {
throw new NegativeArraySizeException("array size must be positive but was: " + arraySize);
}
return arraySize;
}

@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce
measurements = new HashMap<>();
in.readMap(MetricType::readFromStream, StreamInput::readGenericValue)
.forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o))));
this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue);
this.attributes = Attribute.readAttributeMap(in);
}

/**
Expand Down Expand Up @@ -132,7 +132,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten
public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue);
out.writeMap(
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.util.Maps;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries;
Expand Down Expand Up @@ -80,6 +82,25 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int lower, int
attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100));
attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10)));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
attributes.put(
Attribute.TASK_RESOURCE_USAGES,
List.of(
new TaskResourceInfo(
randomAlphaOfLengthBetween(5, 10),
randomLongBetween(1, 1000),
randomLongBetween(1, 1000),
randomAlphaOfLengthBetween(5, 10),
new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
),
new TaskResourceInfo(
randomAlphaOfLengthBetween(5, 10),
randomLongBetween(1, 1000),
randomLongBetween(1, 1000),
randomAlphaOfLengthBetween(5, 10),
new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
)
)
);

records.add(new SearchQueryRecord(timestamp, measurements, attributes));
timestamp += interval;
Expand Down Expand Up @@ -202,4 +223,4 @@ public static void registerAllQueryInsightsSettings(ClusterSettings clusterSetti
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class);
private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool");
private ClusterService clusterService;

Expand All @@ -77,6 +79,7 @@ public void setup() {
ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2));
clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings);
ClusterServiceUtils.setState(clusterService, state);
clusterService.setTaskResourceTrackingService(taskResourceTrackingService);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);

Expand All @@ -100,12 +103,12 @@ public void testOnRequestEnd() throws InterruptedException {
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(
0,
"n/a",
"n/a",
() -> "test",
TaskId.EMPTY_TASK_ID,
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
0,
"n/a",
"n/a",
() -> "test",
TaskId.EMPTY_TASK_ID,
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[] { "index-1", "index-2" };
Expand Down Expand Up @@ -139,6 +142,7 @@ public void testOnRequestEnd() throws InterruptedException {
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
verify(taskResourceTrackingService, times(1)).refreshResourceStats(task);
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
Expand All @@ -149,12 +153,12 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(
0,
"n/a",
"n/a",
() -> "test",
TaskId.EMPTY_TASK_ID,
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
0,
"n/a",
"n/a",
() -> "test",
TaskId.EMPTY_TASK_ID,
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[] { "index-1", "index-2" };
Expand Down Expand Up @@ -200,6 +204,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
countDownLatch.await();

verify(queryInsightsService, times(numRequests)).addRecord(any());
verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task);
}

public void testSetEnabled() {
Expand All @@ -214,4 +219,4 @@ public void testSetEnabled() {
queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false);
assertFalse(queryInsightsListener.isEnabled());
}
}
}
Loading

0 comments on commit c39a307

Please sign in to comment.