From b81cf2e4a450627ef015d7e0d0dcd0fe521f8a53 Mon Sep 17 00:00:00 2001 From: Yngrid Coello Date: Thu, 9 May 2024 12:27:05 +0200 Subject: [PATCH 1/2] using msearch to speed up degradedDocs query --- .../dataset_quality/common/api_types.ts | 15 +- .../routes/data_streams/get_degraded_docs.ts | 139 ++++++++++++------ .../utils/create_dataset_quality_es_client.ts | 11 ++ 3 files changed, 114 insertions(+), 51 deletions(-) diff --git a/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts b/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts index 4dac346e2a26a..15f15c2356758 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts @@ -68,11 +68,16 @@ export const getIntegrationsResponseRt = rt.exact( }) ); -export const degradedDocsRt = rt.type({ - dataset: rt.string, - percentage: rt.number, - count: rt.number, -}); +export const degradedDocsRt = rt.intersection([ + rt.type({ + dataset: rt.string, + }), + rt.partial({ + count: rt.number, + totalDocs: rt.number, + percentage: rt.number, + }), +]); export type DegradedDocs = rt.TypeOf; diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts index fbfb6791abf6c..95a14bb73ebb5 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts @@ -25,10 +25,10 @@ export async function getDegradedDocsPaginated(options: { end?: number; datasetQuery?: string; after?: { - dataset: string; - namespace: string; + degradedDocs?: { dataset: string; namespace: string }; + totalDocs?: { dataset: string; namespace: string }; }; - prevResults?: DegradedDocs[]; + prevResults?: { degradedDocs: DegradedDocs[]; totalDocs: DegradedDocs[] }; }): Promise { const { esClient, @@ -37,61 +37,84 @@ export async function getDegradedDocsPaginated(options: { start, end, after, - prevResults = [], + prevResults = { degradedDocs: [], totalDocs: [] }, } = options; const datasetQualityESClient = createDatasetQualityESClient(esClient); - const response = await datasetQualityESClient.search({ - index: '*', - size: 0, - query: { - bool: { - ...(datasetQuery - ? { - should: [ - ...wildcardQuery(DATA_STREAM_DATASET, datasetQuery), - ...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery), - ], - minimum_should_match: 1, - } - : {}), - filter: [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)], + const datasetFilter = { + ...(datasetQuery + ? { + should: [ + ...wildcardQuery(DATA_STREAM_DATASET, datasetQuery), + ...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery), + ], + minimum_should_match: 1, + } + : {}), + }; + + const otherFilters = [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)]; + + const aggs = (afterKey?: { dataset: string; namespace: string }) => ({ + datasets: { + composite: { + ...(afterKey ? { after: afterKey } : {}), + size: 10000, + sources: [ + { dataset: { terms: { field: 'data_stream.dataset' } } }, + { namespace: { terms: { field: 'data_stream.namespace' } } }, + ], }, }, - aggs: { - datasets: { - composite: { - ...(after ? { after } : {}), - size: 10000, - sources: [ - { dataset: { terms: { field: DATA_STREAM_DATASET } } }, - { namespace: { terms: { field: DATA_STREAM_NAMESPACE } } }, - ], + }); + + const response = await datasetQualityESClient.msearch({ index: `${type}-*` }, [ + // degraded docs per dataset + { + size: 0, + query: { + bool: { + ...datasetFilter, + filter: otherFilters, + must: { exists: { field: _IGNORED } }, }, - aggs: { - degraded: { - filter: { - exists: { - field: _IGNORED, - }, - }, - }, + }, + aggs: aggs(after?.degradedDocs), + }, + // total docs per dataset + { + size: 0, + query: { + bool: { + ...datasetFilter, + filter: otherFilters, }, }, + aggs: aggs(after?.totalDocs), }, - }); + ]); const currDegradedDocs = - response.aggregations?.datasets.buckets.map((bucket) => ({ + response.responses[0].aggregations?.datasets.buckets.map((bucket) => ({ + dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`, + count: bucket.doc_count, + })) ?? []; + + const degradedDocs = [...prevResults.degradedDocs, ...currDegradedDocs]; + + const currTotalDocs = + response.responses[1].aggregations?.datasets.buckets.map((bucket) => ({ dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`, - percentage: (bucket.degraded.doc_count * 100) / bucket.doc_count, - count: bucket.degraded.doc_count, + totalDocs: bucket.doc_count, })) ?? []; - const degradedDocs = [...prevResults, ...currDegradedDocs]; + const totalDocs = [...prevResults.totalDocs, ...currTotalDocs]; - if (response.aggregations?.datasets.after_key) { + if ( + response.responses[0].aggregations?.datasets.after_key || + response.responses[1].aggregations?.datasets.after_key + ) { return getDegradedDocsPaginated({ esClient, type, @@ -99,12 +122,36 @@ export async function getDegradedDocsPaginated(options: { end, datasetQuery, after: { - dataset: response.aggregations?.datasets.after_key.dataset as string, - namespace: response.aggregations?.datasets.after_key.namespace as string, + degradedDocs: + (response.responses[0].aggregations?.datasets.after_key as { + dataset: string; + namespace: string; + }) || after?.degradedDocs, + totalDocs: + (response.responses[1].aggregations?.datasets.after_key as { + dataset: string; + namespace: string; + }) || after?.totalDocs, }, - prevResults: degradedDocs, + prevResults: { degradedDocs, totalDocs }, }); } - return degradedDocs; + const degradedDocsMap = degradedDocs.reduce( + (acc, curr) => ({ + ...acc, + [curr.dataset]: curr.count, + }), + {} + ); + + return totalDocs.map((curr) => { + const degradedDocsCount = degradedDocsMap[curr.dataset as keyof typeof degradedDocsMap] || 0; + + return { + ...curr, + count: degradedDocsCount, + percentage: (degradedDocsCount / curr.totalDocs!) * 100, + }; + }); } diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/utils/create_dataset_quality_es_client.ts b/x-pack/plugins/observability_solution/dataset_quality/server/utils/create_dataset_quality_es_client.ts index 455308a7d91c3..d2c95ebbf0dbf 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/utils/create_dataset_quality_es_client.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/utils/create_dataset_quality_es_client.ts @@ -7,6 +7,7 @@ import { ESSearchRequest, InferSearchResponseOf } from '@kbn/es-types'; import { ElasticsearchClient } from '@kbn/core/server'; +import { Indices } from '@elastic/elasticsearch/lib/api/types'; type DatasetQualityESSearchParams = ESSearchRequest & { size: number; @@ -21,5 +22,15 @@ export function createDatasetQualityESClient(esClient: ElasticsearchClient) { ): Promise> { return esClient.search(searchParams) as Promise; }, + async msearch( + index = {} as { index?: Indices }, + searches: TParams[] + ): Promise<{ + responses: Array>; + }> { + return esClient.msearch({ + searches: searches.map((search) => [index, search]).flat(), + }) as Promise; + }, }; } From dd6ef1545cfb08cde1bde5454835dd526704b9f6 Mon Sep 17 00:00:00 2001 From: Yngrid Coello Date: Thu, 9 May 2024 12:59:40 +0200 Subject: [PATCH 2/2] PR comments --- .../dataset_quality/common/api_types.ts | 16 ++++++---------- .../routes/data_streams/get_degraded_docs.ts | 16 +++++++++++----- .../server/routes/data_streams/routes.ts | 2 +- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts b/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts index 15f15c2356758..c953d4589c614 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/common/api_types.ts @@ -68,16 +68,12 @@ export const getIntegrationsResponseRt = rt.exact( }) ); -export const degradedDocsRt = rt.intersection([ - rt.type({ - dataset: rt.string, - }), - rt.partial({ - count: rt.number, - totalDocs: rt.number, - percentage: rt.number, - }), -]); +export const degradedDocsRt = rt.type({ + dataset: rt.string, + count: rt.number, + totalDocs: rt.number, + percentage: rt.number, +}); export type DegradedDocs = rt.TypeOf; diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts index 95a14bb73ebb5..246d572ec8036 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_degraded_docs.ts @@ -18,17 +18,22 @@ import { } from '../../../common/es_fields'; import { createDatasetQualityESClient, wildcardQuery } from '../../utils'; +interface ResultBucket { + dataset: string; + count: number; +} + export async function getDegradedDocsPaginated(options: { esClient: ElasticsearchClient; type?: DataStreamType; - start?: number; - end?: number; + start: number; + end: number; datasetQuery?: string; after?: { degradedDocs?: { dataset: string; namespace: string }; totalDocs?: { dataset: string; namespace: string }; }; - prevResults?: { degradedDocs: DegradedDocs[]; totalDocs: DegradedDocs[] }; + prevResults?: { degradedDocs: ResultBucket[]; totalDocs: ResultBucket[] }; }): Promise { const { esClient, @@ -106,7 +111,7 @@ export async function getDegradedDocsPaginated(options: { const currTotalDocs = response.responses[1].aggregations?.datasets.buckets.map((bucket) => ({ dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`, - totalDocs: bucket.doc_count, + count: bucket.doc_count, })) ?? []; const totalDocs = [...prevResults.totalDocs, ...currTotalDocs]; @@ -150,8 +155,9 @@ export async function getDegradedDocsPaginated(options: { return { ...curr, + totalDocs: curr.count, count: degradedDocsCount, - percentage: (degradedDocsCount / curr.totalDocs!) * 100, + percentage: (degradedDocsCount / curr.count) * 100, }; }); } diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/routes.ts b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/routes.ts index 6dd8590c91f0b..3ad5a103f4313 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/routes.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/routes.ts @@ -66,7 +66,7 @@ const degradedDocsRoute = createDatasetQualityServerRoute({ endpoint: 'GET /internal/dataset_quality/data_streams/degraded_docs', params: t.type({ query: t.intersection([ - t.partial(rangeRt.props), + rangeRt, typeRt, t.partial({ datasetQuery: t.string,