Skip to content

Commit

Permalink
Backport logstash concurrent event modification fix for #1410 (#1438)
Browse files Browse the repository at this point in the history
logstash output used to add metadata to an event. With potentially having
multiple outputs configured this is some bad practice. Instead the logstash
output plugin will add '@metadata' to the event when encoding and only if
'@metadata' is not already present in event
  • Loading branch information
Steffen Siering authored and tsg committed Apr 21, 2016
1 parent 34f7db5 commit 448a46d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ https://github.com/elastic/beats/compare/v1.2.1...1.2[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
- Fix race when multiple outputs access the same event with logstash output manipulating event {issue}1410[1410]

*Packetbeat*

Expand Down
54 changes: 53 additions & 1 deletion libbeat/outputs/logstash/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type lumberjackClient struct {
timeout time.Duration
countTimeoutErr int
compressLevel int

// the beat name, already json encoded
beat []byte
}

const (
Expand Down Expand Up @@ -65,6 +68,7 @@ func newLumberjackClient(
compressLevel int,
maxWindowSize int,
timeout time.Duration,
beat string,
) (*lumberjackClient, error) {

// validate by creating and discarding zlib writer with configured level
Expand All @@ -77,12 +81,18 @@ func newLumberjackClient(
w.Close()
}

encodedBeat, err := json.Marshal(beat)
if err != nil {
return nil, err
}

return &lumberjackClient{
TransportClient: conn,
windowSize: defaultStartMaxWindowSize,
timeout: timeout,
maxWindowSize: maxWindowSize,
compressLevel: compressLevel,
beat: encodedBeat,
}, nil
}

Expand Down Expand Up @@ -344,7 +354,7 @@ func (l *lumberjackClient) writeDataFrame(
// payloadLen (bytes): uint32
// payload: JSON document

jsonEvent, err := json.Marshal(event)
jsonEvent, err := serializeEvent(event, l.beat)
if err != nil {
debug("Fail to convert the event to JSON: %s", err)
return err
Expand All @@ -365,6 +375,48 @@ func (l *lumberjackClient) writeDataFrame(

return nil
}
func serializeEvent(event common.MapStr, beat []byte) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteRune('{')

if _, hasMeta := event["@metadata"]; !hasMeta {
typ := event["type"].(string)

buf.WriteString(`"@metadata":{"type":`)
tmp, err := json.Marshal(typ)
if err != nil {
return nil, err
}
buf.Write(tmp)

buf.WriteString(`,"beat":`)
buf.Write(beat)
buf.WriteString(`},`)
}

for k, v := range event {
// append key
tmp, err := json.Marshal(k)
if err != nil {
return nil, err
}
buf.Write(tmp)
buf.WriteRune(':')

// append value
tmp, err = json.Marshal(v)
if err != nil {
return nil, err
}
buf.Write(tmp)
buf.WriteRune(',')
}

b := buf.Bytes()
b[len(b)-1] = '}'

return b, nil
}

func writeUint32(out io.Writer, v uint32) error {
return binary.Write(out, binary.BigEndian, v)
Expand Down
9 changes: 5 additions & 4 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type mockTransport struct {
}

func newLumberjackTestClient(conn TransportClient) *lumberjackClient {
c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 5*time.Second)
c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 5*time.Second, "test")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestSimpleEvent(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(newLumberjackTestClient(transp))

event := common.MapStr{"name": "me", "line": 10}
event := common.MapStr{"type": "test", "name": "me", "line": 10}
client.Publish([]common.MapStr{event})

// receive window message
Expand Down Expand Up @@ -357,6 +357,7 @@ func TestStructuredEvent(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(newLumberjackTestClient(transp))
event := common.MapStr{
"type": "test",
"name": "test",
"struct": common.MapStr{
"field1": 1,
Expand Down Expand Up @@ -432,7 +433,7 @@ func testGrowWindowSize(t *testing.T,
initial, maxOK, windowSize, batchSize, expected int,
) {
enableLogging([]string{"logstash"})
c, _ := newLumberjackClient(nil, 3, windowSize, 1*time.Second)
c, _ := newLumberjackClient(nil, 3, windowSize, 1*time.Second, "test")
c.windowSize = initial
c.maxOkWindowSize = maxOK
for i := 0; i < 100; i++ {
Expand All @@ -447,7 +448,7 @@ func TestShrinkWindowSizeNeverZero(t *testing.T) {
enableLogging([]string{"logstash"})

windowSize := 124
c, _ := newLumberjackClient(nil, 3, windowSize, 1*time.Second)
c, _ := newLumberjackClient(nil, 3, windowSize, 1*time.Second, "test")
c.windowSize = windowSize
for i := 0; i < 100; i++ {
c.shrinkWindow()
Expand Down
22 changes: 4 additions & 18 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func (lj *logstash) init(
}

clients, err = mode.MakeClients(config,
makeClientFactory(maxWindowSize, compressLevel, timeout,
makeClientFactory(config.Index, maxWindowSize, compressLevel, timeout,
makeTLSClient(defaultPort, tlsConfig)))
} else {
clients, err = mode.MakeClients(config,
makeClientFactory(maxWindowSize, compressLevel, timeout,
makeClientFactory(config.Index, maxWindowSize, compressLevel, timeout,
makeTCPClient(defaultPort)))
}
if err != nil {
Expand Down Expand Up @@ -134,6 +134,7 @@ func (lj *logstash) init(
}

func makeClientFactory(
beat string,
maxWindowSize int,
compressLevel int,
timeout time.Duration,
Expand All @@ -144,7 +145,7 @@ func makeClientFactory(
if err != nil {
return nil, err
}
return newLumberjackClient(transp, compressLevel, maxWindowSize, timeout)
return newLumberjackClient(transp, compressLevel, maxWindowSize, timeout, beat)
}
}

Expand All @@ -168,7 +169,6 @@ func (lj *logstash) PublishEvent(
opts outputs.Options,
event common.MapStr,
) error {
lj.addMeta(event)
return lj.mode.PublishEvent(signaler, opts, event)
}

Expand All @@ -179,19 +179,5 @@ func (lj *logstash) BulkPublish(
opts outputs.Options,
events []common.MapStr,
) error {
for _, event := range events {
lj.addMeta(event)
}
return lj.mode.PublishEvents(trans, opts, events)
}

// addMeta adapts events to be compatible with logstash forwarer messages by renaming
// the "message" field to "line". The lumberjack server in logstash will
// decode/rename the "line" field into "message".
func (lj *logstash) addMeta(event common.MapStr) {
// add metadata for indexing
event["@metadata"] = common.MapStr{
"beat": lj.index,
"type": event["type"].(string),
}
}

0 comments on commit 448a46d

Please sign in to comment.