Skip to content

Commit

Permalink
Add new query profile collector fields with concurrent search executi…
Browse files Browse the repository at this point in the history
…on (#7898)

* Bringing new query profile collector fields with concurrent search execution

Signed-off-by: Ticheng Lin <[email protected]>

* Update CHANGELOG.md and fix gradle check format violations

Signed-off-by: Ticheng Lin <[email protected]>

* Change query profile collector field data types to primitives and add more tests

Signed-off-by: Ticheng Lin <[email protected]>

---------

Signed-off-by: Ticheng Lin <[email protected]>
Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws authored Jun 8, 2023
1 parent 1803fd9 commit c58dc1a
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673))
- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
- Add new query profile collector fields with concurrent search execution ([#7898](https://github.com/opensearch-project/OpenSearch/pull/7898))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.search.profile.query;

import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -66,11 +67,17 @@ public class CollectorResult implements ToXContentObject, Writeable {
public static final String REASON_SEARCH_MULTI = "search_multi";
public static final String REASON_AGGREGATION = "aggregation";
public static final String REASON_AGGREGATION_GLOBAL = "aggregation_global";
public static final String COLLECTOR_MANAGER = "CollectorManager";

private static final ParseField NAME = new ParseField("name");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField TIME = new ParseField("time");
private static final ParseField TIME_NANOS = new ParseField("time_in_nanos");
private static final ParseField REDUCE_TIME_NANOS = new ParseField("reduce_time_in_nanos");
private static final ParseField MAX_SLICE_TIME_NANOS = new ParseField("max_slice_time_in_nanos");
private static final ParseField MIN_SLICE_TIME_IN_NANOS = new ParseField("min_slice_time_in_nanos");
private static final ParseField AVG_SLICE_TIME_IN_NANOS = new ParseField("avg_slice_time_in_nanos");
private static final ParseField SLICE_COUNT = new ParseField("slice_count");
private static final ParseField CHILDREN = new ParseField("children");

/**
Expand All @@ -86,17 +93,61 @@ public class CollectorResult implements ToXContentObject, Writeable {
/**
* The total elapsed time for this Collector
*/
private final Long time;
private final long time;

/**
* The total elapsed time in reduce phase for this CollectorManager
*/
private final long reduceTime;

/**
* The maximum slice time for this CollectorManager
*/
private final long maxSliceTime;

/**
* The minimum slice time for this CollectorManager
*/
private final long minSliceTime;

/**
* The average slice time for this CollectorManager
*/
private final long avgSliceTime;

/**
* The segment slice count for this CollectorManager
*/
private final int sliceCount;

/**
* A list of children collectors "embedded" inside this collector
*/
private List<CollectorResult> children;

public CollectorResult(String collectorName, String reason, Long time, List<CollectorResult> children) {
public CollectorResult(String collectorName, String reason, long time, List<CollectorResult> children) {
this(collectorName, reason, time, 0L, time, time, time, 1, children);
}

public CollectorResult(
String collectorName,
String reason,
long time,
long reduceTime,
long maxSliceTime,
long minSliceTime,
long avgSliceTime,
int sliceCount,
List<CollectorResult> children
) {
this.collectorName = collectorName;
this.reason = reason;
this.time = time;
this.reduceTime = reduceTime;
this.maxSliceTime = maxSliceTime;
this.minSliceTime = minSliceTime;
this.avgSliceTime = avgSliceTime;
this.sliceCount = sliceCount;
this.children = children;
}

Expand All @@ -113,6 +164,19 @@ public CollectorResult(StreamInput in) throws IOException {
CollectorResult child = new CollectorResult(in);
this.children.add(child);
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.reduceTime = in.readLong();
this.maxSliceTime = in.readLong();
this.minSliceTime = in.readLong();
this.avgSliceTime = in.readLong();
this.sliceCount = in.readVInt();
} else {
this.reduceTime = 0L;
this.maxSliceTime = this.time;
this.minSliceTime = this.time;
this.avgSliceTime = this.time;
this.sliceCount = 1;
}
}

@Override
Expand All @@ -124,31 +188,73 @@ public void writeTo(StreamOutput out) throws IOException {
for (CollectorResult child : children) {
child.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(reduceTime);
out.writeLong(maxSliceTime);
out.writeLong(minSliceTime);
out.writeLong(avgSliceTime);
out.writeVInt(sliceCount);
}
}

/**
* @return the profiled time for this collector (inclusive of children)
* @return the profiled time for this collector/collector manager (inclusive of children)
*/
public long getTime() {
return this.time;
}

/**
* @return a human readable "hint" about what this collector was used for
* @return the profiled reduce time for this collector manager (inclusive of children)
*/
public long getReduceTime() {
return this.reduceTime;
}

/**
* @return the profiled maximum slice time for this collector manager (inclusive of children)
*/
public long getMaxSliceTime() {
return this.maxSliceTime;
}

/**
* @return the profiled minimum slice time for this collector manager (inclusive of children)
*/
public long getMinSliceTime() {
return this.minSliceTime;
}

/**
* @return the profiled average slice time for this collector manager (inclusive of children)
*/
public long getAvgSliceTime() {
return this.avgSliceTime;
}

/**
* @return the profiled segment slice count for this collector manager (inclusive of children)
*/
public int getSliceCount() {
return this.sliceCount;
}

/**
* @return a human readable "hint" about what this collector/collector manager was used for
*/
public String getReason() {
return this.reason;
}

/**
* @return the lucene class name of the collector
* @return the lucene class name of the collector/collector manager
*/
public String getName() {
return this.collectorName;
}

/**
* @return a list of children collectors
* @return a list of children collectors/collector managers
*/
public List<CollectorResult> getProfiledChildren() {
return children;
Expand All @@ -163,6 +269,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.field(TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
}
builder.field(TIME_NANOS.getPreferredName(), getTime());
if (getName().contains(COLLECTOR_MANAGER)) {
builder.field(REDUCE_TIME_NANOS.getPreferredName(), getReduceTime());
builder.field(MAX_SLICE_TIME_NANOS.getPreferredName(), getMaxSliceTime());
builder.field(MIN_SLICE_TIME_IN_NANOS.getPreferredName(), getMinSliceTime());
builder.field(AVG_SLICE_TIME_IN_NANOS.getPreferredName(), getAvgSliceTime());
builder.field(SLICE_COUNT.getPreferredName(), getSliceCount());
}

if (!children.isEmpty()) {
builder = builder.startArray(CHILDREN.getPreferredName());
Expand All @@ -181,6 +294,11 @@ public static CollectorResult fromXContent(XContentParser parser) throws IOExcep
String currentFieldName = null;
String name = null, reason = null;
long time = -1;
long reduceTime = -1;
long maxSliceTime = -1;
long minSliceTime = -1;
long avgSliceTime = -1;
int sliceCount = 0;
List<CollectorResult> children = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
Expand All @@ -195,6 +313,16 @@ public static CollectorResult fromXContent(XContentParser parser) throws IOExcep
parser.text();
} else if (TIME_NANOS.match(currentFieldName, parser.getDeprecationHandler())) {
time = parser.longValue();
} else if (REDUCE_TIME_NANOS.match(currentFieldName, parser.getDeprecationHandler())) {
reduceTime = parser.longValue();
} else if (MAX_SLICE_TIME_NANOS.match(currentFieldName, parser.getDeprecationHandler())) {
maxSliceTime = parser.longValue();
} else if (MIN_SLICE_TIME_IN_NANOS.match(currentFieldName, parser.getDeprecationHandler())) {
minSliceTime = parser.longValue();
} else if (AVG_SLICE_TIME_IN_NANOS.match(currentFieldName, parser.getDeprecationHandler())) {
avgSliceTime = parser.longValue();
} else if (SLICE_COUNT.match(currentFieldName, parser.getDeprecationHandler())) {
sliceCount = parser.intValue();
} else {
parser.skipChildren();
}
Expand All @@ -210,6 +338,6 @@ public static CollectorResult fromXContent(XContentParser parser) throws IOExcep
parser.skipChildren();
}
}
return new CollectorResult(name, reason, time, children);
return new CollectorResult(name, reason, time, reduceTime, maxSliceTime, minSliceTime, avgSliceTime, sliceCount, children);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public long getTime() {
return collector.getTime();
}

/**
* @return the profiled start time for this collector (inclusive of children)
*/
public long getSliceStartTime() {
return collector.getSliceStartTime();
}

/**
* @return a human readable "hint" about what this collector was used for
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public class InternalProfileCollectorManager
private final String reason;
private final List<InternalProfileCollectorManager> children;
private long time = 0;
private long reduceTime = 0;
private long maxSliceEndTime = Long.MIN_VALUE;
private long minSliceStartTime = Long.MAX_VALUE;
private long maxSliceTime = 0;
private long minSliceTime = Long.MAX_VALUE;
private long avgSliceTime = 0;
private int sliceCount = 0;

public InternalProfileCollectorManager(
CollectorManager<? extends Collector, ReduceableSearchResult> manager,
Expand All @@ -50,14 +57,27 @@ public InternalProfileCollector newCollector() throws IOException {
@SuppressWarnings("unchecked")
@Override
public ReduceableSearchResult reduce(Collection<InternalProfileCollector> collectors) throws IOException {
final Collection<Collector> subs = new ArrayList<>();
final long reduceStart = System.nanoTime();
try {
final Collection<Collector> subs = new ArrayList<>();

for (final InternalProfileCollector collector : collectors) {
subs.add(collector.getCollector());
time += collector.getTime();
for (final InternalProfileCollector collector : collectors) {
subs.add(collector.getCollector());
maxSliceEndTime = Math.max(maxSliceEndTime, collector.getSliceStartTime() + collector.getTime());
minSliceStartTime = Math.min(minSliceStartTime, collector.getSliceStartTime());
maxSliceTime = Math.max(maxSliceTime, collector.getTime());
minSliceTime = Math.min(minSliceTime, collector.getTime());
avgSliceTime += collector.getTime();
}
time = maxSliceEndTime - minSliceStartTime;
sliceCount = collectors.size();
avgSliceTime = sliceCount == 0 ? 0 : avgSliceTime / sliceCount;

return ((CollectorManager<Collector, ReduceableSearchResult>) manager).reduce(subs);
} finally {
reduceTime = Math.max(1, System.nanoTime() - reduceStart);
}

return ((CollectorManager<Collector, ReduceableSearchResult>) manager).reduce(subs);
}

@Override
Expand All @@ -70,6 +90,26 @@ public long getTime() {
return time;
}

public long getReduceTime() {
return reduceTime;
}

public long getMaxSliceTime() {
return maxSliceTime;
}

public long getMinSliceTime() {
return minSliceTime;
}

public long getAvgSliceTime() {
return avgSliceTime;
}

public int getSliceCount() {
return sliceCount;
}

@Override
public Collection<? extends InternalProfileComponent> children() {
return children;
Expand All @@ -82,7 +122,26 @@ public String getName() {

@Override
public CollectorResult getCollectorTree() {
return InternalProfileCollector.doGetCollectorTree(this);
return doGetCollectorManagerTree(this);
}

static CollectorResult doGetCollectorManagerTree(InternalProfileCollectorManager collector) {
List<CollectorResult> childResults = new ArrayList<>(collector.children().size());
for (InternalProfileComponent child : collector.children()) {
CollectorResult result = doGetCollectorManagerTree((InternalProfileCollectorManager) child);
childResults.add(result);
}
return new CollectorResult(
collector.getName(),
collector.getReason(),
collector.getTime(),
collector.getReduceTime(),
collector.getMaxSliceTime(),
collector.getMinSliceTime(),
collector.getAvgSliceTime(),
collector.getSliceCount(),
childResults
);
}

@Override
Expand Down
Loading

0 comments on commit c58dc1a

Please sign in to comment.