Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Data Stream for Reporting storage #176022

Merged
merged 17 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/settings/reporting-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ reports, you might need to change the following settings.
If capturing a report fails for any reason, {kib} will re-queue the report job for retry, as many times as this setting. Defaults to `3`.

`xpack.reporting.queue.indexInterval`::
How often the index that stores reporting jobs rolls over to a new index. Valid values are `year`, `month`, `week`, `day`, and `hour`. Defaults to `week`.
deprecated:[8.15.0,This setting has no effect.] How often Reporting creates a new index to store report jobs and file contents.
Valid values are `year`, `month`, `week`, `day`, and `hour`. Defaults to `week`.
*NOTE*: This setting exists for backwards compatibility, but is unused. Use the built-in ILM policy provided for the reporting plugin to customize the rollover of Reporting data.

[[xpack-reportingQueue-pollEnabled]] `xpack.reporting.queue.pollEnabled` ::
When `true`, enables the {kib} instance to poll {es} for pending jobs and claim them for
Expand Down
7 changes: 3 additions & 4 deletions docs/user/reporting/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ NOTE: When you create a dashboard report that includes a data table or saved sea

. To view and manage reports, open the main menu, then click *Stack Management > Reporting*.

NOTE: Reports are stored in {es} and managed by the `kibana-reporting` {ilm}
({ilm-init}) policy. By default, the policy stores reports forever. To learn
more about {ilm-init} policies, refer to the {es}
{ref}/index-lifecycle-management.html[{ilm-init} documentation].
NOTE: In "stateful" deployments, reports are stored in {es} and managed by the `kibana-reporting` {ilm}
({ilm-init}) policy. By default, the policy stores reports forever. To learn more about {ilm-init} policies, refer
to the {es} {ref}/index-lifecycle-management.html[{ilm-init} documentation].

[float]
[[csv-limitations]]
Expand Down
2 changes: 1 addition & 1 deletion docs/user/reporting/script-example.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ An example response for a successfully queued report:
---------------------------------------------------------

<1> The relative path on the {kib} host for downloading the report.
<2> (Not included in the example) Internal representation of the reporting job, as found in the `.reporting-*` index.
<2> (Not included in the example) Internal representation of the reporting job, as found in the `.reporting-*` storage.
1 change: 1 addition & 0 deletions packages/kbn-reporting/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export const REPORTING_MANAGEMENT_HOME = '/app/management/insightsAndAlerting/re
* ILM
*/

// The ILM policy manages stored reports only in stateful deployments.
export const ILM_POLICY_NAME = 'kibana-reporting';

/*
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-reporting/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export interface ReportSource {
migration_version: string; // for reminding the user to update their POST URL
attempts: number; // initially populated as 0
created_at: string; // timestamp in UTC
'@timestamp'?: string; // creation timestamp, only used for data streams compatibility
status: JOB_STATUS;

/*
Expand Down
6 changes: 0 additions & 6 deletions packages/kbn-reporting/common/url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ export interface LocatorParams<P extends SerializableRecord = SerializableRecord
params: P;
}

export type IlmPolicyMigrationStatus = 'policy-not-found' | 'indices-not-managed-by-policy' | 'ok';

export interface IlmPolicyStatusResponse {
status: IlmPolicyMigrationStatus;
}

type Url = string;
type UrlLocatorTuple = [url: Url, locatorParams: LocatorParams];

Expand Down
11 changes: 10 additions & 1 deletion packages/kbn-reporting/server/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,16 @@ export const PLUGIN_ID = 'reporting';
* Storage
*/

export const REPORTING_SYSTEM_INDEX = '.reporting';
// Used to index new documents
export const REPORTING_DATA_STREAM_ALIAS = '.kibana-reporting';
// Used to retrieve settings
export const REPORTING_DATA_STREAM_WILDCARD = '.kibana-reporting*';
// Index pattern of plain indices before Reporting used Data Stream storage
export const REPORTING_LEGACY_INDICES = '.reporting-*';
// Used to search for all reports and check for managing privileges
export const REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY = '.reporting-*,.kibana-reporting*';
// Name of component template which Kibana overrides for lifecycle settings
export const REPORTING_DATA_STREAM_COMPONENT_TEMPLATE = 'kibana-reporting@custom';

/*
* Telemetry
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/reporting/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export const config: PluginConfigDescriptor<ReportingConfigType> = {
},
schema: ConfigSchema,
deprecations: ({ unused }) => [
unused('queue.indexInterval', { level: 'warning' }), // unused since 8.15
unused('capture.browser.chromium.maxScreenshotDimension', { level: 'warning' }), // unused since 7.8
unused('capture.browser.type', { level: 'warning' }),
unused('poll.jobCompletionNotifier.intervalErrorMultiplier', { level: 'warning' }), // unused since 7.10
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/reporting/server/deprecations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const registerDeprecations = ({
core.deprecations.registerDeprecations({
getDeprecations: async (ctx) => {
return [
...(await getIlmPolicyDeprecationsInfo(ctx, { reportingCore })),
...(await getIlmPolicyDeprecationsInfo(ctx)),
...(await getReportingRoleDeprecationsInfo(ctx, { reportingCore })),
];
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@

import type { GetDeprecationsContext } from '@kbn/core/server';
import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/server/mocks';
import { createMockConfigSchema } from '@kbn/reporting-mocks-server';

import { ReportingCore } from '../core';
import { createMockReportingCore } from '../test_helpers';

import { getDeprecationsInfo } from './migrate_existing_indices_ilm_policy';

Expand All @@ -21,12 +17,10 @@ type ScopedClusterClientMock = ReturnType<
describe("Migrate existing indices' ILM policy deprecations", () => {
let esClient: ScopedClusterClientMock;
let deprecationsCtx: GetDeprecationsContext;
let reportingCore: ReportingCore;

beforeEach(async () => {
esClient = elasticsearchServiceMock.createScopedClusterClient();
deprecationsCtx = { esClient, savedObjectsClient: savedObjectsClientMock.create() };
reportingCore = await createMockReportingCore(createMockConfigSchema());
});

const createIndexSettings = (lifecycleName: string) => ({
Expand All @@ -47,7 +41,7 @@ describe("Migrate existing indices' ILM policy deprecations", () => {
indexB: createIndexSettings('kibana-reporting'),
});

expect(await getDeprecationsInfo(deprecationsCtx, { reportingCore })).toMatchInlineSnapshot(`
expect(await getDeprecationsInfo(deprecationsCtx)).toMatchInlineSnapshot(`
Array [
Object {
"correctiveActions": Object {
Expand All @@ -60,7 +54,7 @@ describe("Migrate existing indices' ILM policy deprecations", () => {
],
},
"level": "warning",
"message": "New reporting indices will be managed by the \\"kibana-reporting\\" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets all indices prefixed with \\".reporting-*\\".",
"message": "New reporting indices will be managed by the \\"kibana-reporting\\" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets the hidden system index pattern \\".kibana-reporting*\\".",
"title": "Found reporting indices managed by custom ILM policy.",
},
]
Expand All @@ -73,14 +67,10 @@ describe("Migrate existing indices' ILM policy deprecations", () => {
indexB: createIndexSettings('kibana-reporting'),
});

expect(await getDeprecationsInfo(deprecationsCtx, { reportingCore })).toMatchInlineSnapshot(
`Array []`
);
expect(await getDeprecationsInfo(deprecationsCtx)).toMatchInlineSnapshot(`Array []`);

esClient.asInternalUser.indices.getSettings.mockResponse({});

expect(await getDeprecationsInfo(deprecationsCtx, { reportingCore })).toMatchInlineSnapshot(
`Array []`
);
expect(await getDeprecationsInfo(deprecationsCtx)).toMatchInlineSnapshot(`Array []`);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,14 @@
import { DeprecationsDetails, GetDeprecationsContext } from '@kbn/core/server';
import { i18n } from '@kbn/i18n';
import { ILM_POLICY_NAME, INTERNAL_ROUTES } from '@kbn/reporting-common';
import { ReportingCore } from '../core';
import { deprecations } from '../lib/deprecations';
import { REPORTING_DATA_STREAM_WILDCARD } from '@kbn/reporting-server';
import { IlmPolicyManager } from '../lib/store';

interface ExtraDependencies {
reportingCore: ReportingCore;
}

export const getDeprecationsInfo = async (
{ esClient }: GetDeprecationsContext,
{ reportingCore }: ExtraDependencies
): Promise<DeprecationsDetails[]> => {
const store = await reportingCore.getStore();
const indexPattern = store.getReportingIndexPattern();

const migrationStatus = await deprecations.checkIlmMigrationStatus({
reportingCore,
elasticsearchClient: esClient.asInternalUser,
});
export const getDeprecationsInfo = async ({
esClient,
}: GetDeprecationsContext): Promise<DeprecationsDetails[]> => {
const ilmPolicyManager = IlmPolicyManager.create({ client: esClient.asInternalUser });
const migrationStatus = await ilmPolicyManager.checkIlmMigrationStatus();

if (migrationStatus !== 'ok') {
return [
Expand All @@ -35,10 +25,10 @@ export const getDeprecationsInfo = async (
}),
level: 'warning',
message: i18n.translate('xpack.reporting.deprecations.migrateIndexIlmPolicyActionMessage', {
defaultMessage: `New reporting indices will be managed by the "{reportingIlmPolicy}" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets all indices prefixed with "{indexPattern}".`,
defaultMessage: `New reporting indices will be managed by the "{reportingIlmPolicy}" provisioned ILM policy. You must edit this policy to manage the report lifecycle. This change targets the hidden system index pattern "{indexPattern}".`,
values: {
reportingIlmPolicy: ILM_POLICY_NAME,
indexPattern,
indexPattern: REPORTING_DATA_STREAM_WILDCARD,
},
}),
correctiveActions: {
Expand Down
24 changes: 18 additions & 6 deletions x-pack/plugins/reporting/server/lib/content_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ describe('ContentStream', () => {
'body.query.constant_score.filter.bool.must.0.term._id',
'something'
);
expect(request2).toHaveProperty('index', 'somewhere');
expect(request2).toHaveProperty('index', '.reporting-*,.kibana-reporting*');
expect(request2).toHaveProperty(
'body.query.constant_score.filter.bool.must.0.term.parent_id',
'something'
);
expect(request3).toHaveProperty('index', 'somewhere');
expect(request3).toHaveProperty('index', '.reporting-*,.kibana-reporting*');
expect(request3).toHaveProperty(
'body.query.constant_score.filter.bool.must.0.term.parent_id',
'something'
Expand Down Expand Up @@ -293,8 +293,11 @@ describe('ContentStream', () => {
1,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
'@timestamp': '1970-01-01T00:00:00.000Z',
parent_id: 'something',
output: {
content: '34',
Expand All @@ -307,8 +310,11 @@ describe('ContentStream', () => {
2,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
'@timestamp': '1970-01-01T00:00:00.000Z',
parent_id: 'something',
output: {
content: '56',
Expand All @@ -335,9 +341,12 @@ describe('ContentStream', () => {
1,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
parent_id: 'something',
'@timestamp': '1970-01-01T00:00:00.000Z',
output: {
content: Buffer.from('456').toString('base64'),
chunk: 1,
Expand All @@ -349,9 +358,12 @@ describe('ContentStream', () => {
2,
expect.objectContaining({
id: expect.any(String),
index: 'somewhere',
index: '.kibana-reporting',
op_type: 'create',
refresh: 'wait_for',
body: {
parent_id: 'something',
'@timestamp': '1970-01-01T00:00:00.000Z',
output: {
content: Buffer.from('78').toString('base64'),
chunk: 2,
Expand Down
29 changes: 21 additions & 8 deletions x-pack/plugins/reporting/server/lib/content_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import { Duplex } from 'stream';
import { v4 as uuidv4 } from 'uuid';

import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { ReportSource } from '@kbn/reporting-common/types';
import {
REPORTING_DATA_STREAM_ALIAS,
REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY,
} from '@kbn/reporting-server';
import type { ReportingCore } from '..';

const ONE_MB = 1024 * 1024;
Expand All @@ -31,6 +35,7 @@ interface ChunkOutput {
}

interface ChunkSource {
'@timestamp': string;
parent_id: string;
output: ChunkOutput;
}
Expand Down Expand Up @@ -90,7 +95,7 @@ export class ContentStream extends Duplex {

private async readHead() {
const { id, index } = this.document;
const body: SearchRequest['body'] = {
const body: SearchRequest = {
_source: { includes: ['output.content', 'output.size', 'jobtype'] },
query: {
constant_score: {
Expand All @@ -110,13 +115,14 @@ export class ContentStream extends Duplex {
const hits = response?.hits?.hits?.[0];

this.jobSize = hits?._source?.output?.size;
this.logger.debug(`Reading job of size ${this.jobSize}`);

return hits?._source?.output?.content;
}

private async readChunk() {
const { id, index } = this.document;
const body: SearchRequest['body'] = {
const { id } = this.document;
const body: SearchRequest = {
_source: { includes: ['output.content'] },
query: {
constant_score: {
Expand All @@ -132,7 +138,10 @@ export class ContentStream extends Duplex {

this.logger.debug(`Reading chunk #${this.chunksRead}.`);

const response = await this.client.search<ChunkSource>({ body, index });
const response = await this.client.search<ChunkSource>({
body,
index: REPORTING_DATA_STREAM_WILDCARD_WITH_LEGACY,
});
const hits = response?.hits?.hits?.[0];

return hits?._source?.output.content;
Expand Down Expand Up @@ -179,10 +188,11 @@ export class ContentStream extends Duplex {
}

private async writeHead(content: string) {
this.logger.debug(`Updating report contents.`);
this.logger.debug(`Updating chunk #0 (${this.document.id}).`);

const body = await this.client.update<ReportSource>({
...this.document,
refresh: 'wait_for',
body: {
doc: {
output: { content },
Expand All @@ -194,16 +204,19 @@ export class ContentStream extends Duplex {
}

private async writeChunk(content: string) {
const { id: parentId, index } = this.document;
const { id: parentId } = this.document;
const id = uuidv4();

this.logger.debug(`Writing chunk #${this.chunksWritten} (${id}).`);

await this.client.index<ChunkSource>({
id,
index,
index: REPORTING_DATA_STREAM_ALIAS,
refresh: 'wait_for',
op_type: 'create',
body: {
parent_id: parentId,
'@timestamp': new Date(0).toISOString(), // required for data streams compatibility
output: {
content,
chunk: this.chunksWritten,
Expand Down
Loading