diff --git a/README.md b/README.md index 9c2c65cd9f244..ae1e63f369adf 100644 --- a/README.md +++ b/README.md @@ -364,11 +364,12 @@ For documentation on the latest development code see the [documentation index][d ## Serializers - [InfluxDB Line Protocol](/plugins/serializers/influx) -- [JSON](/plugins/serializers/json) +- [Carbon2](/plugins/serializers/carbon2) - [Graphite](/plugins/serializers/graphite) +- [JSON](/plugins/serializers/json) +- [MessagePack](/plugins/serializers/msgpack) - [ServiceNow](/plugins/serializers/nowmetric) - [SplunkMetric](/plugins/serializers/splunkmetric) -- [Carbon2](/plugins/serializers/carbon2) - [Wavefront](/plugins/serializers/wavefront) ## Processor Plugins diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index 0d0bdfff4bb27..720c922de6755 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -8,6 +8,7 @@ plugins. 1. [Carbon2](/plugins/serializers/carbon2) 1. [Graphite](/plugins/serializers/graphite) 1. [JSON](/plugins/serializers/json) +1. [MessagePack](/plugins/serializers/msgpack) 1. [Prometheus](/plugins/serializers/prometheus) 1. [Prometheus Remote Write](/plugins/serializers/prometheusremotewrite) 1. [ServiceNow Metrics](/plugins/serializers/nowmetric) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index ad499955067b4..97125ffd1a325 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -132,6 +132,7 @@ following works: - github.com/opencontainers/image-spec [Apache License 2.0](https://github.com/opencontainers/image-spec/blob/master/LICENSE) - github.com/opentracing/opentracing-go [Apache License 2.0](https://github.com/opentracing/opentracing-go/blob/master/LICENSE) - github.com/openzipkin/zipkin-go-opentracing [MIT License](https://github.com/openzipkin/zipkin-go-opentracing/blob/master/LICENSE) +- github.com/philhofer/fwd [MIT License](https://github.com/philhofer/fwd/blob/master/LICENSE.md) - github.com/pierrec/lz4 [BSD 3-Clause "New" or "Revised" License](https://github.com/pierrec/lz4/blob/master/LICENSE) - github.com/pkg/errors [BSD 2-Clause "Simplified" License](https://github.com/pkg/errors/blob/master/LICENSE) - github.com/pmezard/go-difflib [BSD 3-Clause Clear License](https://github.com/pmezard/go-difflib/blob/master/LICENSE) @@ -157,6 +158,7 @@ following works: - github.com/tidwall/gjson [MIT License](https://github.com/tidwall/gjson/blob/master/LICENSE) - github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE) - github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE) +- github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE) - github.com/vishvananda/netlink [Apache License 2.0](https://github.com/vishvananda/netlink/blob/master/LICENSE) - github.com/vishvananda/netns [Apache License 2.0](https://github.com/vishvananda/netns/blob/master/LICENSE) - github.com/vjeantet/grok [Apache License 2.0](https://github.com/vjeantet/grok/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 06fd402c51079..aaa19949dd710 100644 --- a/go.mod +++ b/go.mod @@ -126,6 +126,7 @@ require ( github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 github.com/tedsuo/ifrit v0.0.0-20191009134036-9a97d0632f00 // indirect github.com/tidwall/gjson v1.6.0 + github.com/tinylib/msgp v1.1.5 github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e // indirect github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect github.com/vjeantet/grok v1.0.1 @@ -138,12 +139,11 @@ require ( go.starlark.net v0.0.0-20200901195727-6e684ef5eeee golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect - golang.org/x/net v0.0.0-20200904194848-62affa334b73 + golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d - golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a - golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 + golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f golang.org/x/text v0.3.3 - golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4 google.golang.org/api v0.20.0 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884 diff --git a/go.sum b/go.sum index c9bbf781149c7..db92645af3929 100644 --- a/go.sum +++ b/go.sum @@ -561,6 +561,8 @@ github.com/openzipkin/zipkin-go-opentracing v0.3.4/go.mod h1:js2AbwmHW0YD9DwIw2J github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b/go.mod h1:x/hU0bfdWIhuOT1SKwiJg++yvkk6EuOtJk8WtDZqgr8= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= @@ -671,6 +673,9 @@ github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= +github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec/go.mod h1:owBmyHYMLkxyrugmfwE/DLJyW8Ro9mkphwuVErQ0iUw= github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e h1:f1yevOHP+Suqk0rVc13fIkzcLULJbyQcXDba2klljD0= @@ -691,7 +696,7 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 h1:f6CCNiTjQZ0uWK4jPwhwYB8QIGGfn0ssD9kVzRUUUpk= github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4/go.mod h1:aEV29XrmTYFr3CiRxZeGHpkvbwq+prZduBqMaascyCU= go.opencensus.io v0.20.1 h1:pMEjRZ1M4ebWGikflH7nQpV6+Zr88KBMA2XJD3sbijw= @@ -763,6 +768,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0 h1:KU7oHjnv3XNWfa5COkzUifxZmxp1TyI7ImMXqFxLwvQ= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -792,11 +799,12 @@ golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421 h1:Wo7BWFiOk0QRFMLYMqJGFMd9CgUAcGx7V+qEg/h5IBI= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -811,8 +819,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -852,6 +860,8 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 h1:DvY3Zkh7KabQE/kfzMvYvKirSiguP9Q/veMtkYyf0o8= golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -902,8 +912,8 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200204074204-1cc6d1ef6c74/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200317043434-63da46f3035e h1:8ogAbHWoJTPepnVbNRqXLOpzMkl0rtRsM7crbflc4XM= -golang.org/x/tools v0.0.0-20200317043434-63da46f3035e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= +golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9 h1:sEvmEcJVKBNUvgCUClbUQeHOAa9U0I2Ce1BooMvVCY4= +golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/plugins/serializers/msgpack/README.md b/plugins/serializers/msgpack/README.md new file mode 100644 index 0000000000000..5607cc64c05bc --- /dev/null +++ b/plugins/serializers/msgpack/README.md @@ -0,0 +1,45 @@ +# MessagePack: + +MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. + +https://msgpack.org + +### Format Definitions: + +Output of this format is MessagePack binary representation of metrics that have identical structure of the below JSON. + +``` +{ + "name":"cpu", + "time": , // https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type + "tags":{ + "tag_1":"host01", + ... + }, + "fields":{ + "field_1":30, + "field_2":true, + "field_3":"field_value" + "field_4":30.1 + ... + } +} +``` + +MessagePack has it's own timestamp representation. You can find additional informations from [MessagePack specification](https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type). + +### MessagePack Configuration: + +There are no additional configuration options for MessagePack format. + +```toml +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["stdout", "/tmp/metrics.out"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "msgpack" +``` \ No newline at end of file diff --git a/plugins/serializers/msgpack/metric.go b/plugins/serializers/msgpack/metric.go new file mode 100644 index 0000000000000..6b8a00878b6a8 --- /dev/null +++ b/plugins/serializers/msgpack/metric.go @@ -0,0 +1,104 @@ +package msgpack + +import ( + "encoding/binary" + "time" + + "github.com/tinylib/msgp/msgp" +) + +//go:generate msgp + +// Metric is structure to define MessagePack message format +// will be used by msgp code generator +type Metric struct { + Name string `msg:"name"` + Time MessagePackTime `msg:"time,extension"` + Tags map[string]string `msg:"tags"` + Fields map[string]interface{} `msg:"fields"` +} + +// MessagePackTime implements the official timestamp extension type +// https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type +// +// tinylib/msgp has been using their own custom extension type and the official extension +// is not available. (https://github.com/tinylib/msgp/issues/214) +type MessagePackTime struct { + time time.Time +} + +func init() { + msgp.RegisterExtension(-1, func() msgp.Extension { return new(MessagePackTime) }) +} + +// ExtensionType implements the Extension interface +func (*MessagePackTime) ExtensionType() int8 { + return -1 +} + +// Len implements the Extension interface +// The timestamp extension uses variable length encoding depending the input +// +// 32bits: [1970-01-01 00:00:00 UTC, 2106-02-07 06:28:16 UTC) range. If the nanoseconds part is 0 +// 64bits: [1970-01-01 00:00:00.000000000 UTC, 2514-05-30 01:53:04.000000000 UTC) range. +// 96bits: [-584554047284-02-23 16:59:44 UTC, 584554051223-11-09 07:00:16.000000000 UTC) range. +func (t *MessagePackTime) Len() int { + sec := t.time.Unix() + nsec := t.time.Nanosecond() + + if sec < 0 || sec >= (1<<34) { // 96 bits encoding + return 12 + } + if sec >= (1<<32) || nsec != 0 { + return 8 + } + return 4 +} + +// MarshalBinaryTo implements the Extension interface +func (t *MessagePackTime) MarshalBinaryTo(buf []byte) error { + len := t.Len() + + if len == 4 { + sec := t.time.Unix() + binary.BigEndian.PutUint32(buf, uint32(sec)) + } else if len == 8 { + sec := t.time.Unix() + nsec := t.time.Nanosecond() + + data := uint64(nsec)<<34 | (uint64(sec) & 0x03_ffff_ffff) + binary.BigEndian.PutUint64(buf, data) + } else if len == 12 { + sec := t.time.Unix() + nsec := t.time.Nanosecond() + + binary.BigEndian.PutUint32(buf, uint32(nsec)) + binary.BigEndian.PutUint64(buf[4:], uint64(sec)) + } + + return nil +} + +// UnmarshalBinary implements the Extension interface +func (t *MessagePackTime) UnmarshalBinary(buf []byte) error { + len := len(buf) + + if len == 4 { + sec := binary.BigEndian.Uint32(buf) + t.time = time.Unix(int64(sec), 0) + } else if len == 8 { + data := binary.BigEndian.Uint64(buf) + + nsec := (data & 0xfffffffc_00000000) >> 34 + sec := (data & 0x00000003_ffffffff) + + t.time = time.Unix(int64(sec), int64(nsec)) + } else if len == 12 { + nsec := binary.BigEndian.Uint32(buf) + sec := binary.BigEndian.Uint64(buf[4:]) + + t.time = time.Unix(int64(sec), int64(nsec)) + } + + return nil +} diff --git a/plugins/serializers/msgpack/metric_gen.go b/plugins/serializers/msgpack/metric_gen.go new file mode 100644 index 0000000000000..f02b0aba28503 --- /dev/null +++ b/plugins/serializers/msgpack/metric_gen.go @@ -0,0 +1,417 @@ +package msgpack + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *MessagePackTime) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z MessagePackTime) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z MessagePackTime) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 0 + o = append(o, 0x80) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *MessagePackTime) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z MessagePackTime) Msgsize() (s int) { + s = 1 + return +} + +// DecodeMsg implements msgp.Decodable +func (z *Metric) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "name": + z.Name, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + case "time": + err = dc.ReadExtension(&z.Time) + if err != nil { + err = msgp.WrapError(err, "Time") + return + } + case "tags": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if z.Tags == nil { + z.Tags = make(map[string]string, zb0002) + } else if len(z.Tags) > 0 { + for key := range z.Tags { + delete(z.Tags, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 string + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + z.Tags[za0001] = za0002 + } + case "fields": + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Fields") + return + } + if z.Fields == nil { + z.Fields = make(map[string]interface{}, zb0003) + } else if len(z.Fields) > 0 { + for key := range z.Fields { + delete(z.Fields, key) + } + } + for zb0003 > 0 { + zb0003-- + var za0003 string + var za0004 interface{} + za0003, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Fields") + return + } + za0004, err = dc.ReadIntf() + if err != nil { + err = msgp.WrapError(err, "Fields", za0003) + return + } + z.Fields[za0003] = za0004 + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Metric) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "name" + err = en.Append(0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Name) + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + // write "time" + err = en.Append(0xa4, 0x74, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteExtension(&z.Time) + if err != nil { + err = msgp.WrapError(err, "Time") + return + } + // write "tags" + err = en.Append(0xa4, 0x74, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Tags))) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + for za0001, za0002 := range z.Tags { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + // write "fields" + err = en.Append(0xa6, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Fields))) + if err != nil { + err = msgp.WrapError(err, "Fields") + return + } + for za0003, za0004 := range z.Fields { + err = en.WriteString(za0003) + if err != nil { + err = msgp.WrapError(err, "Fields") + return + } + err = en.WriteIntf(za0004) + if err != nil { + err = msgp.WrapError(err, "Fields", za0003) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *Metric) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 4 + // string "name" + o = append(o, 0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.Name) + // string "time" + o = append(o, 0xa4, 0x74, 0x69, 0x6d, 0x65) + o, err = msgp.AppendExtension(o, &z.Time) + if err != nil { + err = msgp.WrapError(err, "Time") + return + } + // string "tags" + o = append(o, 0xa4, 0x74, 0x61, 0x67, 0x73) + o = msgp.AppendMapHeader(o, uint32(len(z.Tags))) + for za0001, za0002 := range z.Tags { + o = msgp.AppendString(o, za0001) + o = msgp.AppendString(o, za0002) + } + // string "fields" + o = append(o, 0xa6, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73) + o = msgp.AppendMapHeader(o, uint32(len(z.Fields))) + for za0003, za0004 := range z.Fields { + o = msgp.AppendString(o, za0003) + o, err = msgp.AppendIntf(o, za0004) + if err != nil { + err = msgp.WrapError(err, "Fields", za0003) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Metric) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "name": + z.Name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + case "time": + bts, err = msgp.ReadExtensionBytes(bts, &z.Time) + if err != nil { + err = msgp.WrapError(err, "Time") + return + } + case "tags": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if z.Tags == nil { + z.Tags = make(map[string]string, zb0002) + } else if len(z.Tags) > 0 { + for key := range z.Tags { + delete(z.Tags, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 string + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + z.Tags[za0001] = za0002 + } + case "fields": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Fields") + return + } + if z.Fields == nil { + z.Fields = make(map[string]interface{}, zb0003) + } else if len(z.Fields) > 0 { + for key := range z.Fields { + delete(z.Fields, key) + } + } + for zb0003 > 0 { + var za0003 string + var za0004 interface{} + zb0003-- + za0003, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Fields") + return + } + za0004, bts, err = msgp.ReadIntfBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Fields", za0003) + return + } + z.Fields[za0003] = za0004 + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Metric) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ExtensionPrefixSize + z.Time.Len() + 5 + msgp.MapHeaderSize + if z.Tags != nil { + for za0001, za0002 := range z.Tags { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + } + } + s += 7 + msgp.MapHeaderSize + if z.Fields != nil { + for za0003, za0004 := range z.Fields { + _ = za0004 + s += msgp.StringPrefixSize + len(za0003) + msgp.GuessSize(za0004) + } + } + return +} diff --git a/plugins/serializers/msgpack/metric_gen_test.go b/plugins/serializers/msgpack/metric_gen_test.go new file mode 100644 index 0000000000000..e24d0a9b179c3 --- /dev/null +++ b/plugins/serializers/msgpack/metric_gen_test.go @@ -0,0 +1,236 @@ +package msgpack + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalMessagePackTime(t *testing.T) { + v := MessagePackTime{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgMessagePackTime(b *testing.B) { + v := MessagePackTime{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgMessagePackTime(b *testing.B) { + v := MessagePackTime{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalMessagePackTime(b *testing.B) { + v := MessagePackTime{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeMessagePackTime(t *testing.T) { + v := MessagePackTime{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeMessagePackTime Msgsize() is inaccurate") + } + + vn := MessagePackTime{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeMessagePackTime(b *testing.B) { + v := MessagePackTime{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeMessagePackTime(b *testing.B) { + v := MessagePackTime{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalMetric(t *testing.T) { + v := Metric{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgMetric(b *testing.B) { + v := Metric{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgMetric(b *testing.B) { + v := Metric{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalMetric(b *testing.B) { + v := Metric{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeMetric(t *testing.T) { + v := Metric{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeMetric Msgsize() is inaccurate") + } + + vn := Metric{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeMetric(b *testing.B) { + v := Metric{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeMetric(b *testing.B) { + v := Metric{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/plugins/serializers/msgpack/metric_test.go b/plugins/serializers/msgpack/metric_test.go new file mode 100644 index 0000000000000..e0ea25ebc88a7 --- /dev/null +++ b/plugins/serializers/msgpack/metric_test.go @@ -0,0 +1,143 @@ +package msgpack + +import ( + "encoding/hex" + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestMsgPackTime32(t *testing.T) { + // Maximum of 4 bytes encodable time + var sec int64 = 0xFFFFFFFF + var nsec int64 = 0 + t1 := MessagePackTime{time: time.Unix(sec, nsec)} + + assert.Equal(t, t1.Len(), 4) + + buf := make([]byte, t1.Len()) + assert.NoError(t, t1.MarshalBinaryTo(buf)) + + t2 := new(MessagePackTime) + t2.UnmarshalBinary(buf) + + assert.Equal(t, t1.time, t2.time) +} + +func TestMsgPackTime64(t *testing.T) { + // Maximum of 8 bytes encodable time + var sec int64 = 0x3FFFFFFFF + var nsec int64 = 999999999 + t1 := MessagePackTime{time: time.Unix(sec, nsec)} + + assert.Equal(t, t1.Len(), 8) + + buf := make([]byte, t1.Len()) + assert.NoError(t, t1.MarshalBinaryTo(buf)) + + t2 := new(MessagePackTime) + t2.UnmarshalBinary(buf) + + assert.Equal(t, t1.time, t2.time) +} + +func TestMsgPackTime96(t *testing.T) { + // Testing 12 bytes timestamp + var sec int64 = 0x400000001 + var nsec int64 = 111111111 + t1 := MessagePackTime{time: time.Unix(sec, nsec)} + + assert.Equal(t, t1.Len(), 12) + + buf := make([]byte, t1.Len()) + assert.NoError(t, t1.MarshalBinaryTo(buf)) + + t2 := new(MessagePackTime) + t2.UnmarshalBinary(buf) + + assert.True(t, t1.time.Equal(t2.time)) + + // Testing the default value: 0001-01-01T00:00:00Z + t1 = MessagePackTime{} + + assert.Equal(t, t1.Len(), 12) + assert.NoError(t, t1.MarshalBinaryTo(buf)) + + t2 = new(MessagePackTime) + t2.UnmarshalBinary(buf) + + assert.True(t, t1.time.Equal(t2.time)) +} + +func TestMsgPackTimeEdgeCases(t *testing.T) { + times := make([]time.Time, 0) + expected := make([][]byte, 0) + + // Unix epoch. Begin of 4bytes dates + // Nanoseconds: 0x00000000, Seconds: 0x0000000000000000 + ts, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z") + bs, _ := hex.DecodeString("d6ff00000000") + times = append(times, ts) + expected = append(expected, bs) + + // End of 4bytes dates + // Nanoseconds: 0x00000000, Seconds: 0x00000000ffffffff + ts, _ = time.Parse(time.RFC3339, "2106-02-07T06:28:15Z") + bs, _ = hex.DecodeString("d6ffffffffff") + times = append(times, ts) + expected = append(expected, bs) + + // Begin of 8bytes dates + // Nanoseconds: 0x00000000, Seconds: 0x0000000100000000 + ts, _ = time.Parse(time.RFC3339, "2106-02-07T06:28:16Z") + bs, _ = hex.DecodeString("d7ff0000000100000000") + times = append(times, ts) + expected = append(expected, bs) + + // Just after Unix epoch. Non zero nanoseconds + // Nanoseconds: 0x00000001, Seconds: 0x0000000000000000 + ts, _ = time.Parse(time.RFC3339Nano, "1970-01-01T00:00:00.000000001Z") + bs, _ = hex.DecodeString("d7ff0000000400000000") + times = append(times, ts) + expected = append(expected, bs) + + // End of 8bytes dates + // Nanoseconds: 0x00000000, Seconds: 0x00000003ffffffff + ts, _ = time.Parse(time.RFC3339Nano, "2514-05-30T01:53:03.000000000Z") + bs, _ = hex.DecodeString("d7ff00000003ffffffff") + times = append(times, ts) + expected = append(expected, bs) + + // Begin of 12bytes date + // Nanoseconds: 0x00000000, Seconds: 0x0000000400000000 + ts, _ = time.Parse(time.RFC3339Nano, "2514-05-30T01:53:04.000000000Z") + bs, _ = hex.DecodeString("c70cff000000000000000400000000") + times = append(times, ts) + expected = append(expected, bs) + + // Zero value, 0001-01-01T00:00:00Z + // Nanoseconds: 0x00000000, Seconds: 0xfffffff1886e0900 + ts = time.Time{} + bs, _ = hex.DecodeString("c70cff00000000fffffff1886e0900") + times = append(times, ts) + expected = append(expected, bs) + + // Max value + // Nanoseconds: 0x3b9ac9ff, Seconds: 0x7fffffffffffffff + ts = time.Unix(math.MaxInt64, 999_999_999).UTC() + bs, _ = hex.DecodeString("c70cff3b9ac9ff7fffffffffffffff") + times = append(times, ts) + expected = append(expected, bs) + + buf := make([]byte, 0) + for i, ts := range times { + t1 := MessagePackTime{time: ts} + m := Metric{Time: t1} + + buf = buf[:0] + buf, _ = m.MarshalMsg(buf) + assert.Equal(t, expected[i], buf[12:len(buf)-14]) + } +} diff --git a/plugins/serializers/msgpack/msgpack.go b/plugins/serializers/msgpack/msgpack.go new file mode 100644 index 0000000000000..d850bb8b004ca --- /dev/null +++ b/plugins/serializers/msgpack/msgpack.go @@ -0,0 +1,44 @@ +package msgpack + +import ( + "github.com/influxdata/telegraf" +) + +// Serializer encodes metrics in MessagePack format +type Serializer struct{} + +// NewSerializer creates a msgpack.Serializer +func NewSerializer() *Serializer { + return &Serializer{} +} + +func marshalMetric(buf []byte, metric telegraf.Metric) ([]byte, error) { + return (&Metric{ + Name: metric.Name(), + Time: MessagePackTime{time: metric.Time()}, + Tags: metric.Tags(), + Fields: metric.Fields(), + }).MarshalMsg(buf) +} + +// Serialize implements serializers.Serializer.Serialize +// github.com/influxdata/telegraf/plugins/serializers/Serializer +func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { + return marshalMetric(nil, metric) +} + +// SerializeBatch implements serializers.Serializer.SerializeBatch +// github.com/influxdata/telegraf/plugins/serializers/Serializer +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + buf := make([]byte, 0) + for _, m := range metrics { + var err error + buf, err = marshalMetric(buf, m) + + if err != nil { + return nil, err + } + + } + return buf, nil +} diff --git a/plugins/serializers/msgpack/msgpack_test.go b/plugins/serializers/msgpack/msgpack_test.go new file mode 100644 index 0000000000000..a44ffae4515e3 --- /dev/null +++ b/plugins/serializers/msgpack/msgpack_test.go @@ -0,0 +1,132 @@ +package msgpack + +import ( + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func toTelegrafMetric(m Metric) telegraf.Metric { + tm, _ := metric.New(m.Name, m.Tags, m.Fields, m.Time.time) + + return tm +} + +func TestSerializeMetricInt(t *testing.T) { + m := testutil.TestMetric(int64(90)) + + s := Serializer{} + var buf []byte + buf, err := s.Serialize(m) + assert.NoError(t, err) + + m2 := &Metric{} + left, err := m2.UnmarshalMsg(buf) + assert.NoError(t, err) + + assert.Equal(t, len(left), 0) + + testutil.RequireMetricEqual(t, m, toTelegrafMetric(*m2)) +} + +func TestSerializeMetricString(t *testing.T) { + m := testutil.TestMetric("foobar") + + s := Serializer{} + var buf []byte + buf, err := s.Serialize(m) + assert.NoError(t, err) + + m2 := &Metric{} + left, err := m2.UnmarshalMsg(buf) + assert.NoError(t, err) + + assert.Equal(t, len(left), 0) + + testutil.RequireMetricEqual(t, m, toTelegrafMetric(*m2)) +} + +func TestSerializeMultiFields(t *testing.T) { + m := testutil.TestMetric(int(90)) + m.AddField("value2", 8559615) + + s := Serializer{} + var buf []byte + buf, err := s.Serialize(m) + assert.NoError(t, err) + + m2 := &Metric{} + left, err := m2.UnmarshalMsg(buf) + assert.NoError(t, err) + + assert.Equal(t, len(left), 0) + + testutil.RequireMetricEqual(t, m, toTelegrafMetric(*m2)) +} + +func TestSerializeMetricWithEscapes(t *testing.T) { + m := testutil.TestMetric(int(90)) + m.AddField("U,age=Idle", int64(90)) + m.AddTag("cpu tag", "cpu0") + + s := Serializer{} + var buf []byte + buf, err := s.Serialize(m) + assert.NoError(t, err) + + m2 := &Metric{} + left, err := m2.UnmarshalMsg(buf) + assert.NoError(t, err) + + assert.Equal(t, len(left), 0) + + testutil.RequireMetricEqual(t, m, toTelegrafMetric(*m2)) +} + +func TestSerializeMultipleMetric(t *testing.T) { + m := testutil.TestMetric(int(90)) + + s := Serializer{} + + encoded, err := s.Serialize(m) + assert.NoError(t, err) + + // Multiple metrics in continous bytes stream + var buf []byte + buf = append(buf, encoded...) + buf = append(buf, encoded...) + buf = append(buf, encoded...) + buf = append(buf, encoded...) + + left := buf + for len(left) > 0 { + decodeM := &Metric{} + left, err = decodeM.UnmarshalMsg(left) + + assert.NoError(t, err) + testutil.RequireMetricEqual(t, m, toTelegrafMetric(*decodeM)) + } +} + +func TestSerializeBatch(t *testing.T) { + m := testutil.TestMetric(int(90)) + + metrics := []telegraf.Metric{m, m, m, m} + + s := Serializer{} + + buf, err := s.SerializeBatch(metrics) + assert.NoError(t, err) + + left := buf + for len(left) > 0 { + decodeM := &Metric{} + left, err = decodeM.UnmarshalMsg(left) + + assert.NoError(t, err) + testutil.RequireMetricEqual(t, m, toTelegrafMetric(*decodeM)) + } +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 61fb03c96562d..f6c62fc12cbda 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -2,7 +2,6 @@ package serializers import ( "fmt" - "github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite" "time" "github.com/influxdata/telegraf" @@ -10,8 +9,10 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/plugins/serializers/msgpack" "github.com/influxdata/telegraf/plugins/serializers/nowmetric" "github.com/influxdata/telegraf/plugins/serializers/prometheus" + "github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite" "github.com/influxdata/telegraf/plugins/serializers/splunkmetric" "github.com/influxdata/telegraf/plugins/serializers/wavefront" ) @@ -129,6 +130,8 @@ func NewSerializer(config *Config) (Serializer, error) { serializer, err = NewPrometheusSerializer(config) case "prometheusremotewrite": serializer, err = NewPrometheusRemoteWriteSerializer(config) + case "msgpack": + serializer, err = NewMsgpackSerializer() default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -240,3 +243,7 @@ func NewGraphiteSerializer(prefix, template string, tagSupport bool, separator s Templates: graphiteTemplates, }, nil } + +func NewMsgpackSerializer() (Serializer, error) { + return msgpack.NewSerializer(), nil +}