From fa0cfc5715b93c539b81464dc673c0ef5ce94d92 Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Wed, 25 Oct 2017 16:18:29 +0200 Subject: [PATCH] Fix double @timestamp key when using JSON decoding (#5436) The MergeJSONFields was adding the parsed @timestamp key to the fields, instead of modifying it into the Event structure. This change makes it return the new timestamp (or the empty Timestamp if no change required), and the caller sets it into the event. --- filebeat/harvester/reader/json.go | 9 +++---- filebeat/harvester/reader/json_test.go | 37 ++++++++++++++++---------- filebeat/prospector/log/harvester.go | 16 +++++++---- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/filebeat/harvester/reader/json.go b/filebeat/harvester/reader/json.go index 1ba816c6382d..e84a90c809ed 100644 --- a/filebeat/harvester/reader/json.go +++ b/filebeat/harvester/reader/json.go @@ -93,7 +93,7 @@ func createJSONError(message string) common.MapStr { // respecting the KeysUnderRoot and OverwriteKeys configuration options. // If MessageKey is defined, the Text value from the event always // takes precedence. -func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config JSONConfig) { +func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config JSONConfig) time.Time { // The message key might have been modified by multiline if len(config.MessageKey) > 0 && text != nil { jsonFields[config.MessageKey] = *text @@ -111,6 +111,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, case common.Time: ts = time.Time(ts) } + delete(data, "@timestamp") } event := &beat.Event{ Timestamp: ts, @@ -118,9 +119,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, } jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys) - // if timestamp has been set -> add to data - if !event.Timestamp.IsZero() { - data["@timestamp"] = common.Time(event.Timestamp) - } + return event.Timestamp } + return time.Time{} } diff --git a/filebeat/harvester/reader/json_test.go b/filebeat/harvester/reader/json_test.go index d2625ab2dd11..09ed3b64c7e4 100644 --- a/filebeat/harvester/reader/json_test.go +++ b/filebeat/harvester/reader/json_test.go @@ -176,11 +176,12 @@ func TestAddJSONFields(t *testing.T) { now := time.Now().UTC() tests := []struct { - Name string - Data common.MapStr - Text *string - JSONConfig JSONConfig - ExpectedItems common.MapStr + Name string + Data common.MapStr + Text *string + JSONConfig JSONConfig + ExpectedItems common.MapStr + ExpectedTimestamp time.Time }{ { // by default, don't overwrite keys @@ -192,6 +193,7 @@ func TestAddJSONFields(t *testing.T) { "type": "test_type", "text": "hello", }, + ExpectedTimestamp: time.Time{}, }, { // overwrite keys if asked @@ -203,6 +205,7 @@ func TestAddJSONFields(t *testing.T) { "type": "test", "text": "hello", }, + ExpectedTimestamp: time.Time{}, }, { // without keys_under_root, put everything in a json key @@ -213,6 +216,7 @@ func TestAddJSONFields(t *testing.T) { ExpectedItems: common.MapStr{ "json": common.MapStr{"type": "test", "text": "hello"}, }, + ExpectedTimestamp: time.Time{}, }, { // when MessageKey is defined, the Text overwrites the value of that key @@ -224,6 +228,7 @@ func TestAddJSONFields(t *testing.T) { "json": common.MapStr{"type": "test", "text": "hello"}, "type": "test_type", }, + ExpectedTimestamp: time.Time{}, }, { // when @timestamp is in JSON and overwrite_keys is true, parse it @@ -233,9 +238,9 @@ func TestAddJSONFields(t *testing.T) { Text: &text, JSONConfig: JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, ExpectedItems: common.MapStr{ - "@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"), - "type": "test", + "type": "test", }, + ExpectedTimestamp: time.Time(common.MustParseTime("2016-04-05T18:47:18.444Z")), }, { // when the parsing on @timestamp fails, leave the existing value and add an error key @@ -245,10 +250,10 @@ func TestAddJSONFields(t *testing.T) { Text: &text, JSONConfig: JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, ExpectedItems: common.MapStr{ - "@timestamp": common.Time(now), - "type": "test", - "error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (parse error on 2016-04-05T18:47:18.44XX4Z)"}, + "type": "test", + "error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (parse error on 2016-04-05T18:47:18.44XX4Z)"}, }, + ExpectedTimestamp: time.Time{}, }, { // when the @timestamp has the wrong type, leave the existing value and add an error key @@ -258,10 +263,10 @@ func TestAddJSONFields(t *testing.T) { Text: &text, JSONConfig: JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, ExpectedItems: common.MapStr{ - "@timestamp": common.Time(now), - "type": "test", - "error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (not string)"}, + "type": "test", + "error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (not string)"}, }, + ExpectedTimestamp: time.Time{}, }, { // if overwrite_keys is true, but the `type` key in json is not a string, ignore it @@ -273,6 +278,7 @@ func TestAddJSONFields(t *testing.T) { "type": "test_type", "error": common.MapStr{"type": "json", "message": "type not overwritten (not string)"}, }, + ExpectedTimestamp: time.Time{}, }, { // if overwrite_keys is true, but the `type` key in json is empty, ignore it @@ -284,6 +290,7 @@ func TestAddJSONFields(t *testing.T) { "type": "test_type", "error": common.MapStr{"type": "json", "message": "type not overwritten (invalid value [])"}, }, + ExpectedTimestamp: time.Time{}, }, { // if overwrite_keys is true, but the `type` key in json starts with _, ignore it @@ -295,6 +302,7 @@ func TestAddJSONFields(t *testing.T) { "type": "test_type", "error": common.MapStr{"type": "json", "message": "type not overwritten (invalid value [_type])"}, }, + ExpectedTimestamp: time.Time{}, }, } @@ -305,12 +313,13 @@ func TestAddJSONFields(t *testing.T) { jsonFields = fields.(common.MapStr) } - MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig) + ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig) t.Log("Executing test:", test) for k, v := range test.ExpectedItems { assert.Equal(t, v, test.Data[k]) } + assert.Equal(t, test.ExpectedTimestamp, ts) }) } } diff --git a/filebeat/prospector/log/harvester.go b/filebeat/prospector/log/harvester.go index a163e3cd5b01..4c2e3d5d752c 100644 --- a/filebeat/prospector/log/harvester.go +++ b/filebeat/prospector/log/harvester.go @@ -270,8 +270,17 @@ func (h *Harvester) Run() error { jsonFields = f.(common.MapStr) } + data.Event = beat.Event{ + Timestamp: message.Ts, + } + if h.config.JSON != nil && len(jsonFields) > 0 { - reader.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) + ts := reader.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) + if !ts.IsZero() { + // there was a `@timestamp` key in the event, so overwrite + // the resulting timestamp + data.Event.Timestamp = ts + } } else if &text != nil { if fields == nil { fields = common.MapStr{} @@ -279,10 +288,7 @@ func (h *Harvester) Run() error { fields["message"] = text } - data.Event = beat.Event{ - Timestamp: message.Ts, - Fields: fields, - } + data.Event.Fields = fields } // Always send event to update state, also if lines was skipped