Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rollup] Add more diagnostic stats to job #35471

Merged
merged 5 commits into from
Nov 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Response from rollup's get jobs api.
Expand All @@ -51,6 +51,12 @@ public class GetRollupJobResponse {
static final ParseField STATE = new ParseField("job_state");
static final ParseField CURRENT_POSITION = new ParseField("current_position");
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
static final ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
static final ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
static final ParseField INDEX_TOTAL = new ParseField("index_total");
static final ParseField SEARCH_TOTAL = new ParseField("search_total");
static final ParseField SEARCH_FAILURES = new ParseField("search_failures");
static final ParseField INDEX_FAILURES = new ParseField("index_failures");

private List<JobWrapper> jobs;

Expand Down Expand Up @@ -181,12 +187,25 @@ public static class RollupIndexerJobStats {
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
private long indexTime;
private long indexTotal;
private long searchTime;
private long searchTotal;
private long indexFailures;
private long searchFailures;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
this.indexTime = indexTime;
this.indexTotal = indexTotal;
this.searchTime = searchTime;
this.searchTotal = searchTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}

/**
Expand Down Expand Up @@ -217,15 +236,65 @@ public long getOutputDocuments() {
return numOuputDocuments;
}

/**
* Number of failures that have occurred during the bulk indexing phase of Rollup
*/
public long getIndexFailures() {
return indexFailures;
}

/**
* Number of failures that have occurred during the search phase of Rollup
*/
public long getSearchFailures() {
return searchFailures;
}

/**
* Returns the time spent indexing (cumulative) in milliseconds
*/
public long getIndexTime() {
return indexTime;
}

/**
* Returns the time spent searching (cumulative) in milliseconds
*/
public long getSearchTime() {
return searchTime;
}

/**
* Returns the total number of indexing requests that have been sent by the rollup job
* (Note: this is not the number of _documents_ that have been indexed)
*/
public long getIndexTotal() {
return indexTotal;
}

/**
* Returns the total number of search requests that have been sent by the rollup job
*/
public long getSearchTotal() {
return searchTotal;
}

private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
STATS.getPreferredName(),
true,
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

@Override
Expand All @@ -234,22 +303,35 @@ public boolean equals(Object other) {
if (other == null || getClass() != other.getClass()) return false;
RollupIndexerJobStats that = (RollupIndexerJobStats) other;
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.indexTotal, that.indexTotal);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
}

@Override
public final String toString() {
return "{pages=" + numPages
+ ", input_docs=" + numInputDocuments
+ ", output_docs=" + numOuputDocuments
+ ", invocations=" + numInvocations + "}";
+ ", invocations=" + numInvocations
+ ", index_failures=" + indexFailures
+ ", search_failures=" + searchFailures
+ ", index_time_in_ms=" + indexTime
+ ", index_total=" + indexTotal
+ ", search_time_in_ms=" + searchTime
+ ", search_total=" + searchTotal+ "}";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private GetRollupJobResponse createTestInstance() {
}

private RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomLong(), randomLong(), randomLong(), randomLong());
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}

private RollupJobStatus randomStatus() {
Expand Down Expand Up @@ -115,6 +117,13 @@ public void toXContent(RollupIndexerJobStats stats, XContentBuilder builder, ToX
builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
builder.field(GetRollupJobResponse.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
builder.field(GetRollupJobResponse.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
builder.field(GetRollupJobResponse.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
builder.field(GetRollupJobResponse.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(GetRollupJobResponse.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(GetRollupJobResponse.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.endObject();
}

}
24 changes: 21 additions & 3 deletions docs/reference/rollup/apis/get-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
}
]
Expand Down Expand Up @@ -221,7 +227,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
},
{
Expand Down Expand Up @@ -270,7 +282,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
try {
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)));
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing stats.incrementSearchFailures() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ good catch

finishWithFailure(e);
finishWithSearchFailure(e);
}
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
Expand Down Expand Up @@ -256,7 +257,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
*/
protected abstract void onAbort();

private void finishWithFailure(Exception exc) {
private void finishWithSearchFailure(Exception exc) {
stats.incrementSearchFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
}

private void finishWithIndexingFailure(Exception exc) {
stats.incrementIndexingFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
}

Expand Down Expand Up @@ -291,6 +298,7 @@ private IndexerState finishAndSetState() {
}

private void onSearchResponse(SearchResponse searchResponse) {
stats.markEndSearch();
try {
if (checkState(getState()) == false) {
return;
Expand Down Expand Up @@ -320,6 +328,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
// TODO this might be a valid case, e.g. if implementation filters
assert bulkRequest.requests().size() > 0;

stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
Expand All @@ -335,24 +344,24 @@ private void onSearchResponse(SearchResponse searchResponse) {
position.set(newPosition);

onBulkResponse(bulkResponse, newPosition);
}, exc -> finishWithFailure(exc)));
}, this::finishWithIndexingFailure));
} catch (Exception e) {
finishWithFailure(e);
finishWithSearchFailure(e);
}
}

private void onBulkResponse(BulkResponse response, JobPosition position) {
stats.markEndIndexing();
try {

ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
// TODO probably something more intelligent than every-50 is needed
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
} else {
doNextSearch(buildSearchRequest(), listener);
}
} catch (Exception e) {
finishWithFailure(e);
finishWithIndexingFailure(e);
}
}

Expand Down
Loading