[Dataset quality] using msearch to speed up degradedDocs query #183023

Merged
@@ -68,11 +68,16 @@ export const getIntegrationsResponseRt = rt.exact(
   })
 );
 
-export const degradedDocsRt = rt.type({
-  dataset: rt.string,
-  percentage: rt.number,
-  count: rt.number,
-});
+export const degradedDocsRt = rt.intersection([
+  rt.type({
+    dataset: rt.string,
+  }),
+  rt.partial({
+    count: rt.number,
+    totalDocs: rt.number,
+    percentage: rt.number,
+  }),
+]);
 
 export type DegradedDocs = rt.TypeOf<typeof degradedDocsRt>;
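As context for the codec change above (this example is editor-added, not part of the PR): with rt.intersection of a required rt.type and an rt.partial, only dataset stays mandatory, while count, totalDocs, and percentage may be omitted. A minimal sketch, assuming the usual io-ts and fp-ts imports:

import * as rt from 'io-ts';
import { isRight } from 'fp-ts/Either';

// Same shape as the updated codec in this PR.
const degradedDocsRt = rt.intersection([
  rt.type({ dataset: rt.string }),
  rt.partial({ count: rt.number, totalDocs: rt.number, percentage: rt.number }),
]);

// Only `dataset` is required now, so a partial result decodes successfully...
console.log(isRight(degradedDocsRt.decode({ dataset: 'logs-apm.error-default' }))); // true

// ...while a present-but-mistyped field is still rejected.
console.log(isRight(degradedDocsRt.decode({ dataset: 'logs-apm.error-default', count: 'many' }))); // false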

@@ -25,10 +25,10 @@ export async function getDegradedDocsPaginated(options: {
   end?: number;
   datasetQuery?: string;
   after?: {
-    dataset: string;
-    namespace: string;
+    degradedDocs?: { dataset: string; namespace: string };
+    totalDocs?: { dataset: string; namespace: string };
   };
-  prevResults?: DegradedDocs[];
+  prevResults?: { degradedDocs: DegradedDocs[]; totalDocs: DegradedDocs[] };
 }): Promise<DegradedDocs[]> {
   const {
     esClient,
@@ -37,74 +37,121 @@
     start,
     end,
     after,
-    prevResults = [],
+    prevResults = { degradedDocs: [], totalDocs: [] },
   } = options;
 
   const datasetQualityESClient = createDatasetQualityESClient(esClient);
 
-  const response = await datasetQualityESClient.search({
-    index: '*',
-    size: 0,
-    query: {
-      bool: {
-        ...(datasetQuery
-          ? {
-              should: [
-                ...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
-                ...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
-              ],
-              minimum_should_match: 1,
-            }
-          : {}),
-        filter: [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)],
-      },
-    },
-    aggs: {
-      datasets: {
-        composite: {
-          ...(after ? { after } : {}),
-          size: 10000,
-          sources: [
-            { dataset: { terms: { field: DATA_STREAM_DATASET } } },
-            { namespace: { terms: { field: DATA_STREAM_NAMESPACE } } },
-          ],
-        },
-        aggs: {
-          degraded: {
-            filter: {
-              exists: {
-                field: _IGNORED,
-              },
-            },
-          },
-        },
-      },
-    },
-  });
+  const datasetFilter = {
+    ...(datasetQuery
+      ? {
+          should: [
+            ...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
+            ...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
+          ],
+          minimum_should_match: 1,
+        }
+      : {}),
+  };
+
+  const otherFilters = [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)];
+
+  const aggs = (afterKey?: { dataset: string; namespace: string }) => ({
+    datasets: {
+      composite: {
+        ...(afterKey ? { after: afterKey } : {}),
+        size: 10000,
+        sources: [
+          { dataset: { terms: { field: 'data_stream.dataset' } } },
+          { namespace: { terms: { field: 'data_stream.namespace' } } },
+        ],
+      },
+    },
+  });
+
+  const response = await datasetQualityESClient.msearch({ index: `${type}-*` }, [
+    // degraded docs per dataset
+    {
+      size: 0,
+      query: {
+        bool: {
+          ...datasetFilter,
+          filter: otherFilters,
+          must: { exists: { field: _IGNORED } },
+        },
+      },
+      aggs: aggs(after?.degradedDocs),
+    },
+    // total docs per dataset
+    {
+      size: 0,
[Review thread on the total-docs request]

@mohamedhamed-ahmed (Contributor) commented on May 9, 2024:

Do we actually need this query to get the total docs? Don't you automatically get the total doc counts from bucket.doc_count?

My thought was that we just need to add one more prop and assign the value to it.

[Screenshot: aggregation response showing buckets with a doc_count field]

@yngrdyn (Contributor, Author) replied on May 9, 2024:

No, in the first query we just get the total number of documents where _ignored is not null. A bucket in the first query will look like:

{
  "key": {
    "dataset": "apm.error",
    "namespace": "default"
  },
  "doc_count": 102
},

Notice that we are no longer doing the nested aggregation, which I suspect was the most expensive part. And yes, we do need the total number of documents in the time range to compute the ratio (percentages). (A sketch of how the two result sets are combined follows this diff.)

+      query: {
+        bool: {
+          ...datasetFilter,
+          filter: otherFilters,
+        },
+      },
+      aggs: aggs(after?.totalDocs),
+    },
-  });
+  ]);
 
   const currDegradedDocs =
-    response.aggregations?.datasets.buckets.map((bucket) => ({
+    response.responses[0].aggregations?.datasets.buckets.map((bucket) => ({
       dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
-      percentage: (bucket.degraded.doc_count * 100) / bucket.doc_count,
-      count: bucket.degraded.doc_count,
+      count: bucket.doc_count,
     })) ?? [];
 
-  const degradedDocs = [...prevResults, ...currDegradedDocs];
+  const degradedDocs = [...prevResults.degradedDocs, ...currDegradedDocs];
+
+  const currTotalDocs =
+    response.responses[1].aggregations?.datasets.buckets.map((bucket) => ({
+      dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
+      totalDocs: bucket.doc_count,
+    })) ?? [];
+
+  const totalDocs = [...prevResults.totalDocs, ...currTotalDocs];
 
-  if (response.aggregations?.datasets.after_key) {
+  if (
+    response.responses[0].aggregations?.datasets.after_key ||
+    response.responses[1].aggregations?.datasets.after_key
+  ) {
     return getDegradedDocsPaginated({
       esClient,
       type,
       start,
       end,
       datasetQuery,
       after: {
-        dataset: response.aggregations?.datasets.after_key.dataset as string,
-        namespace: response.aggregations?.datasets.after_key.namespace as string,
+        degradedDocs:
+          (response.responses[0].aggregations?.datasets.after_key as {
+            dataset: string;
+            namespace: string;
+          }) || after?.degradedDocs,
+        totalDocs:
+          (response.responses[1].aggregations?.datasets.after_key as {
+            dataset: string;
+            namespace: string;
+          }) || after?.totalDocs,
       },
-      prevResults: degradedDocs,
+      prevResults: { degradedDocs, totalDocs },
     });
   }
 
-  return degradedDocs;
+  const degradedDocsMap = degradedDocs.reduce(
+    (acc, curr) => ({
+      ...acc,
+      [curr.dataset]: curr.count,
+    }),
+    {}
+  );
+
+  return totalDocs.map((curr) => {
+    const degradedDocsCount = degradedDocsMap[curr.dataset as keyof typeof degradedDocsMap] || 0;
+
+    return {
+      ...curr,
+      count: degradedDocsCount,
+      percentage: (degradedDocsCount / curr.totalDocs!) * 100,
+    };
+  });
 }
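To make the review thread above concrete, here is a small self-contained sketch (editor-added, with made-up sample numbers and simplified names) of the merge step: the first msearch response only counts documents whose _ignored field exists, so the per-dataset totals from the second response are needed to derive the percentage, and datasets with no degraded docs fall back to a count of 0.

interface Bucket {
  key: { dataset: string; namespace: string };
  doc_count: number;
}

// Sample buckets standing in for the two msearch responses:
// response 0 counts only documents with an _ignored field; response 1 counts all documents.
const degradedBuckets: Bucket[] = [
  { key: { dataset: 'apm.error', namespace: 'default' }, doc_count: 102 },
];
const totalBuckets: Bucket[] = [
  { key: { dataset: 'apm.error', namespace: 'default' }, doc_count: 5100 },
  { key: { dataset: 'nginx.access', namespace: 'default' }, doc_count: 800 },
];

const toName = (b: Bucket) => `logs-${b.key.dataset}-${b.key.namespace}`;

// Index degraded counts by dataset name, then walk the totals so that
// datasets without any degraded docs still appear (with count 0).
const degradedByDataset = new Map(
  degradedBuckets.map((b) => [toName(b), b.doc_count] as [string, number])
);

const merged = totalBuckets.map((b) => {
  const count = degradedByDataset.get(toName(b)) ?? 0;
  return {
    dataset: toName(b),
    count,
    totalDocs: b.doc_count,
    percentage: (count / b.doc_count) * 100,
  };
});

console.log(merged);
// [ { dataset: 'logs-apm.error-default', count: 102, totalDocs: 5100, percentage: 2 },
//   { dataset: 'logs-nginx.access-default', count: 0, totalDocs: 800, percentage: 0 } ]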
@@ -7,6 +7,7 @@
 
 import { ESSearchRequest, InferSearchResponseOf } from '@kbn/es-types';
 import { ElasticsearchClient } from '@kbn/core/server';
+import { Indices } from '@elastic/elasticsearch/lib/api/types';
 
 type DatasetQualityESSearchParams = ESSearchRequest & {
   size: number;
@@ -21,5 +22,15 @@ export function createDatasetQualityESClient(esClient: ElasticsearchClient) {
     ): Promise<InferSearchResponseOf<TDocument, TParams>> {
       return esClient.search<TDocument>(searchParams) as Promise<any>;
     },
+    async msearch<TDocument, TParams extends DatasetQualityESSearchParams>(
+      index = {} as { index?: Indices },
+      searches: TParams[]
+    ): Promise<{
+      responses: Array<InferSearchResponseOf<TDocument, TParams>>;
+    }> {
+      return esClient.msearch({
+        searches: searches.map((search) => [index, search]).flat(),
+      }) as Promise<any>;
+    },
   };
 }
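For reference, a hypothetical call site for the new msearch helper (editor-added; the index pattern and queries are illustrative, not from the PR). Each entry in the searches array is paired with the shared index header, which is exactly what searches.map((search) => [index, search]).flat() produces as the header/body tuples of the multi-search request, and responses line up positionally with the requests:

// Assumes an ElasticsearchClient instance `esClient` is in scope.
const datasetQualityESClient = createDatasetQualityESClient(esClient);

const { responses } = await datasetQualityESClient.msearch({ index: 'logs-*' }, [
  // One entry per search; both execute in a single round trip.
  { size: 0, query: { bool: { must: { exists: { field: '_ignored' } } } } },
  { size: 0, query: { match_all: {} } },
]);

// responses[0] corresponds to the first search, responses[1] to the second.
console.log(responses[0].hits.total, responses[1].hits.total);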