Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat:filebeat/input/redis/slowlog] Add Redis replication role to slowlogs #40578

Merged
merged 8 commits into from
Sep 18, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Journald: removed configuration options `include_matches.or`, `include_matches.and`, `backoff`, `max_backoff`, `cursor_seek_fallback`. {pull}40061[40061]
- Journald: `include_matches.match` now behaves in the same way as matchers in `journalctl`. Users should carefully update their input configuration. {pull}40061[40061]
- Journald: `seek` and `since` behaviour have been simplified, if there is a cursor (state) `seek` and `since` are ignored and the cursor is used. {pull}40061[40061]
- Redis: Added replication role as a field to submitted slowlogs
- Added `container.image.name` to `journald` Filebeat input's Docker-specific translated fields. {pull}40450[40450]


Expand Down
18 changes: 14 additions & 4 deletions filebeat/input/redis/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,38 @@ func (h *Harvester) Run() error {
return nil
default:
}
// Writes Slowlog get and slowlog reset both to the buffer so they are executed together
// Writes Slowlog get, slowlog reset, and role to the buffer so they are executed together
if err := h.conn.Send("SLOWLOG", "GET"); err != nil {
return fmt.Errorf("error sending slowlog get: %w", err)
}
if err := h.conn.Send("SLOWLOG", "RESET"); err != nil {
return fmt.Errorf("error sending slowlog reset: %w", err)
}
if err := h.conn.Send("ROLE"); err != nil {
return fmt.Errorf("error sending role: %w", err)
}

// Flush the buffer to execute both commands and receive the reply from SLOWLOG GET
// Flush the buffer to execute all commands and receive the replies
h.conn.Flush()

// Receives first reply from redis which is the one from GET
// Receives first reply from redis which is the one from SLOWLOG GET
logs, err := rd.Values(h.conn.Receive())
if err != nil {
return fmt.Errorf("error receiving slowlog data: %w", err)
}

// Read reply from RESET
// Read reply from SLOWLOG RESET
_, err = h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving reset data: %w", err)
}

// Read reply from ROLE
role, err := h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving replication role: %w", err)
}

for _, item := range logs {
// Stopping here means some of the slowlog events are lost!
select {
Expand Down Expand Up @@ -146,6 +155,7 @@ func (h *Harvester) Run() error {
"duration": mapstr.M{
"us": log.duration,
},
"role": role,
mauri870 marked this conversation as resolved.
Show resolved Hide resolved
}

if log.args != nil {
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def test_input(self):

assert output["input.type"] == "redis"
assert "redis.slowlog.cmd" in output
assert "redis.slowlog.role" in output

def get_host(self):
return os.getenv('REDIS_HOST', 'localhost')
Expand Down
Loading