diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3c750ac917be..7e7a8624cfbe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - `container` and `docker` inputs now support reading of labels and env vars written by docker JSON file logging driver. {issue}8358[8358] - Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010] +- Include log.source.address for unparseable syslog messages. {issue}13268[13268] {pull}15453[15453] *Heartbeat* diff --git a/filebeat/input/syslog/input.go b/filebeat/input/syslog/input.go index 10365c6d3f91..71efa8bcfc71 100644 --- a/filebeat/input/syslog/input.go +++ b/filebeat/input/syslog/input.go @@ -128,24 +128,8 @@ func NewInput( forwarder := harvester.NewForwarder(out) cb := func(data []byte, metadata inputsource.NetworkMetadata) { - ev := newEvent() - Parse(data, ev) - if !ev.IsValid() { - log.Errorw("can't parse event as syslog rfc3164", "message", string(data)) - // On error revert to the raw bytes content, we need a better way to communicate this kind of - // error upstream this should be a global effort. 
- forwarder.Send(beat.Event{ - Timestamp: time.Now(), - Meta: common.MapStr{ - "truncated": metadata.Truncated, - }, - Fields: common.MapStr{ - "message": string(data), - }, - }) - } else { - forwarder.Send(createEvent(ev, metadata, time.Local, log)) - } + ev := parseAndCreateEvent(data, metadata, time.Local, log) + forwarder.Send(ev) } server, err := factory(cb, config.Protocol) @@ -201,11 +185,6 @@ func (p *Input) Wait() { func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) beat.Event { f := common.MapStr{ "message": strings.TrimRight(ev.Message(), "\n"), - "log": common.MapStr{ - "source": common.MapStr{ - "address": metadata.RemoteAddr.String(), - }, - }, } syslog := common.MapStr{} @@ -254,13 +233,31 @@ func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time f["event.sequence"] = ev.Sequence() } - return beat.Event{ - Timestamp: ev.Timestamp(timezone), + return newBeatEvent(ev.Timestamp(timezone), metadata, f) +} + +func parseAndCreateEvent(data []byte, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) beat.Event { + ev := newEvent() + Parse(data, ev) + if !ev.IsValid() { + log.Errorw("can't parse event as syslog rfc3164", "message", string(data)) + return newBeatEvent(time.Now(), metadata, common.MapStr{ + "message": string(data), + }) + } + return createEvent(ev, metadata, timezone, log) +} + +func newBeatEvent(timestamp time.Time, metadata inputsource.NetworkMetadata, fields common.MapStr) beat.Event { + event := beat.Event{ + Timestamp: timestamp, Meta: common.MapStr{ "truncated": metadata.Truncated, }, - Fields: f, + Fields: fields, } + event.Fields.Put("log.source.address", metadata.RemoteAddr.String()) + return event } func mapValueToName(v int, m mapper) (string, error) { diff --git a/filebeat/input/syslog/input_test.go b/filebeat/input/syslog/input_test.go index 566a9404ad6a..f52d235e3a48 100644 --- a/filebeat/input/syslog/input_test.go 
+++ b/filebeat/input/syslog/input_test.go @@ -193,6 +193,58 @@ func TestSequence(t *testing.T) { }) } +func TestParseAndCreateEvent(t *testing.T) { + cases := map[string]struct { + data []byte + expected common.MapStr + }{ + "valid data": { + data: []byte("<34>Oct 11 22:14:15 mymachine su[230]: 'su root' failed for lonvick on /dev/pts/8"), + expected: common.MapStr{ + "event": common.MapStr{"severity": 2}, + "hostname": "mymachine", + "log": common.MapStr{ + "source": common.MapStr{ + "address": "127.0.0.1", + }, + }, + "message": "'su root' failed for lonvick on /dev/pts/8", + "process": common.MapStr{"pid": 230, "program": "su"}, + "syslog": common.MapStr{ + "facility": 4, + "facility_label": "security/authorization", + "priority": 34, + "severity_label": "Critical", + }, + }, + }, + + "invalid data": { + data: []byte("invalid"), + expected: common.MapStr{ + "log": common.MapStr{ + "source": common.MapStr{ + "address": "127.0.0.1", + }, + }, + "message": "invalid", + }, + }, + } + + tz := time.Local + log := logp.NewLogger("syslog") + metadata := dummyMetadata() + + for title, c := range cases { + t.Run(title, func(t *testing.T) { + event := parseAndCreateEvent(c.data, metadata, tz, log) + assert.Equal(t, c.expected, event.Fields) + assert.Equal(t, metadata.Truncated, event.Meta["truncated"]) + }) + } +} + func dummyMetadata() inputsource.NetworkMetadata { ip := "127.0.0.1" parsedIP := net.ParseIP(ip) diff --git a/filebeat/tests/system/test_syslog.py b/filebeat/tests/system/test_syslog.py index ac450e7681e4..08a101d8b20a 100644 --- a/filebeat/tests/system/test_syslog.py +++ b/filebeat/tests/system/test_syslog.py @@ -49,6 +49,46 @@ def test_syslog_with_tcp(self): self.assert_syslog(output[0]) sock.close() + def test_syslog_with_tcp_invalid_message(self): + """ + Test syslog input with invalid events from TCP. 
+ """ + host = "127.0.0.1" + port = 8080 + input_raw = """ +- type: syslog + protocol: + tcp: + host: "{}:{}" +""" + + input_raw = input_raw.format(host, port) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for TCP connection")) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP + sock.connect((host, port)) + + for n in range(0, 2): + sock.send("invalid\n") + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + assert output[0]["message"] == "invalid" + assert len(output[0]["log.source.address"]) > 0 + sock.close() + def test_syslog_with_udp(self): """ Test syslog input with events from TCP.