Skip to content

Commit

Permalink
address some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan committed Aug 20, 2020
1 parent ded9cb4 commit 04d11f5
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
32 changes: 17 additions & 15 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ import (
)

var (
m3TypeTag = []byte("__m3_type__")
m3CounterValue = []byte("counter")
m3GaugeValue = []byte("gauge")
m3TimerValue = []byte("timer")

m3MetricsPrefix = []byte("__m3")
m3MetricsPrefixString = string(m3MetricsPrefix)
m3MetricsGraphiteAggregation = []byte("__m3_graphite_aggregation__")
m3MetricsPrefix = []byte("__m3")
m3MetricsPrefixString = string(m3MetricsPrefix)

m3TypeTag = []byte(m3MetricsPrefixString + "_type__")
m3MetricsGraphiteAggregation = []byte(m3MetricsPrefixString + "_graphite_aggregation__")
)

type metricsAppenderPool struct {
Expand Down Expand Up @@ -121,13 +122,8 @@ func newMetricsAppender(pool *metricsAppenderPool) *metricsAppender {
func (a *metricsAppender) reset(opts metricsAppenderOptions) {
a.metricsAppenderOptions = opts

// Copy over any previous inuse encoders to the cached encoders list. If
// there are no cached encoders then create one.
a.cachedEncoders = append(a.cachedEncoders, a.inuseEncoders...)
a.inuseEncoders = a.inuseEncoders[:0]
if len(a.cachedEncoders) == 0 {
a.cachedEncoders = append(a.cachedEncoders, opts.tagEncoderPool.Get())
}
// Copy over any previous inuse encoders to the cached encoders list.
a.moveInuseEncoders()

// Make sure a.defaultStagedMetadatasCopies is right length.
capRequired := len(opts.defaultStagedMetadatasProtos)
Expand Down Expand Up @@ -405,15 +401,22 @@ func (a *metricsAppender) NextMetric() {
a.tags.values = a.tags.values[:0]

// Move the inuse encoders to cached as we should be done with using them.
a.cachedEncoders = append(a.cachedEncoders, a.inuseEncoders...)
a.inuseEncoders = a.inuseEncoders[:0]
a.moveInuseEncoders()
}

func (a *metricsAppender) Finalize() {
// Return to pool.
a.pool.Put(a)
}

func (a *metricsAppender) moveInuseEncoders() {
a.cachedEncoders = append(a.cachedEncoders, a.inuseEncoders...)
for i := range a.inuseEncoders {
a.inuseEncoders[i] = nil
}
a.inuseEncoders = a.inuseEncoders[:0]
}

func (a *metricsAppender) tagEncoder() serialize.TagEncoder {
// Take an encoder from the cached encoder list, if not present get one
// from the pool. Add the returned encoder to the used list.
Expand Down Expand Up @@ -497,7 +500,6 @@ func (a *metricsAppender) newSamplesAppender(
tags *tags,
sm metadata.StagedMetadata,
) (samplesAppender, error) {
// Get a new tag encoder from the unused list if not present create a new one.
tagEncoder := a.tagEncoder()
if err := tagEncoder.Encode(tags); err != nil {
return samplesAppender{}, err
Expand Down Expand Up @@ -549,7 +551,7 @@ func augmentTags(

// Add any additional tags we need to.
for _, tag := range t {
// If these are simply random tags to be added, then just add them.
// If the tag is not special tag, then just add it.
if !bytes.HasPrefix(tag.Name, m3MetricsPrefix) {
if len(tag.Name) > 0 && len(tag.Value) > 0 {
tags.append(tag.Name, tag.Value)
Expand Down
9 changes: 6 additions & 3 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,10 @@ type Tag struct {

// Rule returns the mapping rule for the mapping rule configuration.
func (r MappingRuleConfiguration) Rule() (view.MappingRule, error) {
ruleID := uuid.New()
id := uuid.New()
name := r.Name
if name == "" {
name = ruleID
name = id
}
filter := r.Filter

Expand Down Expand Up @@ -350,7 +350,7 @@ func (r MappingRuleConfiguration) Rule() (view.MappingRule, error) {
}

return view.MappingRule{
ID: ruleID,
ID: id,
Name: name,
Filter: filter,
AggregationID: aggID,
Expand Down Expand Up @@ -696,6 +696,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

for _, rollupRule := range cfg.Rules.RollupRules {
if strings.Contains(rollupRule.Filter, m3MetricsPrefixString) {
m3PrefixFilter = true
}
rule, err := rollupRule.Rule()
if err != nil {
return agg{}, err
Expand Down
6 changes: 1 addition & 5 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,7 @@ func newM3DBStorage(
if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 {
logger.Info("configuring downsampler to use with aggregated cluster namespaces",
zap.Int("numAggregatedClusterNamespaces", n))
var (
autoMappingRules []downsample.AutoMappingRule
err error
)
autoMappingRules, err = newDownsamplerAutoMappingRules(namespaces)
autoMappingRules, err := newDownsamplerAutoMappingRules(namespaces)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down

0 comments on commit 04d11f5

Please sign in to comment.