forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add MessagePack output data format (influxdata#8828)
- Loading branch information
Showing
13 changed files
with
1,155 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# MessagePack: | ||
|
||
MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. | ||
|
||
https://msgpack.org | ||
|
||
### Format Definitions: | ||
|
||
Output of this format is MessagePack binary representation of metrics that have identical structure of the below JSON. | ||
|
||
``` | ||
{ | ||
"name":"cpu", | ||
"time": <TIMESTAMP>, // https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type | ||
"tags":{ | ||
"tag_1":"host01", | ||
... | ||
}, | ||
"fields":{ | ||
"field_1":30, | ||
"field_2":true, | ||
"field_3":"field_value" | ||
"field_4":30.1 | ||
... | ||
} | ||
} | ||
``` | ||
|
||
MessagePack has it's own timestamp representation. You can find additional informations from [MessagePack specification](https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type). | ||
|
||
### MessagePack Configuration: | ||
|
||
There are no additional configuration options for MessagePack format. | ||
|
||
```toml | ||
[[outputs.file]] | ||
## Files to write to, "stdout" is a specially handled file. | ||
files = ["stdout", "/tmp/metrics.out"] | ||
|
||
## Data format to output. | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||
data_format = "msgpack" | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package msgpack | ||
|
||
import ( | ||
"encoding/binary" | ||
"time" | ||
|
||
"github.com/tinylib/msgp/msgp" | ||
) | ||
|
||
//go:generate msgp | ||
|
||
// Metric is structure to define MessagePack message format | ||
// will be used by msgp code generator | ||
type Metric struct { | ||
Name string `msg:"name"` | ||
Time MessagePackTime `msg:"time,extension"` | ||
Tags map[string]string `msg:"tags"` | ||
Fields map[string]interface{} `msg:"fields"` | ||
} | ||
|
||
// MessagePackTime implements the official timestamp extension type | ||
// https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type | ||
// | ||
// tinylib/msgp has been using their own custom extension type and the official extension | ||
// is not available. (https://github.com/tinylib/msgp/issues/214) | ||
type MessagePackTime struct { | ||
time time.Time | ||
} | ||
|
||
func init() { | ||
msgp.RegisterExtension(-1, func() msgp.Extension { return new(MessagePackTime) }) | ||
} | ||
|
||
// ExtensionType implements the Extension interface | ||
func (*MessagePackTime) ExtensionType() int8 { | ||
return -1 | ||
} | ||
|
||
// Len implements the Extension interface | ||
// The timestamp extension uses variable length encoding depending the input | ||
// | ||
// 32bits: [1970-01-01 00:00:00 UTC, 2106-02-07 06:28:16 UTC) range. If the nanoseconds part is 0 | ||
// 64bits: [1970-01-01 00:00:00.000000000 UTC, 2514-05-30 01:53:04.000000000 UTC) range. | ||
// 96bits: [-584554047284-02-23 16:59:44 UTC, 584554051223-11-09 07:00:16.000000000 UTC) range. | ||
func (t *MessagePackTime) Len() int { | ||
sec := t.time.Unix() | ||
nsec := t.time.Nanosecond() | ||
|
||
if sec < 0 || sec >= (1<<34) { // 96 bits encoding | ||
return 12 | ||
} | ||
if sec >= (1<<32) || nsec != 0 { | ||
return 8 | ||
} | ||
return 4 | ||
} | ||
|
||
// MarshalBinaryTo implements the Extension interface | ||
func (t *MessagePackTime) MarshalBinaryTo(buf []byte) error { | ||
len := t.Len() | ||
|
||
if len == 4 { | ||
sec := t.time.Unix() | ||
binary.BigEndian.PutUint32(buf, uint32(sec)) | ||
} else if len == 8 { | ||
sec := t.time.Unix() | ||
nsec := t.time.Nanosecond() | ||
|
||
data := uint64(nsec)<<34 | (uint64(sec) & 0x03_ffff_ffff) | ||
binary.BigEndian.PutUint64(buf, data) | ||
} else if len == 12 { | ||
sec := t.time.Unix() | ||
nsec := t.time.Nanosecond() | ||
|
||
binary.BigEndian.PutUint32(buf, uint32(nsec)) | ||
binary.BigEndian.PutUint64(buf[4:], uint64(sec)) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// UnmarshalBinary implements the Extension interface | ||
func (t *MessagePackTime) UnmarshalBinary(buf []byte) error { | ||
len := len(buf) | ||
|
||
if len == 4 { | ||
sec := binary.BigEndian.Uint32(buf) | ||
t.time = time.Unix(int64(sec), 0) | ||
} else if len == 8 { | ||
data := binary.BigEndian.Uint64(buf) | ||
|
||
nsec := (data & 0xfffffffc_00000000) >> 34 | ||
sec := (data & 0x00000003_ffffffff) | ||
|
||
t.time = time.Unix(int64(sec), int64(nsec)) | ||
} else if len == 12 { | ||
nsec := binary.BigEndian.Uint32(buf) | ||
sec := binary.BigEndian.Uint64(buf[4:]) | ||
|
||
t.time = time.Unix(int64(sec), int64(nsec)) | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.