Skip to content

Commit

Permalink
Switch Reporting to Task Manager (#64853)
Browse files Browse the repository at this point in the history
* [Reporting] Task Manager

* fix startup

* use synchronous config for task registration

* fix eslint

* pr-90365

* --wip-- [skip ci]

* set maxConcurrency to 0 if pollEnabled is false

* add test for execute_report

* remove unused test file

* more tests

* remove unused test files

* remove priority

* logging cleanups

* fix for queue.pollEnabled: false

* more logging fixes for less duplicated code

* update jest snapshots

* polish

* remove unnecessary

* Update mapping.ts

* polish

* fix bug if instance gets a monitoring task and pollEnabled is false

* simplification

* cosmetic

* fix test

* stop monitoring task sabotage

* update api docs

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
tsullivan and kibanamachine authored Mar 9, 2021
1 parent add02f1 commit 9fef424
Show file tree
Hide file tree
Showing 51 changed files with 1,598 additions and 2,652 deletions.
305 changes: 256 additions & 49 deletions api_docs/reporting.json

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions docs/user/reporting/script-example.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ Here is an example response for a successfully queued report:
"created_by": "elastic",
"payload": ..., <2>
"timeout": 120000,
"max_attempts": 3,
"priority": 10
"max_attempts": 3
}
}
---------------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions x-pack/plugins/reporting/common/schema_utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import moment from 'moment';
import { numberToDuration } from './schema_utils';

describe('Schema Utils', () => {
it('numberToDuration converts a number/Duration into a Duration object', () => {
expect(numberToDuration(500)).toMatchInlineSnapshot(`"PT0.5S"`);
expect(numberToDuration(moment.duration(1, 'hour'))).toMatchInlineSnapshot(`"PT1H"`);
});
});
7 changes: 7 additions & 0 deletions x-pack/plugins/reporting/common/schema_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export const durationToNumber = (value: number | moment.Duration): number => {
return value.asMilliseconds();
};

export const numberToDuration = (value: number | moment.Duration): moment.Duration => {
if (typeof value === 'number') {
return moment.duration(value, 'milliseconds');
}
return value;
};

export const byteSizeValueToNumber = (value: number | ByteSizeValue) => {
if (typeof value === 'number') {
return value;
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/reporting/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"management",
"licensing",
"uiActions",
"taskManager",
"embeddable",
"share",
"features"
Expand Down
51 changes: 45 additions & 6 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
BasePath,
ElasticsearchServiceSetup,
KibanaRequest,
PluginInitializerContext,
SavedObjectsClientContract,
SavedObjectsServiceStart,
UiSettingsServiceStart,
Expand All @@ -21,12 +22,14 @@ import { LicensingPluginSetup } from '../../licensing/server';
import { SecurityPluginSetup } from '../../security/server';
import { DEFAULT_SPACE_ID } from '../../spaces/common/constants';
import { SpacesPluginSetup } from '../../spaces/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import { ReportingConfig } from './';
import { HeadlessChromiumDriverFactory } from './browsers/chromium/driver_factory';
import { ReportingConfigType } from './config';
import { checkLicense, getExportTypesRegistry, LevelLogger } from './lib';
import { ESQueueInstance } from './lib/create_queue';
import { screenshotsObservableFactory, ScreenshotsObservableFn } from './lib/screenshots';
import { ReportingStore } from './lib/store';
import { ExecuteReportTask, MonitorReportsTask, ReportTaskParams } from './lib/tasks';
import { ReportingPluginRouter } from './types';

export interface ReportingInternalSetup {
Expand All @@ -37,14 +40,15 @@ export interface ReportingInternalSetup {
licensing: LicensingPluginSetup;
security?: SecurityPluginSetup;
spaces?: SpacesPluginSetup;
taskManager: TaskManagerSetupContract;
}

export interface ReportingInternalStart {
browserDriverFactory: HeadlessChromiumDriverFactory;
esqueue: ESQueueInstance;
store: ReportingStore;
savedObjects: SavedObjectsServiceStart;
uiSettings: UiSettingsServiceStart;
taskManager: TaskManagerStartContract;
}

export class ReportingCore {
Expand All @@ -53,24 +57,43 @@ export class ReportingCore {
private readonly pluginSetup$ = new Rx.ReplaySubject<boolean>(); // observe async background setupDeps and config each are done
private readonly pluginStart$ = new Rx.ReplaySubject<ReportingInternalStart>(); // observe async background startDeps
private exportTypesRegistry = getExportTypesRegistry();
private executeTask: ExecuteReportTask;
private monitorTask: MonitorReportsTask;
private config?: ReportingConfig;
private executing: Set<string>;

constructor(private logger: LevelLogger) {}
constructor(private logger: LevelLogger, context: PluginInitializerContext<ReportingConfigType>) {
const config = context.config.get<ReportingConfigType>();
this.executeTask = new ExecuteReportTask(this, config, this.logger);
this.monitorTask = new MonitorReportsTask(this, config, this.logger);
this.executing = new Set();
}

/*
* Register setupDeps
*/
public pluginSetup(setupDeps: ReportingInternalSetup) {
this.pluginSetup$.next(true); // trigger the observer
this.pluginSetupDeps = setupDeps; // cache

const { executeTask, monitorTask } = this;
setupDeps.taskManager.registerTaskDefinitions({
[executeTask.TYPE]: executeTask.getTaskDefinition(),
[monitorTask.TYPE]: monitorTask.getTaskDefinition(),
});
}

/*
* Register startDeps
*/
public pluginStart(startDeps: ReportingInternalStart) {
public async pluginStart(startDeps: ReportingInternalStart) {
this.pluginStart$.next(startDeps); // trigger the observer
this.pluginStartDeps = startDeps; // cache

const { taskManager } = startDeps;
const { executeTask, monitorTask } = this;
// enable this instance to generate reports and to monitor for pending reports
await Promise.all([executeTask.init(taskManager), monitorTask.init(taskManager)]);
}

/*
Expand Down Expand Up @@ -151,8 +174,12 @@ export class ReportingCore {
return this.exportTypesRegistry;
}

public async getEsqueue() {
return (await this.getPluginStartDeps()).esqueue;
public async scheduleTask(report: ReportTaskParams) {
return await this.executeTask.scheduleTask(report);
}

public async getStore() {
return (await this.getPluginStartDeps()).store;
}

public async getLicenseInfo() {
Expand Down Expand Up @@ -239,4 +266,16 @@ export class ReportingCore {
const savedObjectsClient = await this.getSavedObjectsClient(request);
return await this.getUiSettingsServiceFactory(savedObjectsClient);
}

public trackReport(reportId: string) {
this.executing.add(reportId);
}

public untrackReport(reportId: string) {
this.executing.delete(reportId);
}

public countConcurrentReports(): number {
return this.executing.size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants';
import { cryptoFactory } from '../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../types';
import {
Expand All @@ -16,9 +15,7 @@ import {

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsDeprecatedCSV, TaskPayloadDeprecatedCSV>
> = function createJobFactoryFn(reporting, parentLogger) {
const logger = parentLogger.clone([CSV_JOB_TYPE_DEPRECATED, 'create-job']);

> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { CONTENT_TYPE_CSV, CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants';
import { CONTENT_TYPE_CSV } from '../../../common/constants';
import { RunTaskFn, RunTaskFnFactory } from '../../types';
import { decryptJobHeaders } from '../common';
import { createGenerateCsv } from './generate_csv';
Expand All @@ -18,7 +18,7 @@ export const runTaskFnFactory: RunTaskFnFactory<

return async function runTask(jobId, job, cancellationToken) {
const elasticsearch = reporting.getElasticsearchService();
const logger = parentLogger.clone([CSV_JOB_TYPE_DEPRECATED, 'execute-job', jobId]);
const logger = parentLogger.clone([jobId]);
const generateCsv = createGenerateCsv(logger);

const encryptionKey = config.get('encryptionKey');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import { notFound, notImplemented } from '@hapi/boom';
import { get } from 'lodash';
import { CSV_FROM_SAVEDOBJECT_JOB_TYPE } from '../../../common/constants';
import { CsvFromSavedObjectRequest } from '../../routes/generate_from_savedobject_immediate';
import type { ReportingRequestHandlerContext } from '../../types';
import { CreateJobFnFactory } from '../../types';
import {
JobParamsPanelCsv,
Expand All @@ -18,7 +18,6 @@ import {
SavedObjectServiceError,
VisObjectAttributesJSON,
} from './types';
import type { ReportingRequestHandlerContext } from '../../types';

export type ImmediateCreateJobFn = (
jobParams: JobParamsPanelCsv,
Expand All @@ -30,7 +29,7 @@ export const createJobFnFactory: CreateJobFnFactory<ImmediateCreateJobFn> = func
reporting,
parentLogger
) {
const logger = parentLogger.clone([CSV_FROM_SAVEDOBJECT_JOB_TYPE, 'create-job']);
const logger = parentLogger.clone(['create-job']);

return async function createJob(jobParams, context, req) {
const { savedObjectType, savedObjectId } = jobParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import { KibanaRequest } from 'src/core/server';
import { CancellationToken } from '../../../common';
import { CONTENT_TYPE_CSV, CSV_FROM_SAVEDOBJECT_JOB_TYPE } from '../../../common/constants';
import { CONTENT_TYPE_CSV } from '../../../common/constants';
import { TaskRunResult } from '../../lib/tasks';
import type { ReportingRequestHandlerContext } from '../../types';
import { RunTaskFnFactory } from '../../types';
import { createGenerateCsv } from '../csv/generate_csv';
import { getGenerateCsvParams } from './lib/get_csv_job';
import { JobPayloadPanelCsv } from './types';
import type { ReportingRequestHandlerContext } from '../../types';

/*
* ImmediateExecuteFn receives the job doc payload because the payload was
Expand All @@ -31,7 +31,7 @@ export const runTaskFnFactory: RunTaskFnFactory<ImmediateExecuteFn> = function e
parentLogger
) {
const config = reporting.getConfig();
const logger = parentLogger.clone([CSV_FROM_SAVEDOBJECT_JOB_TYPE, 'execute-job']);
const logger = parentLogger.clone(['execute-job']);

return async function runTask(jobId, jobPayload, context, req) {
const generateCsv = createGenerateCsv(logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
* 2.0.
*/

import { PNG_JOB_TYPE } from '../../../../common/constants';
import { cryptoFactory } from '../../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../../types';
import { validateUrls } from '../../common';
import { JobParamsPNG, TaskPayloadPNG } from '../types';

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsPNG, TaskPayloadPNG>
> = function createJobFactoryFn(reporting, parentLogger) {
const logger = parentLogger.clone([PNG_JOB_TYPE, 'execute-job']);
> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
* 2.0.
*/

import { PDF_JOB_TYPE } from '../../../../common/constants';
import { cryptoFactory } from '../../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../../types';
import { validateUrls } from '../../common';
import { JobParamsPDF, TaskPayloadPDF } from '../types';

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsPDF, TaskPayloadPDF>
> = function createJobFactoryFn(reporting, parentLogger) {
> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));
const logger = parentLogger.clone([PDF_JOB_TYPE, 'create-job']);

return async function createJob(
{ title, relativeUrls, browserTimezone, layout, objectType },
Expand Down
75 changes: 0 additions & 75 deletions x-pack/plugins/reporting/server/lib/create_queue.ts

This file was deleted.

Loading

0 comments on commit 9fef424

Please sign in to comment.