Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feature/use-with-kind-k8s-env
Browse files Browse the repository at this point in the history

* upstream/main:
  fix typos and improve sentences (elastic#30432)
  Add drop and explicit tests to avoid duplicate ingest of elasticsearch logs (elastic#30440)
  {,x-pack/}auditbeat: replace uses of github.com/pkg/errors with stdlib equivalents (elastic#30321)
  Spelling fix (elastic#30439)
  packetbeat/beater: make sure Npcap installation runs before interfaces are needed in all cases (elastic#30438)
  Add BC about Homebrew no longer being available in 8.0 (elastic#30419)
  Install gawk as a replacement for mawk in Docker containers. (elastic#30452)
  Clean up python-related system tests (elastic#30415)
  Fix TestNewModuleRegistry flakiness (elastic#30453)
  [Filebeat] [auditd]: Support EXECVE events with truncated argument list (elastic#30382)
  Set `log.offset` to the start of the reported line in filestream (elastic#30445)
  clarify SelectedPackageTypes meaning and improve its usage (elastic#30142)
  [elasticsearch module] serialize shards properties (elastic#30408)
  Add docs about hints and templates autodiscovery priority (elastic#30343)
  • Loading branch information
v1v committed Feb 21, 2022
2 parents c20f180 + 452d447 commit 6c6cce5
Show file tree
Hide file tree
Showing 112 changed files with 1,577 additions and 963 deletions.
10 changes: 6 additions & 4 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif

- Fix field names with `add_network_direction` processor. {issue}29747[29747] {pull}29751[29751]
- Fix a logging bug when `ssl.verification_mode` was set to `full` or `certificate`, the command `test output` incorrectly logged that TLS was disabled.
- Fix the ability for subcommands to be run properly from the beats containers. {pull}30452[30452]

*Auditbeat*

Expand All @@ -51,9 +52,10 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
- Fix multiple instances of the same module configured within `filebeat.modules` in filebeat.yml. {issue}29649[29649] {pull}29952[29952]
- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131]

*Filebeat*
- Fix broken Kafka input {issue}29746[29746] {pull}30277[30277]
- Report the starting offset of the line in `log.offset` when using `filestream` instead of the end to be ECS compliant. {pull}30445[30445]
- auditd: Prevent mapping explosion when truncated EXECVE records are ingested. {pull}30382[30382]
- elasticsearch: fix duplicate ingest when using a common appender configuration {issue}30428[30428] {pull}30440[30440]

*Heartbeat*
- Fix missing mapping for `service.name`. {pull}30324[30324]
Expand All @@ -73,7 +75,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif

*Functionbeat*

- Pass AWS region configuration correctly. {issue}28520[28520] {pull}30238[30238]
- Pass AWS region configuration correctly. {issue}28520[28520] {pull}30238[30238]


*Elastic Logging Plugin*
Expand Down Expand Up @@ -137,7 +139,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif

*Packetbeat*

- Add automated OEM Npcap installation handling. {pull}29112[29112] {pull}30396[30396]
- Add automated OEM Npcap installation handling. {pull}29112[29112] {pull}30438[30438]
- Add support for capturing TLS random number and OCSP status request details. {issue}29962[29962] {pull}30102[30102]

*Functionbeat*
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var withECSVersion = processing.WithFields(common.MapStr{

// AuditbeatSettings contains the default settings for auditbeat
func AuditbeatSettings() instance.Settings {
var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
return instance.Settings{
RunFlags: runFlags,
Name: Name,
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func OpenBucket(name string) (Bucket, error) {
initDatastoreOnce.Do(func() {
ds = &boltDatastore{
path: paths.Resolve(paths.Data, "beat.db"),
mode: 0600,
mode: 0o600,
}
})

Expand Down
19 changes: 9 additions & 10 deletions auditbeat/helper/hasher/hasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/dustin/go-humanize"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"golang.org/x/crypto/blake2b"
"golang.org/x/crypto/sha3"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -143,22 +142,22 @@ func (c *Config) Validate() error {

for _, ht := range c.HashTypes {
if !ht.IsValid() {
errs = append(errs, errors.Errorf("invalid hash_types value '%v'", ht))
errs = append(errs, fmt.Errorf("invalid hash_types value '%v'", ht))
}
}

var err error

c.MaxFileSizeBytes, err = humanize.ParseBytes(c.MaxFileSize)
if err != nil {
errs = append(errs, errors.Wrap(err, "invalid max_file_size value"))
errs = append(errs, fmt.Errorf("invalid max_file_size value: %w", err))
} else if c.MaxFileSizeBytes <= 0 {
errs = append(errs, errors.Errorf("max_file_size value (%v) must be positive", c.MaxFileSize))
errs = append(errs, fmt.Errorf("max_file_size value (%v) must be positive", c.MaxFileSize))
}

c.ScanRateBytesPerSec, err = humanize.ParseBytes(c.ScanRatePerSec)
if err != nil {
errs = append(errs, errors.Wrap(err, "invalid scan_rate_per_sec value"))
errs = append(errs, fmt.Errorf("invalid scan_rate_per_sec value: %w", err))
}

return errs.Err()
Expand Down Expand Up @@ -189,22 +188,22 @@ func NewFileHasher(c Config, done <-chan struct{}) (*FileHasher, error) {
func (hasher *FileHasher) HashFile(path string) (map[HashType]Digest, error) {
info, err := os.Stat(path)
if err != nil {
return nil, errors.Wrapf(err, "failed to stat file %v", path)
return nil, fmt.Errorf("failed to stat file %v: %w", path, err)
}

// Throttle reading and hashing rate.
if len(hasher.config.HashTypes) > 0 {
err = hasher.throttle(info.Size())
if err != nil {
return nil, errors.Wrapf(err, "failed to hash file %v", path)
return nil, fmt.Errorf("failed to hash file %v: %w", path, err)
}
}

var hashes []hash.Hash
for _, hashType := range hasher.config.HashTypes {
h, valid := validHashes[hashType]
if !valid {
return nil, errors.Errorf("unknown hash type '%v'", hashType)
return nil, fmt.Errorf("unknown hash type '%v'", hashType)
}

hashes = append(hashes, h())
Expand All @@ -213,13 +212,13 @@ func (hasher *FileHasher) HashFile(path string) (map[HashType]Digest, error) {
if len(hashes) > 0 {
f, err := file.ReadOpen(path)
if err != nil {
return nil, errors.Wrap(err, "failed to open file for hashing")
return nil, fmt.Errorf("failed to open file for hashing: %w", err)
}
defer f.Close()

hashWriter := multiWriter(hashes)
if _, err := io.Copy(hashWriter, f); err != nil {
return nil, errors.Wrap(err, "failed to calculate file hashes")
return nil, fmt.Errorf("failed to calculate file hashes: %w", err)
}

nameToHash := make(map[HashType]Digest, len(hashes))
Expand Down
8 changes: 4 additions & 4 deletions auditbeat/helper/hasher/hasher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package hasher

import (
"errors"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

Expand All @@ -35,7 +35,7 @@ func TestHasher(t *testing.T) {
defer os.RemoveAll(dir)

file := filepath.Join(dir, "exe")
if err = ioutil.WriteFile(file, []byte("test exe\n"), 0600); err != nil {
if err = ioutil.WriteFile(file, []byte("test exe\n"), 0o600); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -69,7 +69,7 @@ func TestHasherLimits(t *testing.T) {
defer os.RemoveAll(dir)

file := filepath.Join(dir, "exe")
if err = ioutil.WriteFile(file, []byte("test exe\n"), 0600); err != nil {
if err = ioutil.WriteFile(file, []byte("test exe\n"), 0o600); err != nil {
t.Fatal(err)
}

Expand All @@ -88,5 +88,5 @@ func TestHasherLimits(t *testing.T) {
hashes, err := hasher.HashFile(file)
assert.Empty(t, hashes)
assert.Error(t, err)
assert.IsType(t, FileTooLargeError{}, errors.Cause(err))
assert.True(t, errors.As(err, &FileTooLargeError{}))
}
55 changes: 28 additions & 27 deletions auditbeat/module/auditd/audit_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package auditd

import (
"errors"
"fmt"
"os"
"runtime"
Expand All @@ -27,8 +28,6 @@ import (
"syscall"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
Expand Down Expand Up @@ -99,7 +98,7 @@ type MetricSet struct {
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := defaultConfig
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, errors.Wrap(err, "failed to unpack the auditd config")
return nil, fmt.Errorf("failed to unpack the auditd config: %w", err)
}

log := logp.NewLogger(moduleName)
Expand All @@ -108,7 +107,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

client, err := newAuditClient(&config, log)
if err != nil {
return nil, errors.Wrap(err, "failed to create audit client")
return nil, fmt.Errorf("failed to create audit client: %w", err)
}

reassemblerGapsMetric.Set(0)
Expand Down Expand Up @@ -255,15 +254,15 @@ func (ms *MetricSet) addRules(reporter mb.PushReporterV2) error {

client, err := libaudit.NewAuditClient(nil)
if err != nil {
return errors.Wrap(err, "failed to create audit client for adding rules")
return fmt.Errorf("failed to create audit client for adding rules: %w", err)
}
defer closeAuditClient(client)

// Don't attempt to change configuration if audit rules are locked (enabled == 2).
// Will result in EPERM.
status, err := client.GetStatus()
if err != nil {
err = errors.Wrap(err, "failed to get audit status before adding rules")
err = fmt.Errorf("failed to get audit status before adding rules: %w", err)
reporter.Error(err)
return err
}
Expand All @@ -274,7 +273,7 @@ func (ms *MetricSet) addRules(reporter mb.PushReporterV2) error {
// Delete existing rules.
n, err := client.DeleteRules()
if err != nil {
return errors.Wrap(err, "failed to delete existing rules")
return fmt.Errorf("failed to delete existing rules: %w", err)
}
ms.log.Infof("Deleted %v pre-existing audit rules.", n)

Expand All @@ -289,7 +288,7 @@ func (ms *MetricSet) addRules(reporter mb.PushReporterV2) error {
for _, rule := range rules {
if err = client.AddRule(rule.data); err != nil {
// Treat rule add errors as warnings and continue.
err = errors.Wrapf(err, "failed to add audit rule '%v'", rule.flags)
err = fmt.Errorf("failed to add audit rule '%v': %w", rule.flags, err)
reporter.Error(err)
ms.log.Warnw("Failure adding audit rule", "error", err)
failCount++
Expand All @@ -307,14 +306,17 @@ func (ms *MetricSet) initClient() error {
// required to ensure that auditing is enabled if the process is only
// given CAP_AUDIT_READ.
err := ms.client.SetEnabled(true, libaudit.NoWait)
return errors.Wrap(err, "failed to enable auditing in the kernel")
if err != nil {
return fmt.Errorf("failed to enable auditing in the kernel: %w", err)
}
return nil
}

// Unicast client initialization (requires CAP_AUDIT_CONTROL and that the
// process be in initial PID namespace).
status, err := ms.client.GetStatus()
if err != nil {
return errors.Wrap(err, "failed to get audit status")
return fmt.Errorf("failed to get audit status: %w", err)
}
ms.kernelLost.enabled = true
ms.kernelLost.counter = status.Lost
Expand All @@ -327,13 +329,13 @@ func (ms *MetricSet) initClient() error {

if fm, _ := ms.config.failureMode(); status.Failure != fm {
if err = ms.client.SetFailure(libaudit.FailureMode(fm), libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit failure mode in kernel")
return fmt.Errorf("failed to set audit failure mode in kernel: %w", err)
}
}

if status.BacklogLimit != ms.config.BacklogLimit {
if err = ms.client.SetBacklogLimit(ms.config.BacklogLimit, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit backlog limit in kernel")
return fmt.Errorf("failed to set audit backlog limit in kernel: %w", err)
}
}

Expand All @@ -345,7 +347,7 @@ func (ms *MetricSet) initClient() error {
if status.FeatureBitmap&libaudit.AuditFeatureBitmapBacklogWaitTime != 0 {
ms.log.Info("Setting kernel backlog wait time to prevent backpressure propagating to the kernel.")
if err = ms.client.SetBacklogWaitTime(0, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit backlog wait time in kernel")
return fmt.Errorf("failed to set audit backlog wait time in kernel: %w", err)
}
} else {
if ms.backpressureStrategy == bsAuto {
Expand All @@ -365,38 +367,38 @@ func (ms *MetricSet) initClient() error {

if status.RateLimit != ms.config.RateLimit {
if err = ms.client.SetRateLimit(ms.config.RateLimit, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit rate limit in kernel")
return fmt.Errorf("failed to set audit rate limit in kernel: %w", err)
}
}

if status.Enabled == 0 {
if err = ms.client.SetEnabled(true, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to enable auditing in the kernel")
return fmt.Errorf("failed to enable auditing in the kernel: %w", err)
}
}

if err := ms.client.WaitForPendingACKs(); err != nil {
return errors.Wrap(err, "failed to wait for ACKs")
return fmt.Errorf("failed to wait for ACKs: %w", err)
}

if err := ms.setPID(setPIDMaxRetries); err != nil {
if errno, ok := err.(syscall.Errno); ok && errno == syscall.EEXIST && status.PID != 0 {
return fmt.Errorf("failed to set audit PID. An audit process is already running (PID %d)", status.PID)
}
return errors.Wrapf(err, "failed to set audit PID (current audit PID %d)", status.PID)
return fmt.Errorf("failed to set audit PID (current audit PID %d): %w", status.PID, err)
}
return nil
}

func (ms *MetricSet) setPID(retries int) (err error) {
if err = ms.client.SetPID(libaudit.WaitForReply); err == nil || errors.Cause(err) != syscall.ENOBUFS || retries == 0 {
if err = ms.client.SetPID(libaudit.WaitForReply); err == nil || !errors.Is(err, syscall.ENOBUFS) || retries == 0 {
return err
}
// At this point the netlink channel is congested (ENOBUFS).
// Drain and close the client, then retry with a new client.
closeAuditClient(ms.client)
if ms.client, err = newAuditClient(&ms.config, ms.log); err != nil {
return errors.Wrapf(err, "failed to recover from ENOBUFS")
return fmt.Errorf("failed to recover from ENOBUFS: %w", err)
}
ms.log.Info("Recovering from ENOBUFS ...")
return ms.setPID(retries - 1)
Expand Down Expand Up @@ -438,7 +440,7 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
}
reassembler, err := libaudit.NewReassembler(int(ms.config.ReassemblerMaxInFlight), ms.config.ReassemblerTimeout, st)
if err != nil {
return nil, errors.Wrap(err, "failed to create Reassembler")
return nil, fmt.Errorf("failed to create Reassembler: %w", err)
}
go maintain(done, reassembler)

Expand All @@ -450,7 +452,7 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
for {
raw, err := ms.client.Receive(false)
if err != nil {
if errors.Cause(err) == syscall.EBADF {
if errors.Is(err, syscall.EBADF) {
// Client has been closed.
break
}
Expand Down Expand Up @@ -941,17 +943,17 @@ func kernelVersion() (major, minor int, full string, err error) {
release := string(data[:length])
parts := strings.SplitN(release, ".", 3)
if len(parts) < 2 {
return 0, 0, release, errors.Errorf("failed to parse uname release '%v'", release)
return 0, 0, release, fmt.Errorf("failed to parse uname release '%v'", release)
}

major, err = strconv.Atoi(parts[0])
if err != nil {
return 0, 0, release, errors.Wrapf(err, "failed to parse major version from '%v'", release)
return 0, 0, release, fmt.Errorf("failed to parse major version from '%v': %w", release, err)
}

minor, err = strconv.Atoi(parts[1])
if err != nil {
return 0, 0, release, errors.Wrapf(err, "failed to parse minor version from '%v'", release)
return 0, 0, release, fmt.Errorf("failed to parse minor version from '%v': %w", release, err)
}

return major, minor, release, nil
Expand All @@ -961,7 +963,7 @@ func determineSocketType(c *Config, log *logp.Logger) (string, error) {
client, err := libaudit.NewAuditClient(nil)
if err != nil {
if c.SocketType == "" {
return "", errors.Wrap(err, "failed to create audit client")
return "", fmt.Errorf("failed to create audit client: %w", err)
}
// Ignore errors if a socket type has been specified. It will fail during
// further setup and it's necessary for unit tests to pass
Expand All @@ -971,7 +973,7 @@ func determineSocketType(c *Config, log *logp.Logger) (string, error) {
status, err := client.GetStatus()
if err != nil {
if c.SocketType == "" {
return "", errors.Wrap(err, "failed to get audit status")
return "", fmt.Errorf("failed to get audit status: %w", err)
}
return c.SocketType, nil
}
Expand Down Expand Up @@ -1031,7 +1033,6 @@ func determineSocketType(c *Config, log *logp.Logger) (string, error) {
}
return unicast, nil
}

}

func getBackpressureStrategy(value string, logger *logp.Logger) backpressureStrategy {
Expand Down
Loading

0 comments on commit 6c6cce5

Please sign in to comment.