Add more filtering options to journald input (#29294)
## What does this PR do?

This PR adds the `units`, `transports` and `syslog_identifiers` options for filtering journal entries.

This PR also introduces a breaking change to the `include_matches` option: it no longer accepts a plain list of expressions. Instead, expressions are grouped, so both conjunctions (AND) and disjunctions (OR) are supported when applying matches to journals.

Collecting entries of two different units:

```yaml
- type: journald
  include_matches.or:
  - equals:
    - _SYSTEMD_UNIT=my_unit
    - _SYSTEMD_UNIT=my_other_unit
```

Collecting entries using syslog transport for a unit:

```yaml
- type: journald
  include_matches.and:
  - equals:
    - _SYSTEMD_UNIT=my_unit
    - _TRANSPORT=syslog
```

Although the configuration lets you write complex expressions, systemd does not provide full logical expression support.
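
The limitation is easier to see with a small model of the match rules. The Go sketch below is not Filebeat code: `entry`, `matches` and `contains` are hypothetical names, and it only mimics the behaviour described in the input documentation changed by this PR, where expressions on the same field act as alternatives and expressions on different fields must all hold.

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a hypothetical stand-in for one journal entry (field name -> value).
type entry map[string]string

// matches reports whether e satisfies a flat list of FIELD=value expressions.
// Values listed for the same field are alternatives; every field that appears
// in the list must be present in the entry with one of its listed values.
func matches(e entry, exprs []string) (bool, error) {
	allowed := make(map[string][]string)
	for _, expr := range exprs {
		i := strings.IndexByte(expr, '=')
		if i <= 0 {
			return false, fmt.Errorf("invalid expression %q: want FIELD=value", expr)
		}
		field, value := expr[:i], expr[i+1:]
		allowed[field] = append(allowed[field], value)
	}
	for field, values := range allowed {
		got, ok := e[field]
		if !ok || !contains(values, got) {
			return false, nil
		}
	}
	return true, nil
}

// contains reports whether want is one of values.
func contains(values []string, want string) bool {
	for _, v := range values {
		if v == want {
			return true
		}
	}
	return false
}

func main() {
	exprs := []string{
		"_SYSTEMD_UNIT=my_unit",
		"_SYSTEMD_UNIT=my_other_unit",
		"_TRANSPORT=syslog",
	}
	e := entry{"_SYSTEMD_UNIT": "my_other_unit", "_TRANSPORT": "syslog"}
	ok, err := matches(e, exprs)
	fmt.Println(ok, err)
}
```

A flat list evaluated this way cannot express something like "unit A over syslog, or unit B over any transport", which is the kind of case the grouped `and`/`or` form is aimed at.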

## Why is it important?

Once this change is merged, the journald input can be marked either beta or GA. It also provides filtering capabilities similar to those of the good old community Journalbeat.
kvch authored Jan 4, 2022
1 parent eaa3b32 commit fc7b8fc
Showing 12 changed files with 530 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -26,6 +26,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Index template's default_fields setting is only populated with ECS fields. {pull}28596[28596] {issue}28215[28215]
- Remove deprecated `--template` and `--ilm-policy` flags. Use `--index-management` instead. {pull}28870[28870]
- Remove options `logging.files.suffix` and default to datetime endings. {pull}28927[28927]
- Remove Journalbeat. Use `journald` input of Filebeat instead. {pull}29131[29131]
- `include_matches` option of `journald` input no longer accepts a list of string. {pull}29294[29294]
- Add job.name in pods controlled by Jobs {pull}28954[28954]

*Auditbeat*
@@ -148,6 +150,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add documentation for add_kubernetes_metadata processors `log_path` matcher. {pull}28868[28868]
- Add support for parsers on journald input {pull}29070[29070]
- Add support in httpjson input for oAuth2ProviderDefault of password grant_type. {pull}29087[29087]
- Add support for filtering in journald input with `unit`, `kernel`, `identifiers` and `include_matches`. {pull}29294[29294]
- Add new `userAgent` and `beatInfo` template functions for httpjson input {pull}29528[29528]

*Heartbeat*
17 changes: 14 additions & 3 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
@@ -566,10 +566,21 @@ filebeat.inputs:
#id: service-foo

# You may wish to have separate inputs for each service. You can use
# include_matches to specify a list of filter expressions that are
# include_matches.or to specify a list of filter expressions that are
# applied as a logical OR. You may specify filter
#include_matches:
#- _SYSTEMD_UNIT=foo.service
#include_matches.or:
#- equals:
#- _SYSTEMD_UNIT=foo.service

# List of syslog identifiers
#syslog_identifiers: ["audit"]

# Collect events from the service and messages about the service,
# including coredumps.
#units: ["docker.service"]

# The list of transports (_TRANSPORT field of journald entries)
#transports: ["audit"]

# Parsers are also supported, here is an example of the multiline
# parser.
60 changes: 55 additions & 5 deletions filebeat/docs/inputs/input-journald.asciidoc
@@ -24,8 +24,8 @@ journal.
----

You may wish to have separate inputs for each service. You can use
`include_matches` to specify a list of filter expressions that are applied as a
logical OR. A good way to list the journald fields that are available for
`include_matches` to specify filtering expressions.
A good way to list the https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html[journald fields] that are available for
filtering messages is to run `journalctl -o json` to output logs and metadata as
JSON. This example collects logs from the `vault.service` systemd unit.

@@ -34,7 +34,7 @@ JSON. This example collects logs from the `vault.service` systemd unit.
{beatname_lc}.inputs:
- type: journald
id: service-vault
include_matches:
include_matches.match:
- _SYSTEMD_UNIT=vault.service
----

@@ -48,7 +48,7 @@ possible.
{beatname_lc}.inputs:
- type: journald
id: iptables
include_matches:
include_matches.match:
- _TRANSPORT=kernel
processors:
- drop_event:
@@ -133,14 +133,64 @@ If you have old log files and want to skip lines, start {beatname_uc} with
`seek: tail` specified. Then stop {beatname_uc}, set `seek: cursor`, and restart
{beatname_uc}.

[float]
[id="{beatname_lc}-input-{type}-units"]
==== `units`

Iterate only the entries of the units specified in this option. The iterated entries include
messages from the units, messages about the units by authorized daemons and coredumps. However,
it does not match systemd user units.

[float]
[id="{beatname_lc}-input-{type}-syslog-identifiers"]
==== `syslog_identifiers`

Read only the entries with the selected syslog identifiers.

[float]
[id="{beatname_lc}-input-{type}-transports"]
==== `transports`

Collect the messages using the specified transports. Example: syslog.

Valid transports:

* audit: messages from the kernel audit subsystem
* driver: internally generated messages
* syslog: messages received via the local syslog socket with the syslog protocol
* journal: messages received via the native journal protocol
* stdout: messages from a service's standard output or error output
* kernel: messages from the kernel
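
Since the list of transports is closed, a configuration check could reject unknown names early. This diff does not show whether Filebeat performs such a check, so the following Go sketch is purely illustrative: `validTransports` and `validateTransports` are hypothetical names, and the set is copied from the list above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// validTransports mirrors the documented list of transports.
// Hypothetical helper data, not part of Filebeat.
var validTransports = map[string]struct{}{
	"audit":   {},
	"driver":  {},
	"syslog":  {},
	"journal": {},
	"stdout":  {},
	"kernel":  {},
}

// validateTransports returns an error naming every transport that is not
// in the documented list, or nil if all of them are known.
func validateTransports(transports []string) error {
	var unknown []string
	for _, t := range transports {
		if _, ok := validTransports[t]; !ok {
			unknown = append(unknown, t)
		}
	}
	if len(unknown) > 0 {
		sort.Strings(unknown)
		return fmt.Errorf("unknown transports: %s", strings.Join(unknown, ", "))
	}
	return nil
}

func main() {
	fmt.Println(validateTransports([]string{"syslog", "kernel"}))
	fmt.Println(validateTransports([]string{"syslog", "udp"}))
}
```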

[float]
[id="{beatname_lc}-input-{type}-include-matches"]
==== `include_matches`

A list of filter expressions used to match fields. The format of the expression
A collection of filter expressions used to match fields. The format of the expression
is `field=value`. {beatname_uc} fetches all events that exactly match the
expressions. Pattern matching is not supported.

If you configured a filter expression, only entries with this field set will be iterated by the journald reader of Filebeat.
If the filter expressions apply to different fields, only entries with all fields set will be iterated.
If they apply to the same fields, only entries where the field takes one of the specified values will be iterated.

`match`: List of filter expressions to match fields.
`or`: The filter expressions listed under `or` are connected with a disjunction (or).
`and`: The filter expressions listed under `and` are connected with a conjunction (and).

Please note that these expressions are limited. You can build complex filtering, but full logical
expressions are not supported.

The following include matches configuration reads all `systemd` syslog entries:

["source","yaml",subs="attributes"]
----
include_matches.and:
- match:
- "journald.process.name=systemd"
- "systemd.transport=syslog"
----

To reference fields, use one of the following:

* The field name used by the systemd journal. For example,
17 changes: 14 additions & 3 deletions filebeat/filebeat.reference.yml
@@ -973,10 +973,21 @@ filebeat.inputs:
#id: service-foo

# You may wish to have separate inputs for each service. You can use
# include_matches to specify a list of filter expressions that are
# include_matches.or to specify a list of filter expressions that are
# applied as a logical OR. You may specify filter
#include_matches:
#- _SYSTEMD_UNIT=foo.service
#include_matches.or:
#- equals:
#- _SYSTEMD_UNIT=foo.service

# List of syslog identifiers
#syslog_identifiers: ["audit"]

# Collect events from the service and messages about the service,
# including coredumps.
#units: ["docker.service"]

# The list of transports (_TRANSPORT field of journald entries)
#transports: ["audit"]

# Parsers are also supported, here is an example of the multiline
# parser.
11 changes: 10 additions & 1 deletion filebeat/input/journald/config.go
@@ -48,7 +48,16 @@ type config struct {
CursorSeekFallback journalread.SeekMode `config:"cursor_seek_fallback"`

// Matches store the key value pairs to match entries.
Matches []journalfield.Matcher `config:"include_matches"`
Matches journalfield.IncludeMatches `config:"include_matches"`

// Units stores the units to monitor.
Units []string `config:"units"`

// Transports stores the list of transports to include in the messages.
Transports []string `config:"transports"`

// Identifiers stores the syslog identifiers to watch.
Identifiers []string `config:"syslog_identifiers"`

// SaveRemoteHostname defines if the original source of the entry needs to be saved.
SaveRemoteHostname bool `config:"save_remote_hostname"`
33 changes: 29 additions & 4 deletions filebeat/input/journald/input.go
@@ -43,7 +43,10 @@ type journald struct {
MaxBackoff time.Duration
Seek journalread.SeekMode
CursorSeekFallback journalread.SeekMode
Matches []journalfield.Matcher
Matches journalfield.IncludeMatches
Units []string
Transports []string
Identifiers []string
SaveRemoteHostname bool
Parsers parser.Config
}
@@ -105,6 +108,9 @@ func configure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) {
Seek: config.Seek,
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
Units: config.Units,
Transports: config.Transports,
Identifiers: config.Identifiers,
SaveRemoteHostname: config.SaveRemoteHostname,
Parsers: config.Parsers,
}, nil
@@ -156,7 +162,8 @@ func (inp *journald) Run(

func (inp *journald) open(log *logp.Logger, canceler input.Canceler, src cursor.Source) (*journalread.Reader, error) {
backoff := backoff.NewExpBackoff(canceler.Done(), inp.Backoff, inp.MaxBackoff)
reader, err := journalread.Open(log, src.Name(), backoff, withFilters(inp.Matches))
reader, err := journalread.Open(log, src.Name(), backoff,
withFilters(inp.Matches), withUnits(inp.Units), withTransports(inp.Transports), withSyslogIdentifiers(inp.Identifiers))
if err != nil {
return nil, sderr.Wrap(err, "failed to create reader for %{path} journal", src.Name())
}
@@ -184,9 +191,27 @@ func initCheckpoint(log *logp.Logger, c cursor.Cursor) checkpoint {
return cp
}

func withFilters(filters []journalfield.Matcher) func(*sdjournal.Journal) error {
func withFilters(filters journalfield.IncludeMatches) func(*sdjournal.Journal) error {
return func(j *sdjournal.Journal) error {
return journalfield.ApplyMatchersOr(j, filters)
return journalfield.ApplyIncludeMatches(j, filters)
}
}

func withUnits(units []string) func(*sdjournal.Journal) error {
return func(j *sdjournal.Journal) error {
return journalfield.ApplyUnitMatchers(j, units)
}
}

func withTransports(transports []string) func(*sdjournal.Journal) error {
return func(j *sdjournal.Journal) error {
return journalfield.ApplyTransportMatcher(j, transports)
}
}

func withSyslogIdentifiers(identifiers []string) func(*sdjournal.Journal) error {
return func(j *sdjournal.Journal) error {
return journalfield.ApplySyslogIdentifierMatcher(j, identifiers)
}
}
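
The `with*` helpers above follow the functional-option pattern: each returns a closure that can be applied to the journal handle when the reader is opened. A minimal, self-contained sketch of that pattern follows, assuming a hypothetical `fakeJournal` in place of `*sdjournal.Journal` and a hypothetical `open` in place of `journalread.Open`. How `journalfield.ApplyTransportMatcher` and the other helpers build their matches is not shown in this diff, so the `FIELD=value` strings here are an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeJournal is a hypothetical stand-in for *sdjournal.Journal that only
// records the matches added to it.
type fakeJournal struct {
	matches []string
}

// addMatch records a FIELD=value match and rejects empty values.
func (j *fakeJournal) addMatch(field, value string) error {
	if value == "" {
		return errors.New("empty value for " + field)
	}
	j.matches = append(j.matches, field+"="+value)
	return nil
}

// option configures a journal before reading starts.
type option func(*fakeJournal) error

// withTransports adds one _TRANSPORT match per configured transport.
func withTransports(transports []string) option {
	return func(j *fakeJournal) error {
		for _, t := range transports {
			if err := j.addMatch("_TRANSPORT", t); err != nil {
				return err
			}
		}
		return nil
	}
}

// withSyslogIdentifiers adds one SYSLOG_IDENTIFIER match per identifier.
func withSyslogIdentifiers(identifiers []string) option {
	return func(j *fakeJournal) error {
		for _, id := range identifiers {
			if err := j.addMatch("SYSLOG_IDENTIFIER", id); err != nil {
				return err
			}
		}
		return nil
	}
}

// open applies the options in order and stops at the first failure.
func open(opts ...option) (*fakeJournal, error) {
	j := &fakeJournal{}
	for _, opt := range opts {
		if err := opt(j); err != nil {
			return nil, fmt.Errorf("applying option: %w", err)
		}
	}
	return j, nil
}

func main() {
	j, err := open(
		withTransports([]string{"syslog"}),
		withSyslogIdentifiers([]string{"sudo"}),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(j.matches)
}
```

Keeping each filter in its own option means an empty slice is simply a no-op, so `open` does not need to know which filters were configured.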
