Skip to content

Commit

Permalink
Parse statsd lines with multiple metric bits
Browse files Browse the repository at this point in the history
  • Loading branch information
MerlinDMC committed Nov 14, 2015
1 parent bf8e0f4 commit e4ada04
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 79 deletions.
166 changes: 87 additions & 79 deletions plugins/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,101 +255,109 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()

m := metric{}

// Validate splitting the line on "|"
pipesplit := strings.Split(line, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf(errmsg, err.Error(), line)
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
log.Printf(errmsg, "", line)
}
}

// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
// Extract bucket name from individual metric bits
bucketName, bits := bits[0], bits[1:]

// Validate splitting the rest of the line on ":"
colonsplit := strings.Split(pipesplit[0], ":")
if len(colonsplit) != 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.bucket = colonsplit[0]
// Add a metric for each bit available
for _, bit := range bits {
m := metric{}

m.bucket = bucketName

// Parse the value
if strings.ContainsAny(colonsplit[1], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
// Validate splitting the bit on "|"
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf(errmsg, err.Error(), line)
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
log.Printf(errmsg, "", line)
}
}
m.additive = true
}

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(colonsplit[1], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(colonsplit[1], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")

// Parse the value
if strings.ContainsAny(pipesplit[0], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.additive = true
}

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(pipesplit[0], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
}
m.intvalue = v
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
m.intvalue = v
}

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)

// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
s.aggregate(m)
}
sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)

s.aggregate(m)
return nil
}

Expand Down
40 changes: 40 additions & 0 deletions plugins/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,46 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
}
}

// Test that measurements with multiple bits, are treated as different outputs
func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
s := NewStatsd()

// Test that counters work
valid_lines := []string{
"valid.multiple:0|ms|@0.1:1|ms",
}

for _, line := range valid_lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}

if len(s.timings) != 1 {
t.Errorf("Expected 1 measurement, found %d", len(s.timings))
}

if cachedtiming, ok := s.timings["metric_type=timingvalid_multiple"]; !ok {
t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found")
} else {
if cachedtiming.name != "valid_multiple" {
t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name)
}

// A 0 at samplerate 0.1 will add 10 values of 0,
// plus the second bit of value 1
// which adds uup to 11 individual datapoints to be cached
if cachedtiming.stats.n != 11 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
}

if cachedtiming.stats.upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
}
}
}

// Valid lines should be parsed and their values should be cached
func TestParse_ValidLines(t *testing.T) {
s := NewStatsd()
Expand Down

0 comments on commit e4ada04

Please sign in to comment.