From eb306ad386c9a341801cbdfaa36ade5a23d8a40a Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 24 Jul 2024 20:59:35 +0100 Subject: [PATCH 1/4] Add dynamic templates to bulk indexer item --- bulk_indexer.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/bulk_indexer.go b/bulk_indexer.go index c126a92..3e2e184 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -269,14 +269,15 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int { } type BulkIndexerItem struct { - Index string - DocumentID string - Body io.WriterTo + Index string + DocumentID string + Body io.WriterTo + DynamicTemplates map[string]string } // Add encodes an item in the buffer. func (b *BulkIndexer) Add(item BulkIndexerItem) error { - b.writeMeta(item.Index, item.DocumentID) + b.writeMeta(item.Index, item.DocumentID, item.DynamicTemplates) if _, err := item.Body.WriteTo(b.writer); err != nil { return fmt.Errorf("failed to write bulk indexer item: %w", err) } @@ -287,18 +288,39 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error { return nil } -func (b *BulkIndexer) writeMeta(index, documentID string) { +func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[string]string) { b.jsonw.RawString(`{"create":{`) + first := true if documentID != "" { b.jsonw.RawString(`"_id":`) b.jsonw.String(documentID) + first = false } if index != "" { - if documentID != "" { + if !first { b.jsonw.RawByte(',') } b.jsonw.RawString(`"_index":`) b.jsonw.String(index) + first = false + } + if dynamicTemplates != nil && len(dynamicTemplates) > 0 { + if !first { + b.jsonw.RawByte(',') + } + b.jsonw.RawString(`"dynamic_templates":{`) + var i int + for k, v := range dynamicTemplates { + if i > 0 { + b.jsonw.RawByte(',') + } + b.jsonw.String(k) + b.jsonw.RawByte(':') + b.jsonw.String(v) + i++ + } + b.jsonw.RawByte('}') + first = false } b.jsonw.RawString("}}\n") b.writer.Write(b.jsonw.Bytes()) From 6676bb3fef619cb0a88c0b247f550f1f184500ce Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 7 Aug 2024 10:47:03 +0100 Subject: [PATCH 2/4] Update bulk_indexer.go Co-authored-by: Andrew Wilkins --- bulk_indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bulk_indexer.go b/bulk_indexer.go index 3e2e184..8ae3488 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -304,7 +304,7 @@ func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[s b.jsonw.String(index) first = false } - if dynamicTemplates != nil && len(dynamicTemplates) > 0 { + if len(dynamicTemplates) > 0 { if !first { b.jsonw.RawByte(',') } From 839c8a985d9b5ee76f670614ec7fbbf82f0ec055 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 7 Aug 2024 10:55:03 +0100 Subject: [PATCH 3/4] Use bool instead of int to track first --- bulk_indexer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bulk_indexer.go b/bulk_indexer.go index 8ae3488..a242692 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -309,15 +309,15 @@ func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[s b.jsonw.RawByte(',') } b.jsonw.RawString(`"dynamic_templates":{`) - var i int + firstDynamicTemplate := true for k, v := range dynamicTemplates { - if i > 0 { + if !firstDynamicTemplate { b.jsonw.RawByte(',') } b.jsonw.String(k) b.jsonw.RawByte(':') b.jsonw.String(v) - i++ + firstDynamicTemplate = false } b.jsonw.RawByte('}') first = false From 623ef0c8ff7522aea5f74ea15a66e28457cc7097 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 7 Aug 2024 11:23:27 +0100 Subject: [PATCH 4/4] Add test --- bulk_indexer_test.go | 37 ++++++++++++++++++++++++++++++ docappendertest/docappendertest.go | 17 ++++++++++++-- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index 08de4f4..d8497cc 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -128,3 +128,40 @@ func TestBulkIndexer(t *testing.T) { }) } } + +func TestDynamicTemplates(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result, _, dynamicTemplates := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplates(r) + require.Equal(t, []map[string]string{ + {"one": "two", "three": "four"}, + {"five": "six", "seven": "eight"}, + }, dynamicTemplates) + json.NewEncoder(w).Encode(result) + }) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + DynamicTemplates: map[string]string{"one": "two", "three": "four"}, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + DynamicTemplates: map[string]string{"five": "six", "seven": "eight"}, + }) + require.NoError(t, err) + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(2), stat.Indexed) +} diff --git a/docappendertest/docappendertest.go b/docappendertest/docappendertest.go index 2f33ab9..ee224cb 100644 --- a/docappendertest/docappendertest.go +++ b/docappendertest/docappendertest.go @@ -56,6 +56,17 @@ func DecodeBulkRequestWithStats(r *http.Request) ( docs [][]byte, res esutil.BulkIndexerResponse, stats RequestStats) { + indexed, result, stats, _ := DecodeBulkRequestWithStatsAndDynamicTemplates(r) + return indexed, result, stats +} + +// DecodeBulkRequestWithStatsAndDynamicTemplates decodes a /_bulk request's body, +// returning the decoded documents and a response body and stats about request, and per-request dynamic templates. +func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( + docs [][]byte, + res esutil.BulkIndexerResponse, + stats RequestStats, + dynamicTemplates []map[string]string) { body := r.Body switch r.Header.Get("Content-Encoding") { case "gzip": @@ -76,7 +87,8 @@ func DecodeBulkRequestWithStats(r *http.Request) ( var result esutil.BulkIndexerResponse for scanner.Scan() { action := make(map[string]struct { - Index string `json:"_index"` + Index string `json:"_index"` + DynamicTemplates map[string]string `json:"dynamic_templates"` }) if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil { panic(err) @@ -96,8 +108,9 @@ func DecodeBulkRequestWithStats(r *http.Request) ( item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index} result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item}) + dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates) } - return indexed, result, RequestStats{int64(cr.bytesRead)} + return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates } // NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.