Skip to content

Commit

Permalink
fix: timestamp change during execution of json_v2 parser. (#10657)
Browse files Browse the repository at this point in the history
Co-authored-by: Kristian Grimsby <[email protected]>
  • Loading branch information
2 people authored and MyaLongmire committed Jul 6, 2022
1 parent a1db1d8 commit 76c14de
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions plugins/parsers/json_v2/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type Parser struct {

// measurementName is the the name of the current config used in each line protocol
measurementName string
// timestamp is the timestamp used in each line protocol, defaults to time.Now()
timestamp time.Time

// **** Specific for object configuration ****
// subPathResults contains the results of sub-gjson path expressions provided in fields/tags table within object config
Expand Down Expand Up @@ -112,10 +110,12 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
}

// timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression
p.timestamp = time.Now()
timestamp := time.Now()
if c.TimestampPath != "" {
result := gjson.GetBytes(input, c.TimestampPath)

if result.Type == gjson.Null {
p.Log.Debugf("Message: %s", input)
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
}
if !result.IsArray() && !result.IsObject() {
Expand All @@ -125,24 +125,25 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
}

var err error
p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)
timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)

if err != nil {
return nil, err
}
}
}

fields, err := p.processMetric(input, c.Fields, false)
fields, err := p.processMetric(input, c.Fields, false, timestamp)
if err != nil {
return nil, err
}

tags, err := p.processMetric(input, c.Tags, true)
tags, err := p.processMetric(input, c.Tags, true, timestamp)
if err != nil {
return nil, err
}

objects, err := p.processObjects(input, c.JSONObjects)
objects, err := p.processObjects(input, c.JSONObjects, timestamp)
if err != nil {
return nil, err
}
Expand All @@ -168,7 +169,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
// processMetric will iterate over all 'field' or 'tag' configs and create metrics for each
// A field/tag can either be a single value or an array of values, each resulting in its own metric
// For multiple configs, a set of metrics is created from the cartesian product of each separate config
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegraf.Metric, error) {
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
if len(data) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -207,13 +208,13 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegr
p.measurementName,
map[string]string{},
map[string]interface{}{},
p.timestamp,
timestamp,
),
Result: result,
}

// Expand all array's and nested arrays into separate metrics
nodes, err := p.expandArray(mNode)
nodes, err := p.expandArray(mNode, timestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -259,15 +260,15 @@ func mergeMetric(a telegraf.Metric, m telegraf.Metric) {
}

// expandArray will recursively create a new MetricNode for each element in a JSON array or single value
func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
func (p *Parser) expandArray(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric

if result.IsObject() {
if !p.iterateObjects {
p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects")
return results, nil
}
r, err := p.combineObject(result)
r, err := p.combineObject(result, timestamp)
if err != nil {
return nil, err
}
Expand All @@ -285,14 +286,14 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
p.measurementName,
map[string]string{},
map[string]interface{}{},
p.timestamp,
timestamp,
)
if val.IsObject() {
n := result
n.ParentIndex += val.Index
n.Metric = m
n.Result = val
r, err := p.combineObject(n)
r, err := p.combineObject(n, timestamp)
if err != nil {
return false
}
Expand All @@ -311,7 +312,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
n.ParentIndex += val.Index
n.Metric = m
n.Result = val
r, err := p.expandArray(n)
r, err := p.expandArray(n, timestamp)
if err != nil {
return false
}
Expand Down Expand Up @@ -400,7 +401,7 @@ func (p *Parser) existsInpathResults(index int) *PathResult {
}

// processObjects will iterate over all 'object' configs and create metrics for each
func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.Metric, error) {
func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp time.Time) ([]telegraf.Metric, error) {
p.iterateObjects = true
var t []telegraf.Metric
for _, c := range objects {
Expand Down Expand Up @@ -449,11 +450,11 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.
p.measurementName,
map[string]string{},
map[string]interface{}{},
p.timestamp,
timestamp,
),
Result: result,
}
metrics, err := p.expandArray(rootObject)
metrics, err := p.expandArray(rootObject, timestamp)
if err != nil {
return nil, err
}
Expand All @@ -465,7 +466,7 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.

// combineObject will add all fields/tags to a single metric
// If the object has multiple array's as elements it won't comine those, they will remain separate metrics
func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) {
func (p *Parser) combineObject(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric
if result.IsArray() || result.IsObject() {
var err error
Expand Down Expand Up @@ -519,12 +520,12 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) {
arrayNode.Tag = tag

if val.IsObject() {
results, err = p.combineObject(arrayNode)
results, err = p.combineObject(arrayNode, timestamp)
if err != nil {
return false
}
} else {
r, err := p.expandArray(arrayNode)
r, err := p.expandArray(arrayNode, timestamp)
if err != nil {
return false
}
Expand Down

0 comments on commit 76c14de

Please sign in to comment.