diff --git a/CHANGELOG.md b/CHANGELOG.md index 95e30bb1a..f5b69577d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased [unreleased] +# Bugfixes + +- [#1379](https://github.com/influxdata/kapacitor/issues/1379): Copy batch points slice before modification, fixes potential panics and data corruption. + ## v1.3.0-rc3 [2017-05-18] ### Release Notes diff --git a/default.go b/default.go index 463f9c8eb..a8e92c1fb 100644 --- a/default.go +++ b/default.go @@ -54,6 +54,7 @@ func (e *DefaultNode) runDefault(snapshot []byte) error { case pipeline.BatchEdge: for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() { e.timer.Start() + b.Points = b.ShallowCopyPoints() _, b.Tags = e.setDefaults(nil, b.Tags) b.UpdateGroup() for i := range b.Points { diff --git a/delete.go b/delete.go index abecae327..066214466 100644 --- a/delete.go +++ b/delete.go @@ -77,6 +77,7 @@ func (e *DeleteNode) runDelete(snapshot []byte) error { case pipeline.BatchEdge: for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() { e.timer.Start() + b.Points = b.ShallowCopyPoints() for i := range b.Points { b.Points[i].Fields, b.Points[i].Tags = e.doDeletes(b.Points[i].Fields, b.Points[i].Tags) } diff --git a/derivative.go b/derivative.go index 8df0a4389..2d536e1ef 100644 --- a/derivative.go +++ b/derivative.go @@ -69,6 +69,7 @@ func (d *DerivativeNode) runDerivative([]byte) error { case pipeline.BatchEdge: for b, ok := d.ins[0].NextBatch(); ok; b, ok = d.ins[0].NextBatch() { d.timer.Start() + b.Points = b.ShallowCopyPoints() var pr, p models.BatchPoint for i := 0; i < len(b.Points); i++ { p = b.Points[i] diff --git a/eval.go b/eval.go index 21384ea5a..a68373595 100644 --- a/eval.go +++ b/eval.go @@ -104,6 +104,7 @@ func (e *EvalNode) runEval(snapshot []byte) error { var err error for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() { e.timer.Start() + b.Points = b.ShallowCopyPoints() for i := 0; i < len(b.Points); { p := b.Points[i] b.Points[i].Fields, b.Points[i].Tags, err = e.eval(p.Time, b.Group, p.Fields, p.Tags) diff --git a/models/batch.go b/models/batch.go index e196fad88..5d7be3319 100644 --- a/models/batch.go +++ b/models/batch.go @@ -78,6 +78,14 @@ func (b Batch) Copy() PointInterface { return cb } +// ShallowCopyPoints creates a new slice for the points but only shallow copies the points themselves. +// Then if a single point needs to be modified it must first be copied. +func (b Batch) ShallowCopyPoints() []BatchPoint { + points := make([]BatchPoint, len(b.Points)) + copy(points, b.Points) + return points +} + func (b Batch) Setter() PointSetter { return &b } diff --git a/shift.go b/shift.go index 69c751f08..4c95de67d 100644 --- a/shift.go +++ b/shift.go @@ -47,6 +47,7 @@ func (s *ShiftNode) runShift([]byte) error { for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() { s.timer.Start() b.TMax = b.TMax.Add(s.shift) + b.Points = b.ShallowCopyPoints() for i, p := range b.Points { b.Points[i].Time = p.Time.Add(s.shift) } diff --git a/state_tracking.go b/state_tracking.go index d36ac9939..43b57a1ad 100644 --- a/state_tracking.go +++ b/state_tracking.go @@ -115,7 +115,7 @@ func (stn *StateTrackingNode) runStateTracking(_ []byte) error { } stg.tracker.reset() - b = b.Copy().(models.Batch) + b.Points = b.ShallowCopyPoints() for i := 0; i < len(b.Points); { p := &b.Points[i] pass, err := EvalPredicate(stg.Expression, stg.ScopePool, p.Time, p.Fields, p.Tags)