Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Fleet] Migrate ES client (#92805) #94051

Merged
merged 1 commit into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api_docs/fleet.json
Original file line number Diff line number Diff line change
Expand Up @@ -2453,7 +2453,7 @@
"description": [],
"source": {
"path": "x-pack/plugins/fleet/server/services/package_policy.ts",
"lineNumber": 594
"lineNumber": 592
},
"signature": [
"PackagePolicyService"
Expand Down
3 changes: 1 addition & 2 deletions x-pack/plugins/fleet/server/routes/agent_policy/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ export const createAgentPolicyHandler: RequestHandler<
> = async (context, request, response) => {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const user = (await appContextService.getSecurity()?.authc.getCurrentUser(request)) || undefined;
const withSysMonitoring = request.query.sys_monitoring ?? false;
try {
Expand All @@ -130,7 +129,7 @@ export const createAgentPolicyHandler: RequestHandler<
if (withSysMonitoring && newSysPackagePolicy !== undefined && agentPolicy !== undefined) {
newSysPackagePolicy.policy_id = agentPolicy.id;
newSysPackagePolicy.namespace = agentPolicy.namespace;
await packagePolicyService.create(soClient, esClient, callCluster, newSysPackagePolicy, {
await packagePolicyService.create(soClient, esClient, newSysPackagePolicy, {
user,
bumpRevision: false,
});
Expand Down
78 changes: 38 additions & 40 deletions x-pack/plugins/fleet/server/routes/data_streams/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,63 +16,59 @@ import { defaultIngestErrorHandler } from '../../errors';

const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*';

interface ESDataStreamInfoResponse {
data_streams: Array<{
interface ESDataStreamInfo {
name: string;
timestamp_field: {
name: string;
timestamp_field: {
};
indices: Array<{ index_name: string; index_uuid: string }>;
generation: number;
_meta?: {
package?: {
name: string;
};
indices: Array<{ index_name: string; index_uuid: string }>;
generation: number;
_meta?: {
package?: {
name: string;
};
managed_by?: string;
managed?: boolean;
[key: string]: any;
};
status: string;
template: string;
ilm_policy: string;
hidden: boolean;
}>;
managed_by?: string;
managed?: boolean;
[key: string]: any;
};
status: string;
template: string;
ilm_policy: string;
hidden: boolean;
}

interface ESDataStreamStatsResponse {
data_streams: Array<{
data_stream: string;
backing_indices: number;
store_size_bytes: number;
maximum_timestamp: number;
}>;
interface ESDataStreamStats {
data_stream: string;
backing_indices: number;
store_size_bytes: number;
maximum_timestamp: number;
}

export const getListHandler: RequestHandler = async (context, request, response) => {
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;

const body: GetDataStreamsResponse = {
data_streams: [],
};

try {
// Get matching data streams, their stats, and package SOs
const [
{ data_streams: dataStreamsInfo },
{ data_streams: dataStreamStats },
{
body: { data_streams: dataStreamsInfo },
},
{
body: { data_streams: dataStreamStats },
},
packageSavedObjects,
] = await Promise.all([
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
}) as Promise<ESDataStreamInfoResponse>,
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
}) as Promise<ESDataStreamStatsResponse>,
esClient.indices.getDataStream({ name: DATA_STREAM_INDEX_PATTERN }),
esClient.indices.dataStreamsStats({ name: DATA_STREAM_INDEX_PATTERN }),
getPackageSavedObjects(context.core.savedObjects.client),
]);
const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name');
const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream');

const dataStreamsInfoByName = keyBy<ESDataStreamInfo>(dataStreamsInfo, 'name');
const dataStreamsStatsByName = keyBy<ESDataStreamStats>(dataStreamStats, 'data_stream');

// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
Expand All @@ -99,8 +95,10 @@ export const getListHandler: RequestHandler = async (context, request, response)

// Query backing indices to extract data stream dataset, namespace, and type values
const {
aggregations: { dataset, namespace, type },
} = await callCluster('search', {
body: {
aggregations: { dataset, namespace, type },
},
} = await esClient.search({
index: dataStream.indices.map((index) => index.index_name),
body: {
size: 0,
Expand Down
18 changes: 9 additions & 9 deletions x-pack/plugins/fleet/server/routes/epm/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ export const installPackageFromRegistryHandler: RequestHandler<
TypeOf<typeof InstallPackageFromRegistryRequestSchema.body>
> = async (context, request, response) => {
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const { pkgkey } = request.params;
const { pkgName, pkgVersion } = splitPkgKey(pkgkey);
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName });
Expand All @@ -235,7 +235,7 @@ export const installPackageFromRegistryHandler: RequestHandler<
installSource: 'registry',
savedObjectsClient,
pkgkey,
callCluster,
esClient,
force: request.body?.force,
});
const body: InstallPackageResponse = {
Expand All @@ -250,7 +250,7 @@ export const installPackageFromRegistryHandler: RequestHandler<
pkgName,
pkgVersion,
installedPkg,
callCluster,
esClient,
});

return defaultResult;
Expand Down Expand Up @@ -278,10 +278,10 @@ export const bulkInstallPackagesFromRegistryHandler: RequestHandler<
TypeOf<typeof BulkUpgradePackagesFromRegistryRequestSchema.body>
> = async (context, request, response) => {
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const bulkInstalledResponses = await bulkInstallPackages({
savedObjectsClient,
callCluster,
esClient,
packagesToUpgrade: request.body.packages,
});
const payload = bulkInstalledResponses.map(bulkInstallServiceResponseToHttpEntry);
Expand All @@ -303,14 +303,14 @@ export const installPackageByUploadHandler: RequestHandler<
});
}
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const contentType = request.headers['content-type'] as string; // from types it could also be string[] or undefined but this is checked later
const archiveBuffer = Buffer.from(request.body);
try {
const res = await installPackage({
installSource: 'upload',
savedObjectsClient,
callCluster,
esClient,
archiveBuffer,
contentType,
});
Expand All @@ -329,8 +329,8 @@ export const deletePackageHandler: RequestHandler<
try {
const { pkgkey } = request.params;
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const res = await removeInstallation({ savedObjectsClient, pkgkey, callCluster });
const esClient = context.core.elasticsearch.client.asCurrentUser;
const res = await removeInstallation({ savedObjectsClient, pkgkey, esClient });
const body: DeletePackageResponse = {
response: res,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jest.mock('../../services/package_policy', (): {
compilePackagePolicyInputs: jest.fn((packageInfo, dataInputs) => Promise.resolve(dataInputs)),
buildPackagePolicyFromPackage: jest.fn(),
bulkCreate: jest.fn(),
create: jest.fn((soClient, esClient, callCluster, newData) =>
create: jest.fn((soClient, esClient, newData) =>
Promise.resolve({
...newData,
inputs: newData.inputs.map((input) => ({
Expand Down Expand Up @@ -204,7 +204,8 @@ describe('When calling package policy', () => {
);
await routeHandler(context, request, response);
expect(response.ok).toHaveBeenCalled();
expect(packagePolicyServiceMock.create.mock.calls[0][3]).toEqual({

expect(packagePolicyServiceMock.create.mock.calls[0][2]).toEqual({
policy_id: 'a5ca00c0-b30c-11ea-9732-1bb05811278c',
description: '',
enabled: true,
Expand Down
13 changes: 3 additions & 10 deletions x-pack/plugins/fleet/server/routes/package_policy/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ export const createPackagePolicyHandler: RequestHandler<
> = async (context, request, response) => {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const user = (await appContextService.getSecurity()?.authc.getCurrentUser(request)) || undefined;
try {
const newData = await packagePolicyService.runExternalCallbacks(
Expand All @@ -90,15 +89,9 @@ export const createPackagePolicyHandler: RequestHandler<
);

// Create package policy
const packagePolicy = await packagePolicyService.create(
soClient,
esClient,
callCluster,
newData,
{
user,
}
);
const packagePolicy = await packagePolicyService.create(soClient, esClient, newData, {
user,
});
const body: CreatePackagePolicyResponse = { item: packagePolicy };
return response.ok({
body,
Expand Down
8 changes: 3 additions & 5 deletions x-pack/plugins/fleet/server/routes/setup/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ export const createFleetSetupHandler: RequestHandler<
try {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
await setupIngestManager(soClient, esClient, callCluster);
await setupFleet(soClient, esClient, callCluster, {
await setupIngestManager(soClient, esClient);
await setupFleet(soClient, esClient, {
forceRecreate: request.body?.forceRecreate ?? false,
});

Expand All @@ -80,11 +79,10 @@ export const createFleetSetupHandler: RequestHandler<
export const FleetSetupHandler: RequestHandler = async (context, request, response) => {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;

try {
const body: PostIngestSetupResponse = { isInitialized: true };
await setupIngestManager(soClient, esClient, callCluster);
await setupIngestManager(soClient, esClient);
return response.ok({
body,
});
Expand Down
11 changes: 0 additions & 11 deletions x-pack/plugins/fleet/server/services/api_keys/security.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type { Request } from '@hapi/hapi';
import { KibanaRequest } from '../../../../../../src/core/server';
import type { SavedObjectsClientContract } from '../../../../../../src/core/server';
import { FleetAdminUserInvalidError, isESClientError } from '../../errors';
import type { CallESAsCurrentUser } from '../../types';
import { appContextService } from '../app_context';
import { outputService } from '../output';

Expand Down Expand Up @@ -56,16 +55,6 @@ export async function createAPIKey(
throw err;
}
}
export async function authenticate(callCluster: CallESAsCurrentUser) {
try {
await callCluster('transport.request', {
path: '/_security/_authenticate',
method: 'GET',
});
} catch (e) {
throw new Error('ApiKey is not valid: impossible to authenticate user');
}
}

export async function invalidateAPIKeys(soClient: SavedObjectsClientContract, ids: string[]) {
const adminUser = await outputService.getAdminUser(soClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
* 2.0.
*/

import type { SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';

import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type {
EsAssetReference,
InstallablePackage,
RegistryDataStream,
} from '../../../../../common/types/models';
import type { CallESAsCurrentUser } from '../../../../types';
import { getInstallation } from '../../packages';
import { saveInstalledEsRefs } from '../../packages/install';
import { getAsset } from '../transform/common';
Expand All @@ -33,7 +32,7 @@ interface IlmPathDataset {
export const installIlmForDataStream = async (
registryPackage: InstallablePackage,
paths: string[],
callCluster: CallESAsCurrentUser,
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
) => {
const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name });
Expand All @@ -46,7 +45,7 @@ export const installIlmForDataStream = async (

// delete all previous ilm
await deleteIlms(
callCluster,
esClient,
previousInstalledIlmEsAssets.map((asset) => asset.id)
);
// install the latest dataset
Expand Down Expand Up @@ -86,7 +85,7 @@ export const installIlmForDataStream = async (
);

const installationPromises = ilmInstallations.map(async (ilmInstallation) => {
return handleIlmInstall({ callCluster, ilmInstallation });
return handleIlmInstall({ esClient, ilmInstallation });
});

installedIlms = await Promise.all(installationPromises).then((results) => results.flat());
Expand All @@ -111,13 +110,13 @@ export const installIlmForDataStream = async (
};

async function handleIlmInstall({
callCluster,
esClient,
ilmInstallation,
}: {
callCluster: CallESAsCurrentUser;
esClient: ElasticsearchClient;
ilmInstallation: IlmInstallation;
}): Promise<EsAssetReference> {
await callCluster('transport.request', {
await esClient.transport.request({
method: 'PUT',
path: `/_ilm/policy/${ilmInstallation.installationName}`,
body: ilmInstallation.content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@
* 2.0.
*/

import type { SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';

import { ElasticsearchAssetType } from '../../../../types';
import type { CallESAsCurrentUser, EsAssetReference } from '../../../../types';
import type { EsAssetReference } from '../../../../types';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common/constants';

export const deleteIlms = async (callCluster: CallESAsCurrentUser, ilmPolicyIds: string[]) => {
export const deleteIlms = async (esClient: ElasticsearchClient, ilmPolicyIds: string[]) => {
await Promise.all(
ilmPolicyIds.map(async (ilmPolicyId) => {
await callCluster('transport.request', {
method: 'DELETE',
path: `_ilm/policy/${ilmPolicyId}`,
ignore: [404, 400],
});
await esClient.transport.request(
{
method: 'DELETE',
path: `_ilm/policy/${ilmPolicyId}`,
},
{
ignore: [404, 400],
}
);
})
);
};
Expand Down
Loading