diff --git a/plugins/parsers/json_v2/parser.go b/plugins/parsers/json_v2/parser.go index 64153f1cfcbb7..e9feaf415b6b8 100644 --- a/plugins/parsers/json_v2/parser.go +++ b/plugins/parsers/json_v2/parser.go @@ -23,8 +23,6 @@ type Parser struct { // measurementName is the the name of the current config used in each line protocol measurementName string - // timestamp is the timestamp used in each line protocol, defaults to time.Now() - timestamp time.Time // **** Specific for object configuration **** // subPathResults contains the results of sub-gjson path expressions provided in fields/tags table within object config @@ -112,10 +110,12 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { } // timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression - p.timestamp = time.Now() + timestamp := time.Now() if c.TimestampPath != "" { result := gjson.GetBytes(input, c.TimestampPath) + if result.Type == gjson.Null { + p.Log.Debugf("Message: %s", input) return nil, fmt.Errorf(GJSONPathNUllErrorMSG) } if !result.IsArray() && !result.IsObject() { @@ -125,24 +125,25 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { } var err error - p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone) + timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone) + if err != nil { return nil, err } } } - fields, err := p.processMetric(input, c.Fields, false) + fields, err := p.processMetric(input, c.Fields, false, timestamp) if err != nil { return nil, err } - tags, err := p.processMetric(input, c.Tags, true) + tags, err := p.processMetric(input, c.Tags, true, timestamp) if err != nil { return nil, err } - objects, err := p.processObjects(input, c.JSONObjects) + objects, err := p.processObjects(input, c.JSONObjects, timestamp) if err != nil { return nil, err } @@ -168,7 +169,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { // processMetric will iterate over all 'field' or 'tag' configs and create metrics for each // A field/tag can either be a single value or an array of values, each resulting in its own metric // For multiple configs, a set of metrics is created from the cartesian product of each separate config -func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegraf.Metric, error) { +func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) { if len(data) == 0 { return nil, nil } @@ -207,13 +208,13 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegr p.measurementName, map[string]string{}, map[string]interface{}{}, - p.timestamp, + timestamp, ), Result: result, } // Expand all array's and nested arrays into separate metrics - nodes, err := p.expandArray(mNode) + nodes, err := p.expandArray(mNode, timestamp) if err != nil { return nil, err } @@ -259,7 +260,7 @@ func mergeMetric(a telegraf.Metric, m telegraf.Metric) { } // expandArray will recursively create a new MetricNode for each element in a JSON array or single value -func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { +func (p *Parser) expandArray(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) { var results []telegraf.Metric if result.IsObject() { @@ -267,7 +268,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects") return results, nil } - r, err := p.combineObject(result) + r, err := p.combineObject(result, timestamp) if err != nil { return nil, err } @@ -285,14 +286,14 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { p.measurementName, map[string]string{}, map[string]interface{}{}, - p.timestamp, + timestamp, ) if val.IsObject() { n := result n.ParentIndex += val.Index n.Metric = m n.Result = val - r, err := p.combineObject(n) + r, err := p.combineObject(n, timestamp) if err != nil { return false } @@ -311,7 +312,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { n.ParentIndex += val.Index n.Metric = m n.Result = val - r, err := p.expandArray(n) + r, err := p.expandArray(n, timestamp) if err != nil { return false } @@ -400,7 +401,7 @@ func (p *Parser) existsInpathResults(index int) *PathResult { } // processObjects will iterate over all 'object' configs and create metrics for each -func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.Metric, error) { +func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp time.Time) ([]telegraf.Metric, error) { p.iterateObjects = true var t []telegraf.Metric for _, c := range objects { @@ -449,11 +450,11 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf. p.measurementName, map[string]string{}, map[string]interface{}{}, - p.timestamp, + timestamp, ), Result: result, } - metrics, err := p.expandArray(rootObject) + metrics, err := p.expandArray(rootObject, timestamp) if err != nil { return nil, err } @@ -465,7 +466,7 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf. // combineObject will add all fields/tags to a single metric // If the object has multiple array's as elements it won't comine those, they will remain separate metrics -func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { +func (p *Parser) combineObject(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) { var results []telegraf.Metric if result.IsArray() || result.IsObject() { var err error @@ -519,12 +520,12 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { arrayNode.Tag = tag if val.IsObject() { - results, err = p.combineObject(arrayNode) + results, err = p.combineObject(arrayNode, timestamp) if err != nil { return false } } else { - r, err := p.expandArray(arrayNode) + r, err := p.expandArray(arrayNode, timestamp) if err != nil { return false }