[Task Manager] Support excluding certain task types from executing #111036

Merged
Changes from 17 commits
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
- `monitored_stats_running_average_window`- Dictates the size of the window used to calculate the running average of various "Hot" stats. Learn More: [./MONITORING](./MONITORING.MD)
- `monitored_stats_required_freshness` - Dictates the _required freshness_ of critical "Hot" stats. Learn More: [./MONITORING](./MONITORING.MD)
- `monitored_task_execution_thresholds`- Dictates the threshold of failed task executions. Learn More: [./MONITORING](./MONITORING.MD)
- `unsafe.exclude_task_types` - A list of task types to exclude from running. Supports wildcard usage (such as `namespace:*`). This configuration is experimental, unsupported and can only be used for temporary debugging purposes as it will cause Kibana to behave in unexpected ways.
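
To illustrate the wildcard behavior described for `unsafe.exclude_task_types`, here is a minimal TypeScript sketch. It assumes a deliberately simplified matcher in which `*` is the only special character and matches any run of characters other than `/`; the PR itself delegates matching to the `minimatch` package, which supports a much richer glob syntax. The helper names `globToRegExp` and `isExcluded`, and the task type names, are hypothetical and not part of the PR.

```typescript
// Simplified stand-in for glob matching (assumption: only `*` is special).
function globToRegExp(pattern: string): RegExp {
  const source = pattern
    .split('*')
    // Escape regex metacharacters in the literal parts of the pattern.
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&'))
    // Each `*` becomes "any run of characters except `/`".
    .join('[^/]*');
  return new RegExp(`^${source}$`);
}

// A task type is excluded when at least one configured pattern matches it.
function isExcluded(taskType: string, patterns: string[]): boolean {
  return patterns.some((pattern) => globToRegExp(pattern).test(taskType));
}

// Hypothetical task type names, for illustration only.
const excludedPatterns = ['actions:*', 'session_cleanup'];
console.log(isExcluded('actions:email', excludedPatterns)); // true
console.log(isExcluded('alerting:threshold', excludedPatterns)); // false
```

An exact name such as `session_cleanup` matches only itself, while `actions:*` covers every type in that namespace.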

## Task definitions

9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
@@ -37,6 +37,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
@@ -93,6 +96,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
@@ -141,6 +147,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
4 changes: 4 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
@@ -128,6 +128,10 @@ export const configSchema = schema.object(
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
}),
},
{
validate: (config) => {
Original file line number Diff line number Diff line change
@@ -68,6 +68,9 @@ describe('EphemeralTaskLifecycle', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
...config,
},
elasticsearchAndSOAvailability$,
Original file line number Diff line number Diff line change
@@ -55,6 +55,9 @@ describe('managed configuration', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});
logger = context.logger.get('taskManager');

Original file line number Diff line number Diff line change
@@ -39,6 +39,9 @@ describe('Configuration Statistics Aggregator', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
};

const managedConfig = {
Original file line number Diff line number Diff line change
@@ -43,6 +43,9 @@ describe('createMonitoringStatsStream', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
};

it('returns the initial config used to configure Task Manager', async () => {
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
@@ -42,6 +42,9 @@ describe('TaskManagerPlugin', () => {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});

pluginInitializerContext.env.instanceUuid = '';
@@ -82,6 +85,9 @@ describe('TaskManagerPlugin', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});

const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
9 changes: 8 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
@@ -117,7 +117,14 @@ export class TaskManagerPlugin
usageCollection,
monitoredHealth$,
this.config.ephemeral_tasks.enabled,
- this.config.ephemeral_tasks.request_capacity
+ this.config.ephemeral_tasks.request_capacity,
+ this.config.unsafe.exclude_task_types
);
}

if (this.config.unsafe.exclude_task_types.length) {
this.logger.debug(
Review comment (Member): Since we've labelled this as unsafe/experimental, wondering if we should elevate this message to an info, or perhaps even warning.

Review comment (Contributor): +1 to a warning. It would make it clear what's happening without a way to ignore it.

Review comment (Contributor Author): Good idea!

`Excluding task types from execution: ${this.config.unsafe.exclude_task_types.join(', ')}`
);
}
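
The review thread above suggests raising this message from debug to warning so that an active exclusion list is hard to miss. A minimal sketch of that suggestion follows; it assumes a hypothetical `MinimalLogger` interface rather than Kibana's real `Logger` type, and `logExcludedTaskTypes` is an illustrative helper that does not appear in the PR.

```typescript
// Hypothetical minimal logger shape, for illustration only.
interface MinimalLogger {
  debug(message: string): void;
  warn(message: string): void;
}

function logExcludedTaskTypes(logger: MinimalLogger, excludedTaskTypes: string[]): void {
  // Stay silent when nothing is excluded, mirroring the length check in the diff.
  if (excludedTaskTypes.length) {
    logger.warn(`Excluding task types from execution: ${excludedTaskTypes.join(', ')}`);
  }
}

// Usage with a logger that records warnings in memory.
const warnings: string[] = [];
const recordingLogger: MinimalLogger = {
  debug: () => {},
  warn: (message) => {
    warnings.push(message);
  },
};

logExcludedTaskTypes(recordingLogger, ['actions:*']);
console.log(warnings); // [ 'Excluding task types from execution: actions:*' ]
```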

3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
@@ -64,6 +64,9 @@ describe('TaskPollingLifecycle', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
@@ -126,6 +126,7 @@ export class TaskPollingLifecycle {
this.taskClaiming = new TaskClaiming({
taskStore,
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
logger: this.logger,
getCapacity: (taskType?: string) =>
14 changes: 14 additions & 0 deletions x-pack/plugins/task_manager/server/queries/task_claiming.test.ts
@@ -102,6 +102,7 @@ describe('TaskClaiming', () => {
new TaskClaiming({
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getCapacity: () => 10,
@@ -119,11 +120,13 @@ describe('TaskClaiming', () => {
taskClaimingOpts = {},
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
}) {
const definitions = storeOpts.definitions ?? taskDefinitions;
const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId });
@@ -151,6 +154,7 @@ describe('TaskClaiming', () => {
logger: taskManagerLogger,
definitions,
taskStore: store,
excludedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getCapacity: taskClaimingOpts.getCapacity ?? (() => 10),
...taskClaimingOpts,
@@ -165,17 +169,20 @@ describe('TaskClaiming', () => {
claimingOpts,
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
claimingOpts: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
}) {
const getCapacity = taskClaimingOpts.getCapacity ?? (() => 10);
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts,
taskClaimingOpts,
excludedTaskTypes,
hits,
versionConflicts,
});
@@ -264,6 +271,11 @@ describe('TaskClaiming', () => {
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
foobar: {
title: 'foobar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});

const [
@@ -282,6 +294,7 @@ describe('TaskClaiming', () => {
claimingOpts: {
claimOwnershipUntil: new Date(),
},
excludedTaskTypes: ['foobar'],
});
expect(query).toMatchObject({
bool: {
@@ -1241,6 +1254,7 @@ if (doc['task.runAt'].size()!=0) {
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
taskStore,
maxAttempts: 2,
getCapacity,
55 changes: 37 additions & 18 deletions x-pack/plugins/task_manager/server/queries/task_claiming.ts
@@ -9,6 +9,7 @@
* This module contains helpers for managing the task manager storage layer.
*/
import apm from 'elastic-apm-node';
import minimatch from 'minimatch';
import { Subject, Observable, from, of } from 'rxjs';
import { map, mergeScan } from 'rxjs/operators';
import { difference, partition, groupBy, mapValues, countBy, pick, isPlainObject } from 'lodash';
@@ -57,6 +58,7 @@ export interface TaskClaimingOpts {
definitions: TaskTypeDictionary;
taskStore: TaskStore;
maxAttempts: number;
excludedTaskTypes: string[];
getCapacity: (taskType?: string) => number;
}

@@ -115,6 +117,7 @@ export class TaskClaiming {
private logger: Logger;
private readonly taskClaimingBatchesByType: TaskClaimingBatches;
private readonly taskMaxAttempts: Record<string, number>;
private readonly excludedTaskTypes: string[];

/**
* Constructs a new TaskStore.
@@ -130,6 +133,7 @@ export class TaskClaiming {
this.logger = opts.logger;
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;

this.events$ = new Subject<TaskClaim>();
}
@@ -354,6 +358,16 @@ export class TaskClaiming {
};
};

private isTaskTypeExcluded(taskType: string) {
for (const excludedType of this.excludedTaskTypes) {
if (minimatch(taskType, excludedType)) {
return true;
}
}

return false;
}

private async markAvailableTasksAsClaimed({
claimOwnershipUntil,
claimTasksById,
@@ -362,9 +376,11 @@ export class TaskClaiming {
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy(
this.definitions.getAllTypes(),
- (type) => (taskTypes.has(type) ? 'taskTypesToClaim' : 'taskTypesToSkip')
+ (type) =>
+ taskTypes.has(type) && !this.isTaskTypeExcluded(type)
+ ? 'taskTypesToClaim'
+ : 'taskTypesToSkip'
);

const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
@@ -382,29 +398,32 @@ export class TaskClaiming {
sort.unshift('_score');
}

+ const query = matchesClauses(
+ claimTasksById && claimTasksById.length
+ ? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
+ : queryForScheduledTasks,
+ filterDownBy(InactiveTasks)
+ );
+ const script = updateFieldsAndMarkAsFailed(
+ {
+ ownerId: this.taskStore.taskManagerId,
+ retryAt: claimOwnershipUntil,
+ },
+ claimTasksById || [],
+ taskTypesToClaim,
+ taskTypesToSkip,
+ pick(this.taskMaxAttempts, taskTypesToClaim)
+ );
+
const apmTrans = apm.startTransaction(
'markAvailableTasksAsClaimed',
`taskManager markAvailableTasksAsClaimed`
);
try {
const result = await this.taskStore.updateByQuery(
{
- query: matchesClauses(
- claimTasksById && claimTasksById.length
- ? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
- : queryForScheduledTasks,
- filterDownBy(InactiveTasks)
- ),
- script: updateFieldsAndMarkAsFailed(
- {
- ownerId: this.taskStore.taskManagerId,
- retryAt: claimOwnershipUntil,
- },
- claimTasksById || [],
- taskTypesToClaim,
- taskTypesToSkip,
- pick(this.taskMaxAttempts, taskTypesToClaim)
- ),
+ query,
+ script,
sort,
},
{
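
The claiming change in this file routes excluded task types into the skip list even when the caller asked to claim them. The sketch below shows that partitioning step in isolation. It assumes plain arrays and a caller-supplied predicate in place of `TaskTypeDictionary`, lodash `groupBy`, and `isTaskTypeExcluded`; `partitionTaskTypes` and the sample type names are hypothetical.

```typescript
interface PartitionedTaskTypes {
  taskTypesToClaim: string[];
  taskTypesToSkip: string[];
}

function partitionTaskTypes(
  allTypes: string[],
  requestedTypes: Set<string>,
  isTypeExcluded: (taskType: string) => boolean
): PartitionedTaskTypes {
  const result: PartitionedTaskTypes = { taskTypesToClaim: [], taskTypesToSkip: [] };
  for (const type of allTypes) {
    // A type is claimed only if it was requested AND is not excluded.
    if (requestedTypes.has(type) && !isTypeExcluded(type)) {
      result.taskTypesToClaim.push(type);
    } else {
      result.taskTypesToSkip.push(type);
    }
  }
  return result;
}

// 'foobar' is requested but excluded, so it ends up in the skip list.
const partitioned = partitionTaskTypes(
  ['report', 'foobar', 'cleanup'],
  new Set(['report', 'foobar']),
  (taskType) => taskType === 'foobar'
);
console.log(partitioned);
// { taskTypesToClaim: [ 'report' ], taskTypesToSkip: [ 'foobar', 'cleanup' ] }
```

Because the skip list feeds the same update script as before, an excluded type is treated exactly like a type this claiming cycle did not request.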
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ import { MonitoredHealth } from '../routes/health';
import { TaskPersistence } from '../task_events';
import { registerTaskManagerUsageCollector } from './task_manager_usage_collector';
import { sleep } from '../test_utils';
import { TaskManagerUsage } from './types';

describe('registerTaskManagerUsageCollector', () => {
let collector: Collector<unknown>;
@@ -31,25 +32,45 @@ describe('registerTaskManagerUsageCollector', () => {
return createUsageCollectionSetupMock().makeUsageCollector(config);
});

- registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10);
+ registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10, []);

const mockHealth = getMockMonitoredHealth();
monitoringStats$.next(mockHealth);
await sleep(1001);

expect(usageCollectionMock.makeUsageCollector).toBeCalled();
- const telemetry = await collector.fetch(fetchContext);
- expect(telemetry).toMatchObject({
- ephemeral_tasks_enabled: true,
- ephemeral_request_capacity: 10,
- ephemeral_stats: {
- status: mockHealth.stats.ephemeral?.status,
- load: mockHealth.stats.ephemeral?.value.load,
- executions_per_cycle: mockHealth.stats.ephemeral?.value.executionsPerCycle,
- queued_tasks: mockHealth.stats.ephemeral?.value.queuedTasks,
- },
+ const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
+ expect(telemetry.ephemeral_tasks_enabled).toBe(true);
+ expect(telemetry.ephemeral_request_capacity).toBe(10);
+ expect(telemetry.ephemeral_stats).toMatchObject({
+ status: mockHealth.stats.ephemeral?.status,
+ load: mockHealth.stats.ephemeral?.value.load,
+ executions_per_cycle: mockHealth.stats.ephemeral?.value.executionsPerCycle,
+ queued_tasks: mockHealth.stats.ephemeral?.value.queuedTasks,
});
});

it('should report telemetry on the excluded task types', async () => {
const monitoringStats$ = new Subject<MonitoredHealth>();
const usageCollectionMock = createUsageCollectionSetupMock();
const fetchContext = createCollectorFetchContextWithKibanaMock();
usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
collector = new Collector(logger, config);
return createUsageCollectionSetupMock().makeUsageCollector(config);
});

registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10, [
'actions:*',
]);

const mockHealth = getMockMonitoredHealth();
monitoringStats$.next(mockHealth);
await sleep(1001);

expect(usageCollectionMock.makeUsageCollector).toBeCalled();
const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
expect(telemetry.task_type_exclusion).toEqual(['actions:*']);
});
});

function getMockMonitoredHealth(overrides = {}): MonitoredHealth {