-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathinflux-line-format.go
149 lines (130 loc) · 3.72 KB
/
influx-line-format.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package csv
import (
"bytes"
"fmt"
"io"
"os"
"regexp"
"sort"
"strconv"
"time"
)
// InfluxLineFormatProcess describes how the records of a CSV stream are
// mapped onto points in influx line format: which column carries the
// timestamp, which columns become tags and which become fields.
type InfluxLineFormatProcess struct {
	// Measurement is the name of the measurement.
	Measurement string
	// Timestamp is the name of the timestamp column.
	Timestamp string
	// Format is the format of the timestamp column (for the layout syntax
	// see the documentation of go time.Parse()).
	Format string
	// Location is the location in which the timestamp is interpreted
	// (per go time.LoadLocation()).
	Location string
	// Tags lists the columns to be used as tags.
	Tags []string
	// Values lists the columns to be used as values.
	Values []string
}
// tagEscapeCodes maps every byte that has to be escaped inside a tag to its
// backslash-escaped replacement (table taken from influxdb).
var tagEscapeCodes = map[byte][]byte{
	',': []byte(`\,`),
	' ': []byte(`\ `),
	'=': []byte(`\=`),
}

// escapeTag returns in with each byte listed in tagEscapeCodes replaced by
// its escape sequence. When in contains none of those bytes it is returned
// as is; otherwise a newly allocated slice is returned and in is left
// untouched.
func escapeTag(in []byte) []byte {
	// First pass: count the bytes that need an escape so the clean case can
	// return early and the output can be sized exactly.
	hits := 0
	for _, c := range in {
		if _, special := tagEscapeCodes[c]; special {
			hits++
		}
	}
	if hits == 0 {
		return in
	}

	// Second pass: copy, substituting escape sequences. Every sequence in
	// the table is one byte longer than the byte it replaces.
	var out bytes.Buffer
	out.Grow(len(in) + hits)
	for _, c := range in {
		if esc, special := tagEscapeCodes[c]; special {
			out.Write(esc)
		} else {
			out.WriteByte(c)
		}
	}
	return out.Bytes()
}
// Run exhausts the reader, writing one record in influx line format per CSV
// input record to out.
//
// Each output line has the form
//
//	measurement[,tag=value...] field=value[,field=value...] timestamp
//
// where the timestamp is in nanoseconds since the Unix epoch. The timestamp
// column is interpreted according to p.Format: "ns", "ms" and "s" select an
// integer epoch value in that unit, anything else is used as a layout for
// time.ParseInLocation with the location named by p.Location.
//
// Columns whose value is the empty string are omitted from the line. A
// record for which no field remains is not written; a message is printed to
// os.Stderr instead. Field values that look like JSON numbers, or are
// exactly "true" or "false", are written bare; every other value is written
// as a quoted string.
//
// p.Tags and p.Values are sorted in place. The reader is closed before Run
// returns. Exactly one value is sent on errCh: the first error encountered
// (location lookup, timestamp parsing or writing to out), or else the
// result of reader.Error().
func (p *InfluxLineFormatProcess) Run(reader Reader, out io.Writer, errCh chan<- error) {
	errCh <- func() error {
		defer reader.Close()

		sort.Strings(p.Tags)
		sort.Strings(p.Values)

		// see: http://stackoverflow.com/questions/13340717/json-numbers-regular-expression
		// The capture group isolates the number from the optional
		// surrounding spaces so that only the number itself is emitted.
		numberMatcher := regexp.MustCompile(`^ *(-?(?:0|[1-9]\d*)(?:\.\d+)?(?:[eE][+-]?\d+)?) *$`)

		location, err := time.LoadLocation(p.Location)
		if err != nil {
			return err
		}

		// parse converts the content of the timestamp column into a time.
		parse := func(s string) (time.Time, error) {
			switch p.Format {
			case "ns", "ms", "s":
				n, err := strconv.ParseInt(s, 10, 64)
				if err != nil {
					return time.Unix(0, 0), err
				}
				switch p.Format {
				case "s":
					return time.Unix(n, 0), nil
				case "ms":
					return time.Unix(0, n*int64(time.Millisecond)), nil
				default:
					return time.Unix(0, n), nil
				}
			default:
				return time.ParseInLocation(p.Format, s, location)
			}
		}

		// Column names end up as tag and field keys. Keys are subject to the
		// same escaping rules as tag values (comma, space, equals sign), so
		// escape them once up front; the unescaped names are still used to
		// look the columns up in each record.
		tagKeys := make([][]byte, len(p.Tags))
		for i, t := range p.Tags {
			tagKeys[i] = escapeTag([]byte(t))
		}
		fieldKeys := make([][]byte, len(p.Values))
		for i, f := range p.Values {
			fieldKeys[i] = escapeTag([]byte(f))
		}

		// buffer is reused for every record; an io.Writer must not retain
		// the slice passed to Write, so this is safe.
		var buffer []byte

		// count starts at 1 and is incremented before each record, so the
		// first record is reported as 2 — presumably to account for a CSV
		// header line; confirm against the Reader implementation.
		count := 1
		for data := range reader.C() {
			count++

			ts, err := parse(data.Get(p.Timestamp))
			if err != nil {
				return err
			}

			buffer = append(buffer[:0], p.Measurement...)
			for i, t := range p.Tags {
				v := data.Get(t)
				if v == "" {
					continue
				}
				buffer = append(buffer, ',')
				buffer = append(buffer, tagKeys[i]...)
				buffer = append(buffer, '=')
				buffer = append(buffer, escapeTag([]byte(v))...)
			}
			buffer = append(buffer, ' ')

			appended := 0
			for i, f := range p.Values {
				v := data.Get(f)
				if v == "" {
					continue
				}
				if appended > 0 {
					buffer = append(buffer, ',')
				}
				appended++
				buffer = append(buffer, fieldKeys[i]...)
				buffer = append(buffer, '=')
				if m := numberMatcher.FindStringSubmatch(v); m != nil {
					// Emit the bare number only: a space inside the field
					// set would terminate it prematurely.
					buffer = append(buffer, m[1]...)
				} else if v == "true" || v == "false" {
					buffer = append(buffer, v...)
				} else {
					buffer = strconv.AppendQuote(buffer, v)
				}
			}
			if appended == 0 {
				fmt.Fprintf(os.Stderr, "%d: dropping field-less point\n", count)
				continue
			}

			buffer = append(buffer, ' ')
			buffer = strconv.AppendInt(buffer, ts.UnixNano(), 10)
			buffer = append(buffer, '\n')
			if _, err := out.Write(buffer); err != nil {
				return err
			}
		}
		return reader.Error()
	}()
}