
Bulk helper document manipulation #1209

Closed · delvedor opened this issue May 29, 2020 · 3 comments · Fixed by #1732

@delvedor (Member)
Currently, you can't manipulate the document that gets passed to the bulk helper. If you need to do that, you have to pipe the datasource through a transform stream or an async generator before handing it to the bulk helper:

```js
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

async function * manipulate (stream) {
  for await (const chunk of stream) {
    // chunk manipulation
    yield chunk
  }
}

const stream = createReadStream('./dataset.ndjson').pipe(split())

const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: manipulate(stream),
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})
```

While the solution above works perfectly, it does not support the case where the document contains metadata that should not be indexed, such as the index name:

```js
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: doc.index, _id: doc.id }
    }
  }
})
```

In the code above, we are indexing the document metadata as well.
To solve this problem, we could introduce another callback that allows manipulating the document after the bulk action has been created:

```js
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: doc.index, _id: doc.id }
    }
  },
  transformDocument (doc) {
    return doc.body
  }
})
```

Another solution, though it could cause a breaking change in some cases (which is not a big deal, given that the helpers are still experimental):

```js
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: doc.index, _id: doc.id },
      document: doc.body
    }
  }
})
```
@SeanBarry commented Sep 16, 2020

This is an issue I have come across too. I think the last proposed solution would work nicely. Searching for something similar in the documentation is what led me here.

```js
return {
  index: { _index: doc.index, _id: doc.id },
  document: doc.body
}
```

@baronworks

I was trying to figure out a way to transform the doc myself; if you're using split2, there's an easy way to do it by passing in your own function:

```js
datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse))
```

Replace the `JSON.parse` with your own function:

```js
datasource: createReadStream('./dataset.ndjson').pipe(split(myFunction))
```

```js
const myFunction = function (line) {
  // split2 passes each raw line as a string: parse it first,
  // then return just the part you want indexed (here, doc.body)
  const doc = JSON.parse(line)
  return doc.body
}
```

Mpdreamz added a commit to Mpdreamz/kibana that referenced this issue Oct 18, 2022
…y<TFields>`'s that are expected to be sent over the wire and ones that simply exist as an intermediary concept.

This is now modeled as `Entity<TFields>`'s being `things` that produce `Signal<TFields>`'s.

e.g. `Service` is an `Entity` that itself can't emit signals, but we can create an `Instance` entity from a service, which can emit `Span`/`Transaction`/`Error` signals.

This was effectively how the DSL already worked:

```js
var instance = apm.service(`opbeans-go-${index}`, ENVIRONMENT, 'go').instance('instance')
var signal = instance
  .transaction(transactionName)
  .timestamp(timestamp)
```

In fact, this PR makes zero changes to the existing DSL; it just models it more explicitly.

The major benefit of having wrapped `TFields` in a `Signal<TFields>` is that a signal can take more ownership of how it gets sent over the wire, and where.

```ts
abstract class Signal<TFields extends Fields> extends Entity<TFields> {
  /** Each signal should indicate where it intends to write to */
  abstract getWriteTarget(): string | undefined;

  /**
   * Yields the signal(s) to the stream processor
   * A signal may emit zero to many signals, depending on the use case it's modelling
   */
  yieldSignals(): Array<Signal<TFields>> {
    return [this];
  }

  /** Controls how the document is written to Elasticsearch */
  toDocument(): Record<string, any> {
    const newDoc: Record<string, any> = {};
    dedot(this.fields, newDoc);
    if (typeof newDoc['@timestamp'] === 'number') {
      const timestamp = newDoc['@timestamp'];
      newDoc['@timestamp'] = new Date(timestamp).toISOString();
    }
    return newDoc;
  }
}
```

In later PRs, this can be extended to control the bulk action and add more features, such as being able to (arbitrarily) skip signals from being written at all.

In order to support `Signal<TFields>`, I needed to write a custom Elasticsearch JS client serializer, because the `bulk<TDocument = Signal<TFields>>(options)` helper has no built-in way to unwrap `TDocument` just before serialization in the `onDocument` callback that sets up the `BulkAction`.

see: elastic/elasticsearch-js#1209
see: elastic/elasticsearch-js#1610

To make this easier, we tag each instance of `Signal`:

```ts
const isSignal = Symbol('signal');
export abstract class Signal<TFields extends Fields> extends Entity<TFields> {
  constructor(fields: TFields) {
    super(fields);
    this[isSignal] = true;
  }

  public static IsSignal = isSignal;
  public [isSignal]: boolean | null;
}
```

This way, the serializer only unwraps `Signal` instances, and everything else falls back to the default serializer.
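
For illustration, a minimal sketch of what such a serializer could look like, assuming the 8.x client, where the `Serializer` class is exported by `@elastic/transport` and can be passed to the client via its `Serializer` option. The `./signal` module is a hypothetical stand-in for wherever the tagged `Signal` class above lives:

```js
const { Client } = require('@elastic/elasticsearch')
const { Serializer } = require('@elastic/transport')
// hypothetical module exporting the tagged Signal class from the snippet above
const { Signal } = require('./signal')

class SignalSerializer extends Serializer {
  serialize (object) {
    // unwrap tagged signals into their wire representation;
    // everything else falls back to the default serializer
    if (object != null && object[Signal.IsSignal]) {
      return super.serialize(object.toDocument())
    }
    return super.serialize(object)
  }
}

const client = new Client({
  node: 'http://localhost:9200',
  Serializer: SignalSerializer
})
```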

Two other changes in this PR were needed to facilitate scrubbing all APM specifics from `StreamProcessor`.

A scenario now declares which writeTargets it expects:

```ts
const scenario: Scenario<AgentConfigFields> = async (options: ScenarioOptions) => {
  const logger = getLogger(options);

  return {
    writeTargets: BaseApmSignal.WriteTargets,
    mapToIndex: (signal) => "x",
    generate: (options: ScenarioOptions) => { }
  };
}
```

This informs the command-line interface's `--clean` option how to clean up the scenario, and helps keep `StreamProcessor` generic, without APM-specific knowledge. `StreamProcessor` now also tracks all the places it wrote to and refreshes only those if `refreshAfterIndex: true`.

`mapToIndex` allows the scenario to override any defaults for `Signal<TFields>.getWriteTarget()`. The callback is invoked per signal, so a scenario can take full ownership of where the data is written, for all or some signals.
@JoshMock (Member)

This improvement, introduced in #1732 by @robdasilva, will be released in 8.8.2 and 8.9.0 in a few days.

#1951 has docs that detail how to use this improvement. Existing usage of `onDocument` will continue to work as expected. 🏆
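
For anyone landing here, a sketch of what the new usage looks like, based on the docs referenced in #1951. The array-return form of `onDocument` shown here is taken from those docs, so treat it as illustrative rather than authoritative:

```js
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    // returning [action, document] tells the helper to index the
    // second element instead of the raw datasource item
    return [
      { index: { _index: doc.index, _id: doc.id } },
      doc.body
    ]
  }
})
```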
