From f495deead78a3ed65635d7563a5967420059754c Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 4 Apr 2016 16:56:53 -0600 Subject: [PATCH] fixes #387 #400 #401 --- CHANGELOG.md | 5 +- batch.go | 88 ++++++++++++++++++++++++++---------- eval.go | 70 +++++++++++++++++----------- expvar/expvar.go | 19 ++++++++ join.go | 113 +++++++++++++++++++++++++--------------------- models/batch.go | 4 +- node.go | 4 +- pipeline/batch.go | 13 ++++++ pipeline/eval.go | 10 ++++ query.go | 2 +- tick/eval.go | 7 +-- timer/timer.go | 64 -------------------------- 12 files changed, 220 insertions(+), 179 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 180080bdd..715d17331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,11 +38,14 @@ format a TICKscript file according to a common standard. - [#389](https://github.com/influxdata/kapacitor/pull/389): Adds benchmarks to Kapacitor for basic use cases. - [#390](https://github.com/influxdata/kapacitor/issues/390): BREAKING: Remove old `.mapReduce` functions. - [#381](https://github.com/influxdata/kapacitor/pull/381): Adding enable/disable/delete/reload tasks by glob. - +- [#401](https://github.com/influxdata/kapacitor/issues/401): Add `.align()` property to BatchNode so you can align query start and stop times. ### Bugfixes - [#378](https://github.com/influxdata/kapacitor/issues/378): Fix issue where derivative would divide by zero. +- [#387](https://github.com/influxdata/kapacitor/issues/387): Add `.quiet()` option to EvalNode so errors can be suppressed if expected. +- [#400](https://github.com/influxdata/kapacitor/issues/400): All query/connection errors are counted and reported in BatchNode stats. +- [#412](https://github.com/influxdata/kapacitor/pull/412): Fix issues with batch queries dropping points because of nil fields. ## v0.11.0 [2016-03-22] diff --git a/batch.go b/batch.go index a2f4bc2d4..4f1a3c5d7 100644 --- a/batch.go +++ b/batch.go @@ -105,6 +105,11 @@ func (s *SourceBatchNode) collectedCount() (count int64) { return } +const ( + statsQueryErrors = "query_errors" + statsConnectErrors = "connect_errors" +) + type BatchNode struct { node b *pipeline.BatchNode @@ -160,23 +165,7 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode, l *log.Logger) (*Bat } switch { case n.Every != 0: - bn.ticker = newTimeTicker(n.Every) - case n.Cron != "": - var err error - bn.ticker, err = newCronTicker(n.Cron) - if err != nil { - return nil, err - } - default: - return nil, errors.New("must define one of 'every' or 'cron'") - } - - if n.Every != 0 && n.Cron != "" { - return nil, errors.New("must not set both 'every' and 'cron' properties") - } - switch { - case n.Every != 0: - bn.ticker = newTimeTicker(n.Every) + bn.ticker = newTimeTicker(n.Every, n.AlignFlag) case n.Cron != "": var err error bn.ticker, err = newCronTicker(n.Cron) @@ -270,6 +259,7 @@ func (b *BatchNode) doQuery() error { if err != nil { b.logger.Println("E! failed to connect to InfluxDB:", err) b.timer.Stop() + b.statMap.Add(statsConnectErrors, 1) break } q := client.Query{ @@ -281,12 +271,14 @@ func (b *BatchNode) doQuery() error { if err != nil { b.logger.Println("E! query failed:", err) b.timer.Stop() + b.statMap.Add(statsQueryErrors, 1) break } if err := resp.Error(); err != nil { b.logger.Println("E! query failed:", err) b.timer.Stop() + b.statMap.Add(statsQueryErrors, 1) break } @@ -295,6 +287,7 @@ func (b *BatchNode) doQuery() error { batches, err := models.ResultToBatches(res) if err != nil { b.logger.Println("E! failed to understand query result:", err) + b.statMap.Add(statsQueryErrors, 1) continue } b.timer.Pause() @@ -312,6 +305,8 @@ func (b *BatchNode) doQuery() error { } func (b *BatchNode) runBatch([]byte) error { + b.statMap.Add(statsQueryErrors, 0) + b.statMap.Add(statsConnectErrors, 0) errC := make(chan error, 1) go func() { defer func() { @@ -371,20 +366,61 @@ type ticker interface { } type timeTicker struct { - every time.Duration - ticker *time.Ticker - mu sync.Mutex + every time.Duration + alignChan chan time.Time + stopping chan struct{} + ticker *time.Ticker + mu sync.Mutex + wg sync.WaitGroup } -func newTimeTicker(every time.Duration) *timeTicker { - return &timeTicker{every: every} +func newTimeTicker(every time.Duration, align bool) *timeTicker { + t := &timeTicker{ + every: every, + } + if align { + t.alignChan = make(chan time.Time) + t.stopping = make(chan struct{}) + } + return t } func (t *timeTicker) Start() <-chan time.Time { t.mu.Lock() defer t.mu.Unlock() - t.ticker = time.NewTicker(t.every) - return t.ticker.C + if t.alignChan != nil { + t.wg.Add(1) + go func() { + defer t.wg.Done() + // Sleep until we are roughly aligned + now := time.Now() + next := now.Truncate(t.every).Add(t.every) + after := time.NewTicker(next.Sub(now)) + select { + case <-after.C: + after.Stop() + case <-t.stopping: + after.Stop() + return + } + t.ticker = time.NewTicker(t.every) + // Send first event since we waited for it explicitly + t.alignChan <- next + for { + select { + case <-t.stopping: + return + case now := <-t.ticker.C: + now = now.Truncate(t.every) + t.alignChan <- now + } + } + }() + return t.alignChan + } else { + t.ticker = time.NewTicker(t.every) + return t.ticker.C + } } func (t *timeTicker) Stop() { @@ -393,6 +429,10 @@ func (t *timeTicker) Stop() { if t.ticker != nil { t.ticker.Stop() } + if t.alignChan != nil { + close(t.stopping) + } + t.wg.Wait() } func (t *timeTicker) Next(now time.Time) time.Time { diff --git a/eval.go b/eval.go index 84e6b813f..e81b0c3e1 100644 --- a/eval.go +++ b/eval.go @@ -5,15 +5,21 @@ import ( "log" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick" ) +const ( + statsEvalErrors = "eval_errors" +) + type EvalNode struct { node e *pipeline.EvalNode expressions []*tick.StatefulExpr + evalErrors *expvar.Int } // Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point. @@ -36,15 +42,13 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN } func (e *EvalNode) runEval(snapshot []byte) error { + e.evalErrors = &expvar.Int{} + e.statMap.Set(statsEvalErrors, e.evalErrors) switch e.Provides() { case pipeline.StreamEdge: for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() { e.timer.Start() - fields, err := e.eval(p.Time, p.Fields, p.Tags) - if err != nil { - return err - } - p.Fields = fields + p.Fields = e.eval(p.Time, p.Fields, p.Tags) e.timer.Stop() for _, child := range e.outs { err := child.CollectPoint(p) @@ -57,11 +61,7 @@ func (e *EvalNode) runEval(snapshot []byte) error { for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() { e.timer.Start() for i, p := range b.Points { - fields, err := e.eval(p.Time, p.Fields, p.Tags) - if err != nil { - return err - } - b.Points[i].Fields = fields + b.Points[i].Fields = e.eval(p.Time, p.Fields, p.Tags) } e.timer.Stop() for _, child := range e.outs { @@ -75,27 +75,35 @@ func (e *EvalNode) runEval(snapshot []byte) error { return nil } -func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) (models.Fields, error) { +func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) models.Fields { vars, err := mergeFieldsAndTags(now, fields, tags) if err != nil { - return nil, err + e.logger.Println("E!", err) + return nil } for i, expr := range e.expressions { - v, err := expr.EvalNum(vars) - if err != nil { - return nil, err + if v, err := expr.EvalNum(vars); err == nil { + name := e.e.AsList[i] + vars.Set(name, v) + } else { + e.evalErrors.Add(1) + if !e.e.QuiteFlag { + e.logger.Println("E!", err) + } } - name := e.e.AsList[i] - vars.Set(name, v) } var newFields models.Fields if e.e.KeepFlag { if l := len(e.e.KeepList); l != 0 { newFields = make(models.Fields, l) for _, f := range e.e.KeepList { - newFields[f], err = vars.Get(f) - if err != nil { - return nil, err + if v, err := vars.Get(f); err == nil { + newFields[f] = v + } else { + e.evalErrors.Add(1) + if !e.e.QuiteFlag { + e.logger.Println("E!", err) + } } } } else { @@ -104,20 +112,28 @@ func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]str newFields[f] = v } for _, f := range e.e.AsList { - newFields[f], err = vars.Get(f) - if err != nil { - return nil, err + if v, err := vars.Get(f); err == nil { + newFields[f] = v + } else { + e.evalErrors.Add(1) + if !e.e.QuiteFlag { + e.logger.Println("E!", err) + } } } } } else { newFields = make(models.Fields, len(e.e.AsList)) for _, f := range e.e.AsList { - newFields[f], err = vars.Get(f) - if err != nil { - return nil, err + if v, err := vars.Get(f); err == nil { + newFields[f] = v + } else { + e.evalErrors.Add(1) + if !e.e.QuiteFlag { + e.logger.Println("E!", err) + } } } } - return newFields, nil + return newFields } diff --git a/expvar/expvar.go b/expvar/expvar.go index cb367cbd2..4883eeea7 100644 --- a/expvar/expvar.go +++ b/expvar/expvar.go @@ -7,6 +7,7 @@ import ( "expvar" "fmt" "math" + "sort" "strconv" "sync" "sync/atomic" @@ -174,6 +175,24 @@ func (v *Map) Do(f func(expvar.KeyValue)) { v.doLocked(f) } +// DoSorted calls f for each entry in the map in sorted order. +// The map is locked during the iteration, +// but existing entries may be concurrently updated. +func (v *Map) DoSorted(f func(expvar.KeyValue)) { + v.mu.RLock() + defer v.mu.RUnlock() + keys := make([]string, len(v.m)) + i := 0 + for key := range v.m { + keys[i] = key + i++ + } + sort.Strings(keys) + for _, k := range keys { + f(expvar.KeyValue{k, v.m[k]}) + } +} + // doLocked calls f for each entry in the map. // v.mu must be held for reads. func (v *Map) doLocked(f func(expvar.KeyValue)) { diff --git a/join.go b/join.go index 5f0e16925..b47f00f41 100644 --- a/join.go +++ b/join.go @@ -31,10 +31,6 @@ type JoinNode struct { lowMark time.Time reported map[int]bool allReported bool - - //timing vars - mpSetter *timer.MultiPartSetter - postVar timer.Setter } // Create a new JoinNode, which takes pairs from parent streams combines them into a single point. @@ -80,51 +76,37 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, l *log.Logger) (*JoinN jn.fill = influxql.NoFill } jn.node.runF = jn.runJoin - jn.node.stopF = jn.stopJoin return jn, nil } -func (j *JoinNode) stopJoin() { - j.mpSetter.Stop() -} - func (j *JoinNode) runJoin([]byte) error { j.groups = make(map[models.GroupID]*group) - wg := sync.WaitGroup{} - - // Setup multi part timer to time both the - // 'pre' work and 'post' work. - avgExecVar := &durationVar{} - j.statMap.Set(statAverageExecTime, avgExecVar) - j.mpSetter = timer.NewMultiPartSetter(avgExecVar) - // Time work to resolve join.on dimensions. - pre := j.mpSetter.NewPart() - preVar := &MaxDuration{setter: pre} - // Time work to join points. - post := j.mpSetter.NewPart() - j.postVar = &MaxDuration{setter: post} + + groupErrs := make(chan error, 1) + done := make(chan struct{}, len(j.ins)) for i := range j.ins { // Start gorouting per parent so we do not deadlock. // This way independent of the order that parents receive data // we can handle it. - wg.Add(1) - t := j.et.tm.TimingService.NewTimer(preVar) + t := j.et.tm.TimingService.NewTimer(j.statMap.Get(statAverageExecTime).(timer.Setter)) go func(i int, t timer.Timer) { - defer wg.Done() + defer func() { + done <- struct{}{} + }() in := j.ins[i] for p, ok := in.Next(); ok; p, ok = in.Next() { t.Start() srcP := srcPoint{src: i, p: p} if len(j.j.Dimensions) > 0 { // Match points with their group based on join dimensions. - j.matchPoints(srcP) + j.matchPoints(srcP, groupErrs) } else { // Just send point on to group, we are not joining on specific dimensions. func() { j.mu.Lock() defer j.mu.Unlock() - group := j.getGroup(p) + group := j.getGroup(p, groupErrs) // Send current point group.points <- srcP }() @@ -133,14 +115,23 @@ func (j *JoinNode) runJoin([]byte) error { } }(i, t) } - wg.Wait() + for range j.ins { + select { + case <-done: + case err := <-groupErrs: + return err + } + } // No more points are comming signal all groups to finish up. for _, group := range j.groups { close(group.points) } j.runningGroups.Wait() for _, group := range j.groups { - group.emitAll() + err := group.emitAll() + if err != nil { + return err + } } return nil } @@ -149,7 +140,7 @@ func (j *JoinNode) runJoin([]byte) error { // with the less specific points as they arrive. // // Where 'more specific' means, that a point has more dimensions than the join.on dimensions. -func (j *JoinNode) matchPoints(p srcPoint) { +func (j *JoinNode) matchPoints(p srcPoint, groupErrs chan<- error) { j.mu.Lock() defer j.mu.Unlock() @@ -169,7 +160,7 @@ func (j *JoinNode) matchPoints(p srcPoint) { matched := false for _, match := range matches { if match.p.PointTime().Round(j.j.Tolerance).Equal(t) { - j.sendMatchPoint(p, match) + j.sendMatchPoint(p, match, groupErrs) matched = true } } @@ -203,7 +194,7 @@ func (j *JoinNode) matchPoints(p srcPoint) { l := len(buf) for i = 0; i < l; i++ { if buf[i].p.PointTime().Round(j.j.Tolerance).Equal(t) { - j.sendMatchPoint(buf[i], p) + j.sendMatchPoint(buf[i], p, groupErrs) } else { break } @@ -215,13 +206,13 @@ func (j *JoinNode) matchPoints(p srcPoint) { // Add the specific tags from the specific point to the matched point // and then send both on to the group. -func (j *JoinNode) sendMatchPoint(specific, matched srcPoint) { +func (j *JoinNode) sendMatchPoint(specific, matched srcPoint, groupErrs chan<- error) { np := matched.p.Copy().Setter() for key, value := range specific.p.PointTags() { np.SetNewDimTag(key, value) } np.UpdateGroup() - group := j.getGroup(specific.p) + group := j.getGroup(specific.p, groupErrs) // Send current point group.points <- specific // Send new matched point @@ -230,13 +221,22 @@ func (j *JoinNode) sendMatchPoint(specific, matched srcPoint) { } // safely get the group for the point or create one if it doesn't exist. -func (j *JoinNode) getGroup(p models.PointInterface) *group { +func (j *JoinNode) getGroup(p models.PointInterface, groupErrs chan<- error) *group { group := j.groups[p.PointGroup()] if group == nil { group = newGroup(len(j.ins), j) j.groups[p.PointGroup()] = group j.runningGroups.Add(1) - go group.run() + go func() { + err := group.run() + if err != nil { + j.logger.Println("E! join group error:", err) + select { + case groupErrs <- err: + default: + } + } + }() } return group } @@ -254,7 +254,6 @@ type group struct { oldestTime time.Time j *JoinNode points chan srcPoint - timer timer.Timer } func newGroup(i int, j *JoinNode) *group { @@ -263,23 +262,24 @@ func newGroup(i int, j *JoinNode) *group { head: make([]time.Time, i), j: j, points: make(chan srcPoint), - timer: j.et.tm.TimingService.NewTimer(j.postVar), } } // start consuming incoming points -func (g *group) run() { +func (g *group) run() error { defer g.j.runningGroups.Done() for sp := range g.points { - g.timer.Start() - g.collect(sp.src, sp.p) - g.timer.Stop() + err := g.collect(sp.src, sp.p) + if err != nil { + return err + } } + return nil } // collect a point from a given parent. // emit the oldest set if we have collected enough data. -func (g *group) collect(i int, p models.PointInterface) { +func (g *group) collect(i int, p models.PointInterface) error { t := p.PointTime().Round(g.j.j.Tolerance) if t.Before(g.oldestTime) || g.oldestTime.IsZero() { g.oldestTime = t @@ -307,14 +307,21 @@ func (g *group) collect(i int, p models.PointInterface) { } } if emit { - g.emit() + err := g.emit() + if err != nil { + return err + } } + return nil } // emit a set and update the oldestTime. -func (g *group) emit() { +func (g *group) emit() error { set := g.sets[g.oldestTime] - g.emitJoinedSet(set) + err := g.emitJoinedSet(set) + if err != nil { + return err + } delete(g.sets, g.oldestTime) g.oldestTime = time.Time{} @@ -323,13 +330,19 @@ func (g *group) emit() { g.oldestTime = t } } + return nil } // emit sets until we have none left. -func (g *group) emitAll() { +func (g *group) emitAll() error { + var lastErr error for len(g.sets) > 0 { - g.emit() + err := g.emit() + if err != nil { + lastErr = err + } } + return lastErr } // emit a single joined set @@ -341,26 +354,22 @@ func (g *group) emitJoinedSet(set *joinset) error { case pipeline.StreamEdge: p, ok := set.JoinIntoPoint() if ok { - g.timer.Pause() for _, out := range g.j.outs { err := out.CollectPoint(p) if err != nil { return err } } - g.timer.Resume() } case pipeline.BatchEdge: b, ok := set.JoinIntoBatch() if ok { - g.timer.Pause() for _, out := range g.j.outs { err := out.CollectBatch(b) if err != nil { return err } } - g.timer.Resume() } } return nil diff --git a/models/batch.go b/models/batch.go index df40fbcf8..50a22f4a8 100644 --- a/models/batch.go +++ b/models/batch.go @@ -144,7 +144,6 @@ func ResultToBatches(res client.Result) ([]Batch, error) { } b.Points = make([]BatchPoint, 0, len(series.Values)) for _, v := range series.Values { - var skip bool fields := make(Fields) var t time.Time for i, c := range series.Columns { @@ -170,13 +169,12 @@ func ResultToBatches(res client.Result) ([]Batch, error) { } } if value == nil { - skip = true break } fields[c] = value } } - if !skip { + if len(fields) > 0 { if t.After(b.TMax) { b.TMax = t } diff --git a/node.go b/node.go index 42ce324fb..9cd106513 100644 --- a/node.go +++ b/node.go @@ -200,7 +200,7 @@ func (n *node) edot(buf *bytes.Buffer, labels bool) { n.Name(), ), )) - n.statMap.Do(func(kv expvar.KeyValue) { + n.statMap.DoSorted(func(kv expvar.KeyValue) { buf.Write([]byte( fmt.Sprintf("%s=%s ", kv.Key, @@ -227,7 +227,7 @@ func (n *node) edot(buf *bytes.Buffer, labels bool) { n.Name(), ), )) - n.statMap.Do(func(kv expvar.KeyValue) { + n.statMap.DoSorted(func(kv expvar.KeyValue) { buf.Write([]byte( fmt.Sprintf("%s=\"%s\" ", kv.Key, diff --git a/pipeline/batch.go b/pipeline/batch.go index e16f1cf08..15885917d 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -84,6 +84,11 @@ type BatchNode struct { // The Every property is mutually exclusive with the Cron property. Every time.Duration + // Align start and end times with the Every value + // Does not apply if Cron is used. + // tick:ignore + AlignFlag bool `tick:"Align"` + // Define a schedule using a cron syntax. // // The specific cron implementation is documented here: @@ -142,3 +147,11 @@ func (b *BatchNode) GroupBy(d ...interface{}) *BatchNode { b.Dimensions = d return b } + +// Align start and stop times for quiries with even boundaries of the BatchNode.Every property. +// Does not apply if using the BatchNode.Cron property. +// tick:property +func (b *BatchNode) Align() *BatchNode { + b.AlignFlag = true + return b +} diff --git a/pipeline/eval.go b/pipeline/eval.go index 48365d97c..31641fd34 100644 --- a/pipeline/eval.go +++ b/pipeline/eval.go @@ -35,6 +35,9 @@ type EvalNode struct { // keep all fields. // tick:ignore KeepList []string + + // tick:ignore + QuiteFlag bool `tick:"Quiet"` } func newEvalNode(e EdgeType, exprs []tick.Node) *EvalNode { @@ -89,3 +92,10 @@ func (e *EvalNode) Keep(fields ...string) *EvalNode { e.KeepList = fields return e } + +// Suppress errors during evaluation. +// tick:property +func (e *EvalNode) Quiet() *EvalNode { + e.QuiteFlag = true + return e +} diff --git a/query.go b/query.go index 9de22b358..b27d41d16 100644 --- a/query.go +++ b/query.go @@ -30,7 +30,7 @@ func NewQuery(q string) (*Query, error) { // Add in time condition nodes query.startTL = &influxql.TimeLiteral{} startExpr := &influxql.BinaryExpr{ - Op: influxql.GT, + Op: influxql.GTE, LHS: &influxql.VarRef{Val: "time"}, RHS: query.startTL, } diff --git a/tick/eval.go b/tick/eval.go index 2fcc0483c..5754d14c5 100644 --- a/tick/eval.go +++ b/tick/eval.go @@ -400,8 +400,6 @@ type ReflectionDescriber struct { properties map[string]reflect.Value } -var structTagPattern = regexp.MustCompile(`tick:"(\w+)"`) - func NewReflectionDescriber(obj interface{}) (*ReflectionDescriber, error) { r := &ReflectionDescriber{ obj: obj, @@ -463,10 +461,9 @@ func getProperties(desc string, rv reflect.Value, rStructType reflect.Type, chai } continue } - matches := structTagPattern.FindStringSubmatch(string(property.Tag)) - if matches != nil && matches[1] != "" { + methodName := property.Tag.Get("tick") + if methodName != "" { // Property is set via a property method. - methodName := matches[1] method := rv.MethodByName(methodName) if method.IsValid() { propertyMethods[methodName] = method diff --git a/timer/timer.go b/timer/timer.go index ee0d1dc48..34ed387d2 100644 --- a/timer/timer.go +++ b/timer/timer.go @@ -2,7 +2,6 @@ package timer import ( "math/rand" - "sync" "time" ) @@ -139,69 +138,6 @@ func (m *movavg) update(value float64) float64 { return m.avg } -// A setter that can have distinct part or sections. -// By using a MultiPartSetter one can time distinct sections -// of code and have their individuals times summed -// to form a total timed value. -type MultiPartSetter struct { - mu sync.Mutex - wg sync.WaitGroup - stopping chan struct{} - - setter Setter - partValues []int64 - updates chan update -} - -func NewMultiPartSetter(setter Setter) *MultiPartSetter { - mp := &MultiPartSetter{ - setter: setter, - updates: make(chan update), - stopping: make(chan struct{}), - } - mp.wg.Add(1) - go mp.run() - return mp -} - -// Add a new distinct part. As new timings are set -// for this part they will contribute to the total time. -func (mp *MultiPartSetter) NewPart() Setter { - mp.mu.Lock() - defer mp.mu.Unlock() - - p := part{ - id: len(mp.partValues), - updates: mp.updates, - stopping: mp.stopping, - } - mp.partValues = append(mp.partValues, 0) - - return p -} - -func (mp *MultiPartSetter) Stop() { - close(mp.stopping) - mp.wg.Wait() -} - -func (mp *MultiPartSetter) run() { - defer mp.wg.Done() - for { - select { - case <-mp.stopping: - return - case update := <-mp.updates: - mp.partValues[update.part] = update.value - var sum int64 - for _, v := range mp.partValues { - sum += v - } - mp.setter.Set(sum) - } - } -} - type update struct { part int value int64