From df933e3a3d03696bb0a860521351b12b6eb57e0a Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 16 Jul 2024 13:38:09 -0700 Subject: [PATCH] sync bug fixes from core to the plugin repo Signed-off-by: Chenyang Ji --- .editorconfig | 33 +++++++++ .../core/listener/QueryInsightsListener.java | 4 +- .../core/service/TopQueriesService.java | 11 ++- .../insights/rules/model/Attribute.java | 73 ++++++++++++++++++- .../rules/model/SearchQueryRecord.java | 8 +- .../insights/QueryInsightsTestUtils.java | 25 ++++++- .../listener/QueryInsightsListenerTests.java | 4 +- .../core/service/TopQueriesServiceTests.java | 22 +++++- 8 files changed, 161 insertions(+), 19 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..5be7b263 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,33 @@ +# EditorConfig: http://editorconfig.org/ + +root = true + +[*] +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space + +[*.gradle] +indent_size = 2 + +[*.groovy] +indent_size = 4 + +[*.java] +indent_size = 4 + +[*.json] +indent_size = 2 + +[*.py] +indent_size = 2 + +[*.sh] +indent_size = 2 + +[*.{yml,yaml}] +indent_size = 2 + +[*.{xsd,xml}] +indent_size = 4 diff --git a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 03c4011c..8f186769 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -52,7 +52,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener /** * Constructor for QueryInsightsListener * - * @param clusterService The Node's cluster service. + * @param clusterService The Node's cluster service. * @param queryInsightsService The topQueriesByLatencyService associated with this listener */ @Inject @@ -91,7 +91,7 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns * and query insights services. * * @param metricType {@link MetricType} - * @param enabled boolean + * @param enabled boolean */ public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { boolean isAllMetricsDisabled = !queryInsightsService.isEnabled(); diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index 0814e667..a6b7b799 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -136,17 +136,15 @@ public int getTopNSize() { * @param size the wanted top N size */ public void validateTopNSize(final int size) { - if (size > QueryInsightsSettings.MAX_N_SIZE) { + if (size < 1 || size > QueryInsightsSettings.MAX_N_SIZE) { throw new IllegalArgumentException( "Top N size setting for [" + metricType + "]" - + " should be smaller than max top N size [" + + " should be between 1 and " + QueryInsightsSettings.MAX_N_SIZE - + "was (" + + ", was (" + size - + " > " - + QueryInsightsSettings.MAX_N_SIZE + ")" ); } @@ -154,6 +152,7 @@ public void validateTopNSize(final int size) { /** * Set enable flag for the service + * * @param enabled boolean */ public void setEnabled(final boolean enabled) { @@ -251,7 +250,7 @@ public void validateExporterConfig(Settings settings) { /** * Get all top queries records that are in the current top n queries store * Optionally include top N records from the last window. - * + *

* By default, return the records in sorted order. * * @param includeLastWindow if the top N queries from the last window should be included diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index 9d341d8f..9d9f3db5 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -8,11 +8,18 @@ package org.opensearch.plugin.insights.rules.model; +import org.apache.lucene.util.ArrayUtil; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Locale; +import java.util.Map; /** * Valid attributes for a search query record @@ -65,7 +72,7 @@ static Attribute readFromStream(final StreamInput in) throws IOException { /** * Write Attribute to a StreamOutput * - * @param out the StreamOutput to write + * @param out the StreamOutput to write * @param attribute the Attribute to write * @throws IOException IOException */ @@ -73,6 +80,70 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO out.writeString(attribute.toString()); } + /** + * Write Attribute value to a StreamOutput + * + * @param out the StreamOutput to write + * @param attributeValue the Attribute value to write + */ + @SuppressWarnings("unchecked") + public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException { + if (attributeValue instanceof List) { + out.writeList((List) attributeValue); + } else { + out.writeGenericValue(attributeValue); + } + } + + /** + * Read attribute value from the input stream given the Attribute type + * + * @param in the {@link StreamInput} input to read + * @param attribute attribute type to differentiate between Source and others + * @return parse value + * @throws IOException IOException + */ + public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException { + if (attribute == Attribute.TASK_RESOURCE_USAGES) { + return in.readList(TaskResourceInfo::readFromStream); + } else { + return in.readGenericValue(); + } + } + + /** + * Read attribute map from the input stream + * + * @param in the {@link StreamInput} to read + * @return parsed attribute map + * @throws IOException IOException + */ + public static Map readAttributeMap(StreamInput in) throws IOException { + int size = readArraySize(in); + if (size == 0) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(size); + + for (int i = 0; i < size; i++) { + Attribute key = readFromStream(in); + Object value = readAttributeValue(in, key); + map.put(key, value); + } + return map; + } + + private static int readArraySize(StreamInput in) throws IOException { + final int arraySize = in.readVInt(); + if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { + throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); + } + if (arraySize < 0) { + throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); + } + return arraySize; + } + @Override public String toString() { return this.name().toLowerCase(Locale.ROOT); diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index e241a9e1..4b8d7a41 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -43,7 +43,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce measurements = new HashMap<>(); in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); - this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); + this.attributes = Attribute.readAttributeMap(in); } /** @@ -132,7 +132,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); - out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); + out.writeMap( + attributes, + (stream, attribute) -> Attribute.writeTo(out, attribute), + (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue) + ); } /** diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 7fa4e984..9b97e5f3 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -12,6 +12,8 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.util.Maps; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; @@ -80,6 +82,25 @@ public static List generateQueryInsightRecords(int lower, int attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + attributes.put( + Attribute.TASK_RESOURCE_USAGES, + List.of( + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ), + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ) + ) + ); records.add(new SearchQueryRecord(timestamp, measurements, attributes)); timestamp += interval; @@ -163,8 +184,8 @@ public static boolean checkRecordsEquals(List records1, List< return false; } else if (value instanceof Map && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { - return false; - } + return false; + } } } return true; diff --git a/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 86de44c6..5c2ea577 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -108,7 +108,7 @@ public void testOnRequestEnd() throws InterruptedException { Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") ); - String[] indices = new String[] { "index-1", "index-2" }; + String[] indices = new String[]{"index-1", "index-2"}; Map phaseLatencyMap = new HashMap<>(); phaseLatencyMap.put("expand", 0L); @@ -157,7 +157,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") ); - String[] indices = new String[] { "index-1", "index-2" }; + String[] indices = new String[]{"index-1", "index-2"}; Map phaseLatencyMap = new HashMap<>(); phaseLatencyMap.put("expand", 0L); diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 3efd4c86..396e1822 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -75,12 +75,22 @@ public void testSmallNSize() { } public void testValidateTopNSize() { - assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); + }); + } + + public void testValidateNegativeTopNSize() { + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateTopNSize(-1); + }); } public void testGetTopQueriesWhenNotEnabled() { topQueriesService.setEnabled(false); - assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.getTopQueriesRecords(false); + }); } public void testValidateWindowSize() { @@ -90,8 +100,12 @@ public void testValidateWindowSize() { assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS)); }); - assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); }); - assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); + }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); + }); } private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) {