Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature/ci-pipeli…
Browse files Browse the repository at this point in the history
…ne-2.0

* upstream/master:
  [Elastic Agent] Add skeleton for client/server for agent control protocol (elastic#20163)
  Auditbeat: Allow multiple instances by grouping kprobes by PID (elastic#20325)
  [Filebeat][Fortinet] Remove pre populated event.timezone (elastic#20273)
  • Loading branch information
v1v committed Jul 30, 2020
2 parents f9bd227 + 77a8472 commit ac360da
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- auditd: Fix spelling of anomaly in `event.category`.
- auditd: Fix typo in `event.action` of `removed-user-role-from`. {pull}19300[19300]
- auditd: Fix typo in `event.action` of `used-suspicious-link`. {pull}19300[19300]
- system/socket: Fix kprobe grouping to allow running more than one instance. {pull}20325[20325]

*Filebeat*

Expand Down Expand Up @@ -228,6 +229,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix s3 input parsing json file without expand_event_list_from_field. {issue}19902[19902] {pull}19962[19962]
- Fix millisecond timestamp normalization issues in CrowdStrike module {issue}20035[20035], {pull}20138[20138]
- Fix support for message code 106100 in Cisco ASA and FTD. {issue}19350[19350] {pull}20245[20245]
- Fix `fortinet` setting `event.timezone` to the system one when no `tz` field present {pull}20273[20273]

*Heartbeat*

Expand Down
58 changes: 45 additions & 13 deletions x-pack/auditbeat/module/system/socket/socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ package socket
import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
Expand All @@ -36,17 +39,18 @@ import (
)

const (
moduleName = "system"
metricsetName = "socket"
fullName = moduleName + "/" + metricsetName
namespace = "system.audit.socket"
detailSelector = metricsetName + "detailed"
auditbeatGroup = "auditbeat"
moduleName = "system"
metricsetName = "socket"
fullName = moduleName + "/" + metricsetName
namespace = "system.audit.socket"
detailSelector = metricsetName + "detailed"
groupNamePrefix = "auditbeat_"
// Magic value to detect clock-sync events generated by the metricset.
clockSyncMagic uint64 = 0x42DEADBEEFABCDEF
)

var (
groupName = fmt.Sprintf("%s%d", groupNamePrefix, os.Getpid())
kernelVersion string
eventCount uint64
)
Expand Down Expand Up @@ -290,7 +294,7 @@ func (m *MetricSet) Setup() (err error) {
extra = WithFilterPort(22)
}
m.installer = newProbeInstaller(traceFS,
WithGroup(auditbeatGroup),
WithGroup(groupName),
WithTemplates(m.templateVars),
extra)
defer func() {
Expand All @@ -300,10 +304,18 @@ func (m *MetricSet) Setup() (err error) {
}()

//
// remove existing KProbes from Auditbeat
// remove dangling KProbes from terminated Auditbeat processes.
// Not a fatal error if they can't be removed.
//
if err = m.installer.UninstallIf(isOwnProbe); err != nil {
return errors.Wrap(err, "unable to delete existing KProbes. Is Auditbeat already running?")
if err = m.installer.UninstallIf(isDeadAuditbeat); err != nil {
m.log.Debugf("Removing existing probes from terminated instances: %+v", err)
}

//
// remove existing Auditbeat KProbes that match the current PID.
//
if err = m.installer.UninstallIf(isThisAuditbeat); err != nil {
return errors.Wrapf(err, "unable to delete existing KProbes for group %s", groupName)
}

//
Expand Down Expand Up @@ -409,7 +421,7 @@ func (m *MetricSet) Cleanup() {
}
}
if m.installer != nil {
if err := m.installer.UninstallIf(isOwnProbe); err != nil {
if err := m.installer.UninstallIf(isThisAuditbeat); err != nil {
m.log.Warnf("Failed to remove KProbes on exit: %v", err)
}
}
Expand Down Expand Up @@ -468,8 +480,28 @@ func triggerClockSync() {
unix.Uname(&buf)
}

func isOwnProbe(probe tracing.Probe) bool {
return probe.Group == auditbeatGroup
func isRunningAuditbeat(pid int) bool {
path := fmt.Sprintf("/proc/%d/exe", pid)
exePath, err := os.Readlink(path)
if err != nil {
// Not a running process
return false
}
exeName := filepath.Base(exePath)
return strings.HasPrefix(exeName, "auditbeat")
}

func isDeadAuditbeat(probe tracing.Probe) bool {
if strings.HasPrefix(probe.Group, groupNamePrefix) && probe.Group != groupName {
if pid, err := strconv.Atoi(probe.Group[len(groupNamePrefix):]); err == nil && !isRunningAuditbeat(pid) {
return true
}
}
return false
}

func isThisAuditbeat(probe tracing.Probe) bool {
return probe.Group == groupName
}

type mountPoint struct {
Expand Down
20 changes: 20 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package control

import (
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data := paths.Data()
return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock"))
}
22 changes: 22 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package control

import (
"crypto/sha256"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data = paths.Data()
// entire string cannot be longer than 256 characters, this forces the
// length to always be 87 characters (but unique per data path)
return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data))
}
188 changes: 188 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package client

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto"
)

// Status is the status of the Elastic Agent
type Status = proto.Status

const (
// Starting is when the it is still starting.
Starting Status = proto.Status_STARTING
// Configuring is when it is configuring.
Configuring Status = proto.Status_CONFIGURING
// Healthy is when it is healthy.
Healthy Status = proto.Status_HEALTHY
// Degraded is when it is degraded.
Degraded Status = proto.Status_DEGRADED
// Failed is when it is failed.
Failed Status = proto.Status_FAILED
// Stopping is when it is stopping.
Stopping Status = proto.Status_STOPPING
// Upgrading is when it is upgrading.
Upgrading Status = proto.Status_UPGRADING
)

// Version is the current running version of the daemon.
type Version struct {
Version string
Commit string
BuildTime time.Time
Snapshot bool
}

// ApplicationStatus is a status of an application inside of Elastic Agent.
type ApplicationStatus struct {
ID string
Name string
Status Status
Message string
Payload map[string]interface{}
}

// AgentStatus is the current status of the Elastic Agent.
type AgentStatus struct {
Status Status
Message string
Applications []*ApplicationStatus
}

// Client communicates to Elastic Agent through the control protocol.
type Client interface {
// Start starts the client.
Start(ctx context.Context) error
// Stop stops the client.
Stop()
// Version returns the current version of the running agent.
Version(ctx context.Context) (Version, error)
// Status returns the current status of the running agent.
Status(ctx context.Context) (*AgentStatus, error)
// Restart triggers restarting the current running daemon.
Restart(ctx context.Context) error
// Upgrade triggers upgrade of the current running daemon.
Upgrade(ctx context.Context, version string, sourceURI string) (string, error)
}

// client manages the state and communication to the Elastic Agent.
type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client proto.ElasticAgentClient
cfgLock sync.RWMutex
obsLock sync.RWMutex
}

// New creates a client connection to Elastic Agent.
func New() Client {
return &client{}
}

// Start starts the connection to Elastic Agent.
func (c *client) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx)
if err != nil {
return err
}
c.client = proto.NewElasticAgentClient(conn)
return nil
}

// Stop stops the connection to Elastic Agent.
func (c *client) Stop() {
if c.cancel != nil {
c.cancel()
c.wg.Wait()
c.ctx = nil
c.cancel = nil
}
}

// Version returns the current version of the running agent.
func (c *client) Version(ctx context.Context) (Version, error) {
res, err := c.client.Version(ctx, &proto.Empty{})
if err != nil {
return Version{}, err
}
bt, err := time.Parse(control.TimeFormat(), res.BuildTime)
if err != nil {
return Version{}, err
}
return Version{
Version: res.Version,
Commit: res.Commit,
BuildTime: bt,
Snapshot: res.Snapshot,
}, nil
}

// Status returns the current status of the running agent.
func (c *client) Status(ctx context.Context) (*AgentStatus, error) {
res, err := c.client.Status(ctx, &proto.Empty{})
if err != nil {
return nil, err
}
s := &AgentStatus{
Status: res.Status,
Message: res.Message,
Applications: make([]*ApplicationStatus, len(res.Applications)),
}
for i, appRes := range res.Applications {
var payload map[string]interface{}
if appRes.Payload != "" {
err := json.Unmarshal([]byte(appRes.Payload), &payload)
if err != nil {
return nil, err
}
}
s.Applications[i] = &ApplicationStatus{
ID: appRes.Id,
Name: appRes.Name,
Status: appRes.Status,
Message: appRes.Message,
Payload: payload,
}
}
return s, nil
}

// Restart triggers restarting the current running daemon.
func (c *client) Restart(ctx context.Context) error {
res, err := c.client.Restart(ctx, &proto.Empty{})
if err != nil {
return err
}
if res.Status == proto.ActionStatus_FAILURE {
return fmt.Errorf(res.Error)
}
return nil
}

// Upgrade triggers upgrade of the current running daemon.
func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) {
res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{
Version: version,
SourceURI: sourceURI,
})
if err != nil {
return "", err
}
if res.Status == proto.ActionStatus_FAILURE {
return "", fmt.Errorf(res.Error)
}
return res.Version, nil
}
26 changes: 26 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package client

import (
"context"
"net"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"google.golang.org/grpc"
)

func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", addr)
}
Loading

0 comments on commit ac360da

Please sign in to comment.