diff --git a/src/helpers.ts b/src/helpers.ts index 7df2016f2..c29f274f2 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -74,11 +74,11 @@ export interface BulkStats { aborted: boolean } -interface IndexAction { +interface IndexActionOperation { index: T.BulkIndexOperation } -interface CreateAction { +interface CreateActionOperation { create: T.BulkCreateOperation } @@ -90,7 +90,9 @@ interface DeleteAction { delete: T.BulkDeleteOperation } -type UpdateAction = [UpdateActionOperation, Record] +type CreateAction = CreateActionOperation | [CreateActionOperation, unknown] +type IndexAction = IndexActionOperation | [IndexActionOperation, unknown] +type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction] type Action = IndexAction | CreateAction | UpdateAction | DeleteAction export interface OnDropDocument { @@ -619,22 +621,21 @@ export default class Helpers { for await (const chunk of datasource) { if (shouldAbort) break timeoutRef.refresh() - const action = onDocument(chunk) - const operation = Array.isArray(action) - ? Object.keys(action[0])[0] - : Object.keys(action)[0] + const result = onDocument(chunk) + const [action, payload] = Array.isArray(result) ? result : [result, chunk] + const operation = Object.keys(action)[0] if (operation === 'index' || operation === 'create') { actionBody = serializer.serialize(action) - payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk) + payloadBody = typeof payload === 'string' + ? payload + : serializer.serialize(payload) chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) bulkBody.push(actionBody, payloadBody) } else if (operation === 'update') { - // @ts-expect-error in case of update action is an array - actionBody = serializer.serialize(action[0]) + actionBody = serializer.serialize(action) payloadBody = typeof chunk === 'string' ? `{"doc":${chunk}}` - // @ts-expect-error in case of update action is an array - : serializer.serialize({ doc: chunk, ...action[1] }) + : serializer.serialize({ doc: chunk, ...payload }) chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) bulkBody.push(actionBody, payloadBody) } else if (operation === 'delete') { diff --git a/test/unit/helpers/bulk.test.ts b/test/unit/helpers/bulk.test.ts index 5a182009e..adf5fb32f 100644 --- a/test/unit/helpers/bulk.test.ts +++ b/test/unit/helpers/bulk.test.ts @@ -785,6 +785,58 @@ test('bulk index', t => { t.end() }) + t.test('Should use payload returned by `onDocument`', async t => { + let count = 0 + const updatedAt = '1970-01-01T12:00:00.000Z' + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.equal(params.path, '/_bulk') + t.match(params.headers, { + 'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8', + 'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${transportVersion},hc=${nodeVersion},h=bp` + }) + // @ts-expect-error + const [action, payload] = params.body.split('\n') + t.same(JSON.parse(action), { index: { _index: 'test' } }) + t.same(JSON.parse(payload), { ...dataset[count++], updatedAt }) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + t.type(doc.user, 'string') // testing that doc is type of Document + return [ + { + index: { + _index: 'test' + } + }, + { ...doc, updatedAt }] + }, + onDrop (doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.end() }) @@ -835,6 +887,58 @@ test('bulk create', t => { aborted: false }) }) + + t.test('Should use payload returned by `onDocument`', async t => { + let count = 0 + const updatedAt = '1970-01-01T12:00:00.000Z' + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.equal(params.path, '/_bulk') + t.match(params.headers, { 'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8' }) + // @ts-expect-error + const [action, payload] = params.body.split('\n') + t.same(JSON.parse(action), { create: { _index: 'test', _id: count } }) + t.same(JSON.parse(payload), { ...dataset[count++], updatedAt }) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + let id = 0 + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + return [ + { + create: { + _index: 'test', + _id: String(id++) + } + }, + { ...doc, updatedAt } + ] + }, + onDrop (doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.end() })