[Dataset quality] using msearch to speed up degradedDocs query (elastic#183023)

Relates to elastic#179227.

After gathering some numbers around possible tweaks of the current
degradedDocs query ([more
information](elastic#179227 (comment))),
I decided to move forward and split the query to reduce the time
Elasticsearch spends aggregating on data streams.

This PR contains the following changes:
- An `msearch` method was added to `DatasetQualityESClient` to allow the
usage of [multi
search](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html)
(see the usage sketch below this list).
- `degradedDocsRt` was changed to include not only the count of degraded
docs but also the total docs for the data streams within the selected
time range.
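
For illustration, a minimal usage sketch of the new helper; the index pattern and query bodies here are invented, while the real fetcher builds them from the request parameters:

const { responses } = await datasetQualityESClient.msearch({ index: 'logs-*' }, [
  // search 1: degraded docs, i.e. documents with at least one ignored field
  { size: 0, query: { bool: { must: { exists: { field: '_ignored' } } } } },
  // search 2: total docs over the same indices
  { size: 0, query: { match_all: {} } },
]);
// responses[0] and responses[1] arrive in the same order as the requests.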

No user-visible functionality has changed.



https://github.com/elastic/kibana/assets/1313018/1d836446-7d35-4ff2-8356-16a8087ab505
yngrdyn authored May 9, 2024
1 parent 6004cad commit f82d640
Showing 4 changed files with 115 additions and 50 deletions.
@@ -70,8 +70,9 @@ export const getIntegrationsResponseRt = rt.exact(
 
 export const degradedDocsRt = rt.type({
   dataset: rt.string,
-  percentage: rt.number,
   count: rt.number,
+  totalDocs: rt.number,
+  percentage: rt.number,
 });
 
 export type DegradedDocs = rt.TypeOf<typeof degradedDocsRt>;
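
For illustration, a value matching the updated codec (numbers invented):

const example: DegradedDocs = {
  dataset: 'logs-nginx.access-default',
  count: 5, // degraded docs in the selected time range
  totalDocs: 200, // all docs in the selected time range
  percentage: 2.5,
};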
@@ -18,17 +18,22 @@ import {
 } from '../../../common/es_fields';
 import { createDatasetQualityESClient, wildcardQuery } from '../../utils';
 
+interface ResultBucket {
+  dataset: string;
+  count: number;
+}
+
 export async function getDegradedDocsPaginated(options: {
   esClient: ElasticsearchClient;
   type?: DataStreamType;
-  start?: number;
-  end?: number;
+  start: number;
+  end: number;
   datasetQuery?: string;
   after?: {
-    dataset: string;
-    namespace: string;
+    degradedDocs?: { dataset: string; namespace: string };
+    totalDocs?: { dataset: string; namespace: string };
   };
-  prevResults?: DegradedDocs[];
+  prevResults?: { degradedDocs: ResultBucket[]; totalDocs: ResultBucket[] };
 }): Promise<DegradedDocs[]> {
   const {
     esClient,
@@ -37,74 +42,122 @@ export async function getDegradedDocsPaginated(options: {
     start,
     end,
     after,
-    prevResults = [],
+    prevResults = { degradedDocs: [], totalDocs: [] },
   } = options;
 
   const datasetQualityESClient = createDatasetQualityESClient(esClient);
 
-  const response = await datasetQualityESClient.search({
-    index: '*',
-    size: 0,
-    query: {
-      bool: {
-        ...(datasetQuery
-          ? {
-              should: [
-                ...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
-                ...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
-              ],
-              minimum_should_match: 1,
-            }
-          : {}),
-        filter: [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)],
-      },
-    },
-    aggs: {
-      datasets: {
-        composite: {
-          ...(after ? { after } : {}),
-          size: 10000,
-          sources: [
-            { dataset: { terms: { field: DATA_STREAM_DATASET } } },
-            { namespace: { terms: { field: DATA_STREAM_NAMESPACE } } },
-          ],
-        },
-        aggs: {
-          degraded: {
-            filter: {
-              exists: {
-                field: _IGNORED,
-              },
-            },
-          },
-        },
-      },
-    },
-  });
+  const datasetFilter = {
+    ...(datasetQuery
+      ? {
+          should: [
+            ...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
+            ...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
+          ],
+          minimum_should_match: 1,
+        }
+      : {}),
+  };
+
+  const otherFilters = [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)];
+
+  const aggs = (afterKey?: { dataset: string; namespace: string }) => ({
+    datasets: {
+      composite: {
+        ...(afterKey ? { after: afterKey } : {}),
+        size: 10000,
+        sources: [
+          { dataset: { terms: { field: 'data_stream.dataset' } } },
+          { namespace: { terms: { field: 'data_stream.namespace' } } },
+        ],
+      },
+    },
+  });
+
+  const response = await datasetQualityESClient.msearch({ index: `${type}-*` }, [
+    // degraded docs per dataset
+    {
+      size: 0,
+      query: {
+        bool: {
+          ...datasetFilter,
+          filter: otherFilters,
+          must: { exists: { field: _IGNORED } },
+        },
+      },
+      aggs: aggs(after?.degradedDocs),
+    },
+    // total docs per dataset
+    {
+      size: 0,
+      query: {
+        bool: {
+          ...datasetFilter,
+          filter: otherFilters,
+        },
+      },
+      aggs: aggs(after?.totalDocs),
+    },
+  ]);
 
   const currDegradedDocs =
-    response.aggregations?.datasets.buckets.map((bucket) => ({
+    response.responses[0].aggregations?.datasets.buckets.map((bucket) => ({
       dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
-      percentage: (bucket.degraded.doc_count * 100) / bucket.doc_count,
-      count: bucket.degraded.doc_count,
+      count: bucket.doc_count,
     })) ?? [];
 
-  const degradedDocs = [...prevResults, ...currDegradedDocs];
+  const degradedDocs = [...prevResults.degradedDocs, ...currDegradedDocs];
+
+  const currTotalDocs =
+    response.responses[1].aggregations?.datasets.buckets.map((bucket) => ({
+      dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
+      count: bucket.doc_count,
+    })) ?? [];
+
+  const totalDocs = [...prevResults.totalDocs, ...currTotalDocs];
 
-  if (response.aggregations?.datasets.after_key) {
+  if (
+    response.responses[0].aggregations?.datasets.after_key ||
+    response.responses[1].aggregations?.datasets.after_key
+  ) {
     return getDegradedDocsPaginated({
       esClient,
       type,
       start,
       end,
       datasetQuery,
       after: {
-        dataset: response.aggregations?.datasets.after_key.dataset as string,
-        namespace: response.aggregations?.datasets.after_key.namespace as string,
+        degradedDocs:
+          (response.responses[0].aggregations?.datasets.after_key as {
+            dataset: string;
+            namespace: string;
+          }) || after?.degradedDocs,
+        totalDocs:
+          (response.responses[1].aggregations?.datasets.after_key as {
+            dataset: string;
+            namespace: string;
+          }) || after?.totalDocs,
       },
-      prevResults: degradedDocs,
+      prevResults: { degradedDocs, totalDocs },
     });
   }
 
-  return degradedDocs;
+  const degradedDocsMap = degradedDocs.reduce(
+    (acc, curr) => ({
+      ...acc,
+      [curr.dataset]: curr.count,
+    }),
+    {}
+  );
+
+  return totalDocs.map((curr) => {
+    const degradedDocsCount = degradedDocsMap[curr.dataset as keyof typeof degradedDocsMap] || 0;
+
+    return {
+      ...curr,
+      totalDocs: curr.count,
+      count: degradedDocsCount,
+      percentage: (degradedDocsCount / curr.count) * 100,
+    };
+  });
 }
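
For illustration, a self-contained sketch of the final merge step above, using a Map in place of the object reduce and invented numbers:

interface ResultBucket {
  dataset: string;
  count: number;
}

const degradedDocs: ResultBucket[] = [{ dataset: 'logs-nginx-default', count: 5 }];
const totalDocs: ResultBucket[] = [
  { dataset: 'logs-nginx-default', count: 200 },
  { dataset: 'logs-apm-default', count: 50 }, // no degraded bucket for this stream
];

// Index degraded counts by dataset, then join against the total-docs buckets.
const degradedDocsMap = new Map(degradedDocs.map((d) => [d.dataset, d.count]));

const merged = totalDocs.map((curr) => {
  const degradedDocsCount = degradedDocsMap.get(curr.dataset) ?? 0;
  return {
    dataset: curr.dataset,
    totalDocs: curr.count,
    count: degradedDocsCount,
    percentage: (degradedDocsCount / curr.count) * 100,
  };
});
// -> [ { dataset: 'logs-nginx-default', totalDocs: 200, count: 5, percentage: 2.5 },
//      { dataset: 'logs-apm-default', totalDocs: 50, count: 0, percentage: 0 } ]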
@@ -66,7 +66,7 @@ const degradedDocsRoute = createDatasetQualityServerRoute({
   endpoint: 'GET /internal/dataset_quality/data_streams/degraded_docs',
   params: t.type({
     query: t.intersection([
-      t.partial(rangeRt.props),
+      rangeRt,
       typeRt,
       t.partial({
         datasetQuery: t.string,
@@ -7,6 +7,7 @@
 
 import { ESSearchRequest, InferSearchResponseOf } from '@kbn/es-types';
 import { ElasticsearchClient } from '@kbn/core/server';
+import { Indices } from '@elastic/elasticsearch/lib/api/types';
 
 type DatasetQualityESSearchParams = ESSearchRequest & {
   size: number;
@@ -21,5 +22,15 @@ export function createDatasetQualityESClient(esClient: ElasticsearchClient) {
     ): Promise<InferSearchResponseOf<TDocument, TParams>> {
       return esClient.search<TDocument>(searchParams) as Promise<any>;
     },
+    async msearch<TDocument, TParams extends DatasetQualityESSearchParams>(
+      index = {} as { index?: Indices },
+      searches: TParams[]
+    ): Promise<{
+      responses: Array<InferSearchResponseOf<TDocument, TParams>>;
+    }> {
+      return esClient.msearch({
+        searches: searches.map((search) => [index, search]).flat(),
+      }) as Promise<any>;
+    },
   };
 }
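
For clarity, a sketch of the payload this helper hands to the Elasticsearch client (index pattern and bodies invented): elasticsearch-js msearch takes a flat array alternating header and body objects, mirroring the NDJSON wire format of the multi search API.

await esClient.msearch({
  searches: [
    { index: 'logs-*' }, // header for search 1
    { size: 0, query: { exists: { field: '_ignored' } } }, // body for search 1
    { index: 'logs-*' }, // header for search 2
    { size: 0, query: { match_all: {} } }, // body for search 2
  ],
});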
