From 98972eb541f4af6b69fbe4f9e99a9aae083e039c Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 19 May 2020 16:58:13 -0400 Subject: [PATCH] Add ID to journalbeat input (#18467) (#18590) Add an `id` configuration option to the Journalbeat input to make it possible to start multiple inputs on the same journal. As an administrator this provides more flexibility in how you can deploy Journalbeat. It gives you the configuration granularity you need to be able to have an input for each service or log type. You can apply different `include_matches` to each input. And you can easily add new log types at any point and make Journalbeat read all matching logs from the "head" without affecting existing input cursor positions. I also fixed a test that was modifying a file under source control by copying it into the build directory before applying changes. (cherry picked from commit 74a5bd29dada8d3fa2019a0c1c4300facef63cf3) --- CHANGELOG.next.asciidoc | 3 + Vagrantfile | 7 +- .../_meta/config/beat.reference.yml.tmpl | 5 + journalbeat/_meta/config/beat.yml.tmpl | 5 + journalbeat/docs/config-options.asciidoc | 27 ++++- journalbeat/input/config.go | 2 + journalbeat/input/input.go | 30 +++-- journalbeat/journalbeat.reference.yml | 5 + journalbeat/journalbeat.yml | 5 + journalbeat/reader/config.go | 2 + journalbeat/reader/journal.go | 2 +- .../tests/system/config/journalbeat.yml.j2 | 82 +------------ journalbeat/tests/system/test_base.py | 110 ++++++++++++++---- 13 files changed, 169 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f35b9328249..c9ef1985c55 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -423,6 +423,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Journalbeat* +- Added an `id` config option to inputs to allow running multiple inputs on the + same journal. 
{pull}18467[18467] + *Metricbeat* - Move the windows pdh implementation from perfmon to a shared location in order for future modules/metricsets to make use of. {pull}15503[15503] diff --git a/Vagrantfile b/Vagrantfile index fdff82a57ea..81560b73434 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -135,7 +135,7 @@ def linuxDebianProvision() #!/usr/bin/env bash set -eio pipefail apt-get update -apt-get install -y make gcc python3 python3-pip python3-venv git +apt-get install -y make gcc python3 python3-pip python3-venv git libsystemd-dev SCRIPT end @@ -229,6 +229,11 @@ Vagrant.configure(2) do |config| c.vm.box = "ubuntu/bionic64" c.vm.network :forwarded_port, guest: 22, host: 2228, id: "ssh", auto_correct: true + c.vm.provider :virtualbox do |vbox| + vbox.memory = 4096 + vbox.cpus = 4 + end + c.vm.provision "shell", inline: $unixProvision, privileged: false c.vm.provision "shell", inline: linuxGvmProvision, privileged: false c.vm.provision "shell", inline: linuxDebianProvision diff --git a/journalbeat/_meta/config/beat.reference.yml.tmpl b/journalbeat/_meta/config/beat.reference.yml.tmpl index 3d4bc90b6f1..8da73b6bf07 100644 --- a/journalbeat/_meta/config/beat.reference.yml.tmpl +++ b/journalbeat/_meta/config/beat.reference.yml.tmpl @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. 
diff --git a/journalbeat/_meta/config/beat.yml.tmpl b/journalbeat/_meta/config/beat.yml.tmpl index 9410e82a925..a4f7d5d4c31 100644 --- a/journalbeat/_meta/config/beat.yml.tmpl +++ b/journalbeat/_meta/config/beat.yml.tmpl @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/docs/config-options.asciidoc b/journalbeat/docs/config-options.asciidoc index 816207c1ad2..106318ceb6d 100644 --- a/journalbeat/docs/config-options.asciidoc +++ b/journalbeat/docs/config-options.asciidoc @@ -75,6 +75,27 @@ in a Docker container. However, in this example the fields are matched using the You can specify the following options to configure how {beatname_uc} reads the journal files. +[float] +[id="{beatname_lc}-id"] +==== `id` + +An optional unique identifier for the input. By providing a unique `id` you can +operate multiple inputs on the same journal. This allows each input's cursor +to be persisted independently in the registry file. + +---- +{beatname_lc}.inputs: +- id: consul.service + paths: [] + include_matches: + - _SYSTEMD_UNIT=consul.service + +- id: vault.service + paths: [] + include_matches: + - _SYSTEMD_UNIT=vault.service +---- + [float] [id="{beatname_lc}-paths"] ==== `paths` @@ -108,10 +129,10 @@ The position to start reading the journal from. Valid settings are: * `head`: Starts reading at the beginning of the journal. After a restart, {beatname_uc} resends all log messages in the journal. -* `tail`: Starts reading at the end of the journal. 
After a restart, +* `tail`: Starts reading at the end of the journal. After a restart, {beatname_uc} resends the last message, which might result in duplicates. If multiple log messages are written to a journal while {beatname_uc} is down, -only the last log message is sent on restart. +only the last log message is sent on restart. * `cursor`: On first read, starts reading at the beginning of the journal. After a reload or restart, continues reading at the last known position. @@ -207,7 +228,7 @@ journald fields: The following translated fields for https://docs.docker.com/config/containers/logging/journald/[Docker] are also -available: +available: [horizontal] `CONTAINER_ID`:: `container.id_truncated` diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 00eb871cb4c..63c31ccfce7 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -28,6 +28,8 @@ import ( // Config stores the options of an input. type Config struct { + // Unique ID of the input for state persistence purposes. + ID string `config:"id"` // Paths stores the paths to the journal files to be read. 
Paths []string `config:"paths"` // Backoff is the current interval to wait before diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index ca39b082049..368c2f52c13 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -25,8 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/gofrs/uuid" - "github.com/elastic/beats/v7/journalbeat/checkpoint" "github.com/elastic/beats/v7/journalbeat/reader" "github.com/elastic/beats/v7/libbeat/beat" @@ -43,7 +41,6 @@ type Input struct { pipeline beat.Pipeline client beat.Client states map[string]checkpoint.JournalState - id uuid.UUID logger *logp.Logger eventMeta common.EventMetadata processors beat.ProcessorList @@ -61,13 +58,11 @@ func New( return nil, err } - id, err := uuid.NewV4() - if err != nil { - return nil, fmt.Errorf("error while generating ID for input: %v", err) + logger := logp.NewLogger("input") + if config.ID != "" { + logger = logger.With("id", config.ID) } - logger := logp.NewLogger("input").With("id", id) - var readers []*reader.Reader if len(config.Paths) == 0 { cfg := reader.Config{ @@ -78,9 +73,10 @@ func New( CursorSeekFallback: config.CursorSeekFallback, Matches: config.Matches, SaveRemoteHostname: config.SaveRemoteHostname, + CheckpointID: checkpointID(config.ID, reader.LocalSystemJournalID), } - state := states[reader.LocalSystemJournalID] + state := states[cfg.CheckpointID] r, err := reader.NewLocal(cfg, done, state, logger) if err != nil { return nil, fmt.Errorf("error creating reader for local journal: %v", err) @@ -97,8 +93,10 @@ func New( CursorSeekFallback: config.CursorSeekFallback, Matches: config.Matches, SaveRemoteHostname: config.SaveRemoteHostname, + CheckpointID: checkpointID(config.ID, p), } - state := states[p] + + state := states[cfg.CheckpointID] r, err := reader.New(cfg, done, state, logger) if err != nil { return nil, fmt.Errorf("error creating reader for journal: %v", err) @@ -119,7 +117,6 @@ func New( config: config, 
pipeline: b.Publisher, states: states, - id: id, logger: logger, eventMeta: config.EventMetadata, processors: inputProcessors, @@ -138,7 +135,7 @@ func (i *Input) Run() { Processor: i.processors, }, ACKCount: func(n int) { - i.logger.Infof("journalbeat successfully published %d events", n) + i.logger.Debugw("journalbeat successfully published events", "event.count", n) }, }) if err != nil { @@ -233,3 +230,12 @@ func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processo return procs, nil } + +// checkpointID returns the identifier used to track persistent state for the +// input. +func checkpointID(id, path string) string { + if id == "" { + return path + } + return "journald::" + path + "::" + id +} diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 7a9b0a62e2a..6824dfe28cf 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index 2dfa8a07f5f..e3d72984701 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. 
+ #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index d9ad5769688..f8dbabe5fee 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -41,6 +41,8 @@ type Config struct { Matches []string // SaveRemoteHostname defines if the original source of the entry needs to be saved. SaveRemoteHostname bool + // CheckpointID is the identifier to use when persisting state. + CheckpointID string } const ( diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 9c599d29657..f364afe6d6b 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -267,7 +267,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { } state := checkpoint.JournalState{ - Path: r.config.Path, + Path: r.config.CheckpointID, Cursor: entry.Cursor, RealtimeTimestamp: entry.RealtimeTimestamp, MonotonicTimestamp: entry.MonotonicTimestamp, diff --git a/journalbeat/tests/system/config/journalbeat.yml.j2 b/journalbeat/tests/system/config/journalbeat.yml.j2 index 7486f9914ef..858181ca409 100644 --- a/journalbeat/tests/system/config/journalbeat.yml.j2 +++ b/journalbeat/tests/system/config/journalbeat.yml.j2 @@ -1,85 +1,13 @@ -################### Beat Configuration ######################### journalbeat.inputs: -- paths: [{{ journal_path }}] - seek: {{ seek_method }} - {% if cursor_seek_fallback %} - cursor_seek_fallback: {{ cursor_seek_fallback }} - {% endif %} - include_matches: [{{ matches }}] +{% for input in inputs %} +- {{ input | tojson }} +{% endfor %} +{% if registry_file is defined %} journalbeat.registry_file: {{ registry_file }} +{% endif %} -############################# Output ########################################## - -# Configure what outputs to use when sending the data collected by the beat. 
-# You can enable one or multiple outputs by setting enabled option to true. output: - - ### File as output file: - # Enabling file output - enabled: true - - # Path to the directory where to save the generated files. The option is mandatory. path: {{ output_file_path|default(beat.working_dir + "/output") }} - - - # Name of the generated files. The default is `journalbeat` and it generates - # files: `journalbeat`, `journalbeat.1`, `journalbeat.2`, etc. filename: {{ output_file_filename|default("journalbeat") }} - - # Maximum size in kilobytes of each file. When this size is reached, the files are - # rotated. The default value is 10 MB. - #rotate_every_kb: 10000 - - # Maximum number of files under path. When this number of files is reached, the - # oldest file is deleted and the rest are shifted from last to first. The default - # is 7 files. - #number_of_files: 7 - - - -############################# Beat ######################################### - -# The name of the shipper that publishes the network data. It can be used to group -# all the transactions sent by a single shipper in the web interface. -# If this options is not defined, the hostname is used. -#name: - -# The tags of the shipper are included in their own field with each -# transaction published. Tags make it easy to group servers by different -# logical properties. -#tags: ["service-X", "web-tier"] - - - -############################# Logging ######################################### - -#logging: - # Send all logging output to syslog. On Windows default is false, otherwise - # default is true. - #to_syslog: true - - # Write all logging output to files. Beats automatically rotate files if configurable - # limit is reached. - #to_files: false - - # Enable debug output for selected components. - #selectors: [] - - # Set log level - #level: error - - #files: - # The directory where the log files will written to. - #path: /var/log/journalbeat - - # The name of the files where the logs are written to. 
- #name: journalbeat - - # Configure log file size limit. If limit is reached, log file will be - # automatically rotated - #rotateeverybytes: 10485760 # = 10MB - - # Number of rotated log files to keep. Oldest files will be deleted first. - #keepfiles: 7 diff --git a/journalbeat/tests/system/test_base.py b/journalbeat/tests/system/test_base.py index c85f1ef9f16..a94d4a7473c 100644 --- a/journalbeat/tests/system/test_base.py +++ b/journalbeat/tests/system/test_base.py @@ -5,6 +5,7 @@ import unittest import time import yaml +from shutil import copyfile class Test(BaseTest): @@ -16,7 +17,9 @@ def test_start_with_local_journal(self): """ self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [], + }], ) journalbeat_proc = self.start_beat() @@ -32,9 +35,12 @@ def test_start_with_journal_directory(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/", - seek_method="tail", - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/", + ], + "seek": "tail", + }], ) journalbeat_proc = self.start_beat() @@ -58,9 +64,12 @@ def test_start_with_selected_journal_file(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/test.journal", - seek_method="head", - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + "seek": "head", + }], ) journalbeat_proc = self.start_beat() @@ -86,10 +95,13 @@ def test_start_with_selected_journal_file_with_cursor_fallback(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/test.journal", - seek_method="cursor", - cursor_seek_fallback="tail", - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + "seek": "cursor", + "cursor_seek_fallback": "tail", + }], ) 
journalbeat_proc = self.start_beat() @@ -114,15 +126,19 @@ def test_read_events_with_existing_registry(self): Journalbeat is able to follow reading a from a journal with an existing registry file. """ - registry_path = self.beat_path + "/tests/system/input/test.registry" + registry_path = os.path.join(os.path.abspath(self.working_dir), "data", "registry") + os.mkdir(os.path.dirname(registry_path)) + copyfile(self.beat_path + "/tests/system/input/test.registry", + os.path.join(os.path.abspath(self.working_dir), "data/registry")) input_path = self.beat_path + "/tests/system/input/test.journal" self._prepare_registry_file(registry_path, input_path) self.render_config_template( - journal_path=input_path, - seek_method="cursor", - registry_file=registry_path, - path=os.path.abspath(self.working_dir) + "/log/*", + inputs=[{ + "paths": [input_path], + "seek": "cursor", + "cursor_seek_fallback": "tail", + }], ) journalbeat_proc = self.start_beat() @@ -134,7 +150,7 @@ def test_read_events_with_existing_registry(self): # message can be read from test journal "please report the conditions when this event happened to", # only one event is read and published - "journalbeat successfully published 1 events", + 'journalbeat successfully published events\t{"event.count": 1}', ] for snippet in required_log_snippets: self.wait_until(lambda: self.log_contains(snippet), @@ -150,10 +166,15 @@ def test_read_events_with_include_matches(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/test.journal", - seek_method="head", - matches="syslog.priority=5", - path=os.path.abspath(self.working_dir) + "/log/*", + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + "seek": "head", + "include_matches": [ + "syslog.priority=5", + ] + }], ) journalbeat_proc = self.start_beat() @@ -167,7 +188,7 @@ def test_read_events_with_include_matches(self): "please report the conditions when this event happened to", "unhandled HKEY event 
0x60b1", # Four events with priority 5 is publised - "journalbeat successfully published 4 events", + 'journalbeat successfully published events\t{"event.count": 4}', ] for snippet in required_log_snippets: self.wait_until(lambda: self.log_contains(snippet), @@ -176,6 +197,45 @@ def test_read_events_with_include_matches(self): exit_code = journalbeat_proc.kill_and_wait() assert exit_code == 0 + @unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux") + def test_input_id(self): + """ + Journalbeat persists states with IDs. + """ + + self.render_config_template( + inputs=[ + { + "id": "serviceA.unit", + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + }, + { + "id": "serviceB unit", + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + } + ], + ) + + # Run the beat until it publishes events from both inputs. + journalbeat_proc = self.start_beat() + expected_msg = 'successfully published events' + self.wait_until(lambda: self.log_contains(expected_msg)) + self.wait_until(lambda: self.log_contains(expected_msg)) + journalbeat_proc.check_kill_and_wait() + + # Verify that registry paths are prefixed with an ID. 
+ registry_data = self.read_registry() + self.assertIn("journal_entries", registry_data) + journal_entries = registry_data['journal_entries'] + self.assertGreater(len(journal_entries), 0) + for item in journal_entries: + self.assertTrue(item['path'].startswith('journald::'), "starts with journald::") + self.assertTrue(item['path'].find('::service'), "ends with ::") + def _prepare_registry_file(self, registry_path, journal_path): lines = [] with open(registry_path, "r") as registry_file: @@ -186,6 +246,12 @@ def _prepare_registry_file(self, registry_path, journal_path): for line in lines: registry_file.write(line) + def read_registry(self): + registry_path = os.path.join(os.path.abspath(self.working_dir), "data", "registry") + + with open(registry_path, "r") as stream: + return yaml.safe_load(stream) + if __name__ == '__main__': unittest.main()