Skip to content

Commit

Permalink
Use Data Stream for Reporting storage (#176022)
Browse files Browse the repository at this point in the history
## Summary

Closes #161608

* [X] Depends on elastic/elasticsearch#97765
* [x] Depends on elastic/elasticsearch#107581
* [x] Add create a new report job and check the details of the templated
data stream.
* [x] Run Discover tests in Flaky Test Runner:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/5999

## Release Note

Reporting internal storage has been changed from using regular indices
to a data stream configuration for a more efficient sharding strategy.
This change is not expected to have any impact to users.

## Screenshots

### Upgrade test (manual process)
Using a report generated before this change, and a report generated
after "upgrading":

![image](https://github.com/elastic/kibana/assets/908371/f92193d8-70d6-4fa5-b1b7-8f6c1a0a5e9f)
Even though the two reports are in different types storage, they are
still managed by the same policy:

![image](https://github.com/elastic/kibana/assets/908371/9bd68d99-72ed-4cf0-bef9-55e644f039b7)
Looking at the details of the policy shows how the different types of
storage are used:

![image](https://github.com/elastic/kibana/assets/908371/6c0d1f80-97cb-4990-b2a8-45deab7528bc)

### Log lines

Initial startup in clean environment
```
[2024-05-13T13:22:49.138-07:00][INFO ][plugins.reporting.store] Creating ILM policy for reporting data stream: kibana-reporting
[2024-05-13T13:22:53.337-07:00][INFO ][plugins.reporting.store] Linking ILM policy to reporting data stream: .kibana-reporting, component template: kibana-reporting@custom
```

Kibana restart with ES running continuously
```
[2024-05-13T13:24:32.733-07:00][DEBUG][plugins.reporting.store] Found ILM policy kibana-reporting; skipping creation.
[2024-05-13T13:24:32.733-07:00][INFO ][plugins.reporting.store] Linking ILM policy to reporting data stream: .kibana-reporting, component template: kibana-reporting@custom
```

### Checklist

- [x]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
~~See
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/5302
(internal link)~~
  • Loading branch information
tsullivan authored May 21, 2024
1 parent 3fc2b89 commit 56383cc
Show file tree
Hide file tree
Showing 38 changed files with 409 additions and 444 deletions.
4 changes: 3 additions & 1 deletion docs/settings/reporting-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ reports, you might need to change the following settings.
If capturing a report fails for any reason, {kib} will re-queue the report job for retry, as many times as this setting. Defaults to `3`.

`xpack.reporting.queue.indexInterval`::
How often the index that stores reporting jobs rolls over to a new index. Valid values are `year`, `month`, `week`, `day`, and `hour`. Defaults to `week`.
deprecated:[8.15.0,This setting has no effect.] How often Reporting creates a new index to store report jobs and file contents.
Valid values are `year`, `month`, `week`, `day`, and `hour`. Defaults to `week`.
*NOTE*: This setting exists for backwards compatibility, but is unused. Use the built-in ILM policy provided for the reporting plugin to customize the rollover of Reporting data.

[[xpack-reportingQueue-pollEnabled]] `xpack.reporting.queue.pollEnabled` ::
When `true`, enables the {kib} instance to poll {es} for pending jobs and claim them for
Expand Down
7 changes: 3 additions & 4 deletions docs/user/reporting/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ NOTE: When you create a dashboard report that includes a data table or saved sea

. To view and manage reports, open the main menu, then click *Stack Management > Reporting*.

NOTE: Reports are stored in {es} and managed by the `kibana-reporting` {ilm}
({ilm-init}) policy. By default, the policy stores reports forever. To learn
more about {ilm-init} policies, refer to the {es}
{ref}/index-lifecycle-management.html[{ilm-init} documentation].
NOTE: In "stateful" deployments, reports are stored in {es} and managed by the `kibana-reporting` {ilm}
({ilm-init}) policy. By default, the policy stores reports forever. To learn more about {ilm-init} policies, refer
to the {es} {ref}/index-lifecycle-management.html[{ilm-init} documentation].

[float]
[[csv-limitations]]
Expand Down
2 changes: 1 addition & 1 deletion docs/user/reporting/script-example.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ An example response for a successfully queued report:
---------------------------------------------------------

<1> The relative path on the {kib} host for downloading the report.
<2> (Not included in the example) Internal representation of the reporting job, as found in the `.reporting-*` index.
<2> (Not included in the example) Internal representation of the reporting job, as found in the `.reporting-*` storage.
1 change: 1 addition & 0 deletions packages/kbn-reporting/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export const REPORTING_MANAGEMENT_HOME = '/app/management/insightsAndAlerting/re
* ILM
*/

// The ILM policy manages stored reports only in stateful deployments.
export const ILM_POLICY_NAME = 'kibana-reporting';

/*
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-reporting/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export interface ReportSource {
migration_version: string; // for reminding the user to update their POST URL
attempts: number; // initially populated as 0
created_at: string; // timestamp in UTC
'@timestamp'?: string; // creation timestamp, only used for data streams compatibility
status: JOB_STATUS;

/*
Expand Down
6 changes: 0 additions & 6 deletions packages/kbn-reporting/common/url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ export interface LocatorParams<P extends SerializableRecord = SerializableRecord
params: P;
}

export type IlmPolicyMigrationStatus = 'policy-not-found' | 'indices-not-managed-by-policy' | 'ok';

export interface IlmPolicyStatusResponse {
status: IlmPolicyMigrationStatus;
}

type Url = string;
type UrlLocatorTuple = [url: Url, locatorParams: LocatorParams];

Expand Down
11 changes: 10 additions & 1 deletion packages/kbn-reporting/server/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,16 @@ export const PLUGIN_ID = 'reporting';
* Storage
*/

export const REPORTING_SYSTEM_INDEX = '.reporting';
// Used to index new documents
export const REPORTING_DATA_STREAM_ALIAS = '.kibana-reporting';
// Used to retrieve settings
export const REPORTING_DATA_STREAM_WILDCARD = '.kibana-reporting*';
// Index pattern of plain indices before Reporting used Data Stream storage
export const REPORTING_LEGACY_INDICES = '.reporting-*';
// Used to search for all reports and check for managing privileges
export const REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY = '.reporting-*,.kibana-reporting*';
// Name of component template which Kibana overrides for lifecycle settings
export const REPORTING_DATA_STREAM_COMPONENT_TEMPLATE = 'kibana-reporting@custom';

/*
* Telemetry
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/reporting/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export const config: PluginConfigDescriptor<ReportingConfigType> = {
},
schema: ConfigSchema,
deprecations: ({ unused }) => [
unused('queue.indexInterval', { level: 'warning' }), // unused since 8.15
unused('capture.browser.chromium.maxScreenshotDimension', { level: 'warning' }), // unused since 7.8
unused('capture.browser.type', { level: 'warning' }),
unused('poll.jobCompletionNotifier.intervalErrorMultiplier', { level: 'warning' }), // unused since 7.10
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/reporting/server/deprecations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const registerDeprecations = ({
core.deprecations.registerDeprecations({
getDeprecations: async (ctx) => {
return [
...(await getIlmPolicyDeprecationsInfo(ctx, { reportingCore })),
...(await getIlmPolicyDeprecationsInfo(ctx)),
...(await getReportingRoleDeprecationsInfo(ctx, { reportingCore })),
];
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@

import type { GetDeprecationsContext } from '@kbn/core/server';
import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/server/mocks';
import { createMockConfigSchema } from '@kbn/reporting-mocks-server';

import { ReportingCore } from '../core';
import { createMockReportingCore } from '../test_helpers';

import { getDeprecationsInfo } from './migrate_existing_indices_ilm_policy';

Expand All @@ -21,12 +17,10 @@ type ScopedClusterClientMock = ReturnType<
describe("Migrate existing indices' ILM policy deprecations", () => {
let esClient: ScopedClusterClientMock;
let deprecationsCtx: GetDeprecationsContext;
let reportingCore: ReportingCore;

beforeEach(async () => {
esClient = elasticsearchServiceMock.createScopedClusterClient();
deprecationsCtx = { esClient, savedObjectsClient: savedObjectsClientMock.create() };
reportingCore = await createMockReportingCore(createMockConfigSchema());
});

const createIndexSettings = (lifecycleName: string) => ({
Expand All @@ -47,7 +41,7 @@ describe("Migrate existing indices' ILM policy deprecations", () => {
indexB: createIndexSettings('kibana-reporting'),
});

expect(await getDeprecationsInfo(deprecationsCtx, { reportingCore })).toMatchInlineSnapshot(`
expect(await getDeprecationsInfo(deprecationsCtx)).toMatchInlineSnapshot(`
Array [
Object {
"correctiveActions": Object {
Expand All @@ -60,7 +54,7 @@ describe("Migrate existing indices' ILM policy deprecations", () => {
],
},
"level": "warning",
"message": "New reporting indices will be managed by the \\"kibana-reporting\\" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets all indices prefixed with \\".reporting-*\\".",
"message": "New reporting indices will be managed by the \\"kibana-reporting\\" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets the hidden system index pattern \\".kibana-reporting*\\".",
"title": "Found reporting indices managed by custom ILM policy.",
},
]
Expand All @@ -73,14 +67,10 @@ describe("Migrate existing indices' ILM policy deprecations", () => {
indexB: createIndexSettings('kibana-reporting'),
});

expect(await getDeprecationsInfo(deprecationsCtx, { reportingCore })).toMatchInlineSnapshot(
`Array []`
);
expect(await getDeprecationsInfo(deprecationsCtx)).toMatchInlineSnapshot(`Array []`);

esClient.asInternalUser.indices.getSettings.mockResponse({});

expect(await getDeprecationsInfo(deprecationsCtx, { reportingCore })).toMatchInlineSnapshot(
`Array []`
);
expect(await getDeprecationsInfo(deprecationsCtx)).toMatchInlineSnapshot(`Array []`);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,14 @@
import { DeprecationsDetails, GetDeprecationsContext } from '@kbn/core/server';
import { i18n } from '@kbn/i18n';
import { ILM_POLICY_NAME, INTERNAL_ROUTES } from '@kbn/reporting-common';
import { ReportingCore } from '../core';
import { deprecations } from '../lib/deprecations';
import { REPORTING_DATA_STREAM_WILDCARD } from '@kbn/reporting-server';
import { IlmPolicyManager } from '../lib/store';

interface ExtraDependencies {
reportingCore: ReportingCore;
}

export const getDeprecationsInfo = async (
{ esClient }: GetDeprecationsContext,
{ reportingCore }: ExtraDependencies
): Promise<DeprecationsDetails[]> => {
const store = await reportingCore.getStore();
const indexPattern = store.getReportingIndexPattern();

const migrationStatus = await deprecations.checkIlmMigrationStatus({
reportingCore,
elasticsearchClient: esClient.asInternalUser,
});
export const getDeprecationsInfo = async ({
esClient,
}: GetDeprecationsContext): Promise<DeprecationsDetails[]> => {
const ilmPolicyManager = IlmPolicyManager.create({ client: esClient.asInternalUser });
const migrationStatus = await ilmPolicyManager.checkIlmMigrationStatus();

if (migrationStatus !== 'ok') {
return [
Expand All @@ -35,10 +25,10 @@ export const getDeprecationsInfo = async (
}),
level: 'warning',
message: i18n.translate('xpack.reporting.deprecations.migrateIndexIlmPolicyActionMessage', {
defaultMessage: `New reporting indices will be managed by the "{reportingIlmPolicy}" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets all indices prefixed with "{indexPattern}".`,
defaultMessage: `New reporting indices will be managed by the "{reportingIlmPolicy}" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets the hidden system index pattern "{indexPattern}".`,
values: {
reportingIlmPolicy: ILM_POLICY_NAME,
indexPattern,
indexPattern: REPORTING_DATA_STREAM_WILDCARD,
},
}),
correctiveActions: {
Expand Down
24 changes: 18 additions & 6 deletions x-pack/plugins/reporting/server/lib/content_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ describe('ContentStream', () => {
'body.query.constant_score.filter.bool.must.0.term._id',
'something'
);
expect(request2).toHaveProperty('index', 'somewhere');
expect(request2).toHaveProperty('index', '.reporting-*,.kibana-reporting*');
expect(request2).toHaveProperty(
'body.query.constant_score.filter.bool.must.0.term.parent_id',
'something'
);
expect(request3).toHaveProperty('index', 'somewhere');
expect(request3).toHaveProperty('index', '.reporting-*,.kibana-reporting*');
expect(request3).toHaveProperty(
'body.query.constant_score.filter.bool.must.0.term.parent_id',
'something'
Expand Down Expand Up @@ -293,8 +293,11 @@ describe('ContentStream', () => {
1,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
'@timestamp': '1970-01-01T00:00:00.000Z',
parent_id: 'something',
output: {
content: '34',
Expand All @@ -307,8 +310,11 @@ describe('ContentStream', () => {
2,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
'@timestamp': '1970-01-01T00:00:00.000Z',
parent_id: 'something',
output: {
content: '56',
Expand All @@ -335,9 +341,12 @@ describe('ContentStream', () => {
1,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
parent_id: 'something',
'@timestamp': '1970-01-01T00:00:00.000Z',
output: {
content: Buffer.from('456').toString('base64'),
chunk: 1,
Expand All @@ -349,9 +358,12 @@ describe('ContentStream', () => {
2,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
parent_id: 'something',
'@timestamp': '1970-01-01T00:00:00.000Z',
output: {
content: Buffer.from('78').toString('base64'),
chunk: 2,
Expand Down
29 changes: 21 additions & 8 deletions x-pack/plugins/reporting/server/lib/content_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import { Duplex } from 'stream';
import { v4 as uuidv4 } from 'uuid';

import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { ReportSource } from '@kbn/reporting-common/types';
import {
REPORTING_DATA_STREAM_ALIAS,
REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY,
} from '@kbn/reporting-server';
import type { ReportingCore } from '..';

const ONE_MB = 1024 * 1024;
Expand All @@ -31,6 +35,7 @@ interface ChunkOutput {
}

interface ChunkSource {
'@timestamp': string;
parent_id: string;
output: ChunkOutput;
}
Expand Down Expand Up @@ -90,7 +95,7 @@ export class ContentStream extends Duplex {

private async readHead() {
const { id, index } = this.document;
const body: SearchRequest['body'] = {
const body: SearchRequest = {
_source: { includes: ['output.content', 'output.size', 'jobtype'] },
query: {
constant_score: {
Expand All @@ -110,13 +115,14 @@ export class ContentStream extends Duplex {
const hits = response?.hits?.hits?.[0];

this.jobSize = hits?._source?.output?.size;
this.logger.debug(`Reading job of size ${this.jobSize}`);

return hits?._source?.output?.content;
}

private async readChunk() {
const { id, index } = this.document;
const body: SearchRequest['body'] = {
const { id } = this.document;
const body: SearchRequest = {
_source: { includes: ['output.content'] },
query: {
constant_score: {
Expand All @@ -132,7 +138,10 @@ export class ContentStream extends Duplex {

this.logger.debug(`Reading chunk #${this.chunksRead}.`);

const response = await this.client.search<ChunkSource>({ body, index });
const response = await this.client.search<ChunkSource>({
body,
index: REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY,
});
const hits = response?.hits?.hits?.[0];

return hits?._source?.output.content;
Expand Down Expand Up @@ -179,10 +188,11 @@ export class ContentStream extends Duplex {
}

private async writeHead(content: string) {
this.logger.debug(`Updating report contents.`);
this.logger.debug(`Updating chunk #0 (${this.document.id}).`);

const body = await this.client.update<ReportSource>({
...this.document,
refresh: 'wait_for',
body: {
doc: {
output: { content },
Expand All @@ -194,16 +204,19 @@ export class ContentStream extends Duplex {
}

private async writeChunk(content: string) {
const { id: parentId, index } = this.document;
const { id: parentId } = this.document;
const id = uuidv4();

this.logger.debug(`Writing chunk #${this.chunksWritten} (${id}).`);

await this.client.index<ChunkSource>({
id,
index,
index: REPORTING_DATA_STREAM_ALIAS,
refresh: 'wait_for',
op_type: 'create',
body: {
parent_id: parentId,
'@timestamp': new Date(0).toISOString(), // required for data streams compatibility
output: {
content,
chunk: this.chunksWritten,
Expand Down
Loading

0 comments on commit 56383cc

Please sign in to comment.