Skip to content

Commit

Permalink
[Streams] Dashboard linking (elastic#204309)
Browse files Browse the repository at this point in the history
Links dashboard to Streams.

Changes:
- Introduces `IndexStorageAdapter` to manage ES indices - see
https://github.com/dgieselaar/kibana/blob/streams-app-asset-linking/x-pack/solutions/observability/packages/utils_server/es/storage/README.md
for motivation
- Introduces `AssetClient` and `AssetService` to manage asset links with
`IndexStorageAdapter`
- `RepositorySupertestClient` to make it easier to use
`@kbn/server-route-repository` with FTR tests
- refactors related to above changes

---------

Co-authored-by: Chris Cowan <[email protected]>
Co-authored-by: Joe Reuter <[email protected]>
Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
5 people authored Jan 7, 2025
1 parent 8a9202e commit 28414ce
Show file tree
Hide file tree
Showing 64 changed files with 3,834 additions and 262 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ packages/kbn-monaco/src/esql @elastic/kibana-esql
/x-pack/solutions/observability/plugins/infra/server/routes/log_analysis @elastic/obs-ux-logs-team
/x-pack/solutions/observability/plugins/infra/server/services/rules @elastic/obs-ux-infra_services-team @elastic/obs-ux-logs-team
/x-pack/test/common/utils/synthtrace @elastic/obs-ux-infra_services-team @elastic/obs-ux-logs-team # Assigned per https://github.com/elastic/kibana/blob/main/packages/kbn-apm-synthtrace/kibana.jsonc#L5
/x-pack/test/common/utils/server_route_repository @elastic/obs-knowledge-team

# Infra Monitoring tests
/x-pack/test/common/services/infra_synthtrace_kibana_client.ts @elastic/obs-ux-infra_services-team
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

import { get } from 'lodash';

export function isRequestAbortedError(error: unknown): error is Error {
export function isRequestAbortedError(error: unknown): error is Error & { name: 'AbortError' } {
return get(error, 'name') === 'AbortError';
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ type DecodedRequestParamsOfType<TRouteParamsRT extends RouteParamsRT> =
: never;

export type EndpointOf<TServerRouteRepository extends ServerRouteRepository> =
keyof TServerRouteRepository;
keyof TServerRouteRepository & string;

export type ReturnOf<
TServerRouteRepository extends ServerRouteRepository,
Expand Down
8 changes: 2 additions & 6 deletions x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,11 @@ export function isStream(subject: any): subject is StreamDefinition {
return isSchema(streamDefintionSchema, subject);
}

export function isIngestStream(
subject: IngestStreamDefinition | WiredStreamDefinition
): subject is IngestStreamDefinition {
export function isIngestStream(subject: StreamDefinition): subject is IngestStreamDefinition {
return isSchema(ingestStreamDefinitonSchema, subject);
}

export function isWiredStream(
subject: IngestStreamDefinition | WiredStreamDefinition
): subject is WiredStreamDefinition {
export function isWiredStream(subject: StreamDefinition): subject is WiredStreamDefinition {
return isSchema(wiredStreamDefinitonSchema, subject);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const ingestStreamDefinitonSchema = z
name: z.string(),
elasticsearch_assets: z.optional(elasticsearchAssetSchema),
stream: ingestStreamConfigDefinitonSchema,
dashboards: z.optional(z.array(z.string())),
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const wiredStreamDefinitonSchema = z
name: z.string(),
elasticsearch_assets: z.optional(elasticsearchAssetSchema),
stream: wiredStreamConfigDefinitonSchema,
dashboards: z.optional(z.array(z.string())),
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

import { schema } from '@kbn/config-schema';

const savedObjectReferenceSchema = schema.object({
type: schema.string(),
id: schema.string(),
});

export const findRulesOptionsSchema = schema.object(
{
perPage: schema.maybe(schema.number()),
Expand All @@ -19,10 +24,7 @@ export const findRulesOptionsSchema = schema.object(
sortField: schema.maybe(schema.string()),
sortOrder: schema.maybe(schema.oneOf([schema.literal('asc'), schema.literal('desc')])),
hasReference: schema.maybe(
schema.object({
type: schema.string(),
id: schema.string(),
})
schema.oneOf([savedObjectReferenceSchema, schema.arrayOf(savedObjectReferenceSchema)])
),
fields: schema.maybe(schema.arrayOf(schema.string())),
filter: schema.maybe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { esqlResultToPlainObjects } from '../esql_result_to_plain_objects';
type SearchRequest = ESSearchRequest & {
index: string | string[];
track_total_hits: number | boolean;
size: number | boolean;
size: number;
};

export interface EsqlOptions {
Expand Down Expand Up @@ -112,10 +112,12 @@ export function createObservabilityEsClient({
client,
logger,
plugin,
labels,
}: {
client: ElasticsearchClient;
logger: Logger;
plugin: string;
plugin?: string;
labels?: Record<string, string>;
}): ObservabilityElasticsearchClient {
// wraps the ES calls in a named APM span for better analysis
// (otherwise it would just eg be a _search span)
Expand All @@ -129,7 +131,8 @@ export function createObservabilityEsClient({
{
name: operationName,
labels: {
plugin,
...labels,
...(plugin ? { plugin } : {}),
},
},
callback,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Storage adapter

Storage adapters are an abstraction for managing & writing data into Elasticsearch, from Kibana plugins.

There are several ways one can use Elasticsearch in Kibana, for instance:

- a simple id-based CRUD table
- timeseries data with regular indices
- timeseries data with data streams

But then there are many choices to be made that make this a very complex problem:

- Elasticsearch asset managmeent
- Authentication
- Schema changes
- Kibana's distributed nature
- Stateful versus serverless

The intent of storage adapters is to come up with an abstraction that allows Kibana developers to have a common interface for writing to and reading data from Elasticsearch. For instance, for setting up your data store, it should not matter how you authenticate (internal user? current user? API keys?).

## Saved objects

Some of these problems are solved by Saved Objects. But Saved Objects come with a lot of baggage - Kibana RBAC, relationships, spaces, all of which might not be
needed for your use case but are still restrictive. One could consider Saved Objects to be the target of an adapter, but Storage Adapters aim to address a wider set of use-cases.

## Philosophy

Storage adapters should largely adhere to the following principles:

- Interfaces are as close to Elasticsearch as possible. Meaning, the `search` method is practically a pass-through for `_search`.
- Strongly-typed. TypeScript types are inferred from the schema. This makes it easy to create fully-typed clients for any storage.
- Lazy writes. No Elasticsearch assets (templates, indices, aliases) get installed unless necessary. Anything that gets persisted to Elasticsearch raises questions (in SDHs, UIs, APIs) and should be avoided when possible. This also helps avoidable upgrade issues (e.g. conflicting mappings for something that never got used).
- Recoverable. If somehow Elasticsearch assets get borked, the adapters should make a best-effort attempt to recover, or log warnings with clear remediation steps.

## Future goals

Currently, we only have the StorageIndexAdapter which writes to plain indices. In the future, we'll want more:

- A StorageDataStreamAdapter or StorageSavedObjectAdapter
- Federated search
- Data/Index Lifecycle Management
- Migration scripts
- Runtime mappings for older versions

## Usage

### Storage index adapter

To use the storage index adapter, instantiate it with an authenticated Elasticsearch client:

```ts
const storageSettings = {
name: '.kibana_streams_assets',
schema: {
properties: {
[ASSET_ASSET_ID]: types.keyword({ required: true }),
[ASSET_TYPE]: types.enum(Object.values(ASSET_TYPES), { required: true }),
},
},
} satisfies IndexStorageSettings;

// create and configure the adapter
const adapter = new StorageIndexAdapter(
esClient: coreStart.elasticsearch.client.asInternalUser,
this.logger.get('assets'),
storageSettings
);

// get the client (its interface is shared across all adapters)
const client = adapter.getClient();

const response = await client.search('operation_name', {
track_total_hits: true
});

```
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import stringify from 'json-stable-stringify';
import objectHash from 'object-hash';
import { IndexStorageSettings } from '.';

export function getSchemaVersion(storage: IndexStorageSettings): string {
const version = objectHash(stringify(storage.schema.properties));
return version;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
BulkOperationContainer,
BulkRequest,
BulkResponse,
DeleteRequest,
DeleteResponse,
IndexRequest,
IndexResponse,
SearchRequest,
} from '@elastic/elasticsearch/lib/api/types';
import { InferSearchResponseOf } from '@kbn/es-types';
import { StorageFieldTypeOf, StorageMappingProperty } from './types';

interface StorageSchemaProperties {
[x: string]: StorageMappingProperty;
}

export interface StorageSchema {
properties: StorageSchemaProperties;
}

interface StorageSettingsBase {
schema: StorageSchema;
}

export interface IndexStorageSettings extends StorageSettingsBase {
name: string;
}

export type StorageSettings = IndexStorageSettings;

export type StorageAdapterSearchRequest = Omit<SearchRequest, 'index'>;
export type StorageAdapterSearchResponse<
TDocument,
TSearchRequest extends Omit<SearchRequest, 'index'>
> = InferSearchResponseOf<TDocument, TSearchRequest>;

export type StorageAdapterBulkOperation = Pick<BulkOperationContainer, 'delete' | 'index'>;

export type StorageAdapterBulkRequest<TDocument extends Record<string, any>> = Omit<
BulkRequest,
'operations' | 'index'
> & {
operations: Array<StorageAdapterBulkOperation | TDocument>;
};
export type StorageAdapterBulkResponse = BulkResponse;

export type StorageAdapterDeleteRequest = DeleteRequest;
export type StorageAdapterDeleteResponse = DeleteResponse;

export type StorageAdapterIndexRequest<TDocument = unknown> = Omit<
IndexRequest<TDocument>,
'index'
>;
export type StorageAdapterIndexResponse = IndexResponse;

export interface IStorageAdapter<TStorageSettings extends StorageSettings = never> {
bulk<TDocument extends Record<string, any>>(
request: StorageAdapterBulkRequest<TDocument>
): Promise<StorageAdapterBulkResponse>;
search<TDocument, TSearchRequest extends Omit<SearchRequest, 'index'>>(
request: StorageAdapterSearchRequest
): Promise<StorageAdapterSearchResponse<TDocument, TSearchRequest>>;
index<TDocument>(
request: StorageAdapterIndexRequest<TDocument>
): Promise<StorageAdapterIndexResponse>;
delete(request: StorageAdapterDeleteRequest): Promise<StorageAdapterDeleteResponse>;
}

export type StorageSettingsOf<TStorageAdapter extends IStorageAdapter<StorageSettings>> =
TStorageAdapter extends IStorageAdapter<infer TStorageSettings>
? TStorageSettings extends StorageSettings
? TStorageSettings
: never
: never;

export type StorageDocumentOf<TStorageSettings extends StorageSettings> = {
[TKey in keyof TStorageSettings['schema']['properties']]: StorageFieldTypeOf<
TStorageSettings['schema']['properties'][TKey]
>;
} & { _id: string };

export { StorageIndexAdapter } from './index_adapter';
export { StorageClient } from './storage_client';
export { types } from './types';
Loading

0 comments on commit 28414ce

Please sign in to comment.