Skip to content

Commit

Permalink
Merged pull request #1389 from influxdata/nc-issue#1379
Browse files Browse the repository at this point in the history
Copy batch points slice before modification
  • Loading branch information
nathanielc committed May 19, 2017
2 parents ff7ab2f + d2e0ee5 commit 9dc9500
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased [unreleased]

# Bugfixes

- [#1379](https://github.com/influxdata/kapacitor/issues/1379): Copy batch points slice before modification, fixes potential panics and data corruption.

## v1.3.0-rc3 [2017-05-18]

### Release Notes
Expand Down
1 change: 1 addition & 0 deletions default.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (e *DefaultNode) runDefault(snapshot []byte) error {
case pipeline.BatchEdge:
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
b.Points = b.ShallowCopyPoints()
_, b.Tags = e.setDefaults(nil, b.Tags)
b.UpdateGroup()
for i := range b.Points {
Expand Down
1 change: 1 addition & 0 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (e *DeleteNode) runDelete(snapshot []byte) error {
case pipeline.BatchEdge:
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
b.Points = b.ShallowCopyPoints()
for i := range b.Points {
b.Points[i].Fields, b.Points[i].Tags = e.doDeletes(b.Points[i].Fields, b.Points[i].Tags)
}
Expand Down
1 change: 1 addition & 0 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
case pipeline.BatchEdge:
for b, ok := d.ins[0].NextBatch(); ok; b, ok = d.ins[0].NextBatch() {
d.timer.Start()
b.Points = b.ShallowCopyPoints()
var pr, p models.BatchPoint
for i := 0; i < len(b.Points); i++ {
p = b.Points[i]
Expand Down
1 change: 1 addition & 0 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
var err error
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
b.Points = b.ShallowCopyPoints()
for i := 0; i < len(b.Points); {
p := b.Points[i]
b.Points[i].Fields, b.Points[i].Tags, err = e.eval(p.Time, b.Group, p.Fields, p.Tags)
Expand Down
8 changes: 8 additions & 0 deletions models/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func (b Batch) Copy() PointInterface {
return cb
}

// ShallowCopyPoints creates a new slice for the points but only shallow copies the points themselves.
// Then if a single point needs to be modified it must first be copied.
func (b Batch) ShallowCopyPoints() []BatchPoint {
points := make([]BatchPoint, len(b.Points))
copy(points, b.Points)
return points
}

func (b Batch) Setter() PointSetter {
return &b
}
Expand Down
1 change: 1 addition & 0 deletions shift.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (s *ShiftNode) runShift([]byte) error {
for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() {
s.timer.Start()
b.TMax = b.TMax.Add(s.shift)
b.Points = b.ShallowCopyPoints()
for i, p := range b.Points {
b.Points[i].Time = p.Time.Add(s.shift)
}
Expand Down
2 changes: 1 addition & 1 deletion state_tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (stn *StateTrackingNode) runStateTracking(_ []byte) error {
}
stg.tracker.reset()

b = b.Copy().(models.Batch)
b.Points = b.ShallowCopyPoints()
for i := 0; i < len(b.Points); {
p := &b.Points[i]
pass, err := EvalPredicate(stg.Expression, stg.ScopePool, p.Time, p.Fields, p.Tags)
Expand Down

0 comments on commit 9dc9500

Please sign in to comment.