From b1b78604089100061fe08a97362438cd21d5a604 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 30 Jul 2020 14:31:26 +0200 Subject: [PATCH 1/3] [Filebeat][Fortinet] Remove pre populated event.timezone (#20273) * Remove pre populated event.timezone * Add changelog entry * Remove processor instead of the field --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/module/fortinet/firewall/config/firewall.yml | 1 - x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml | 2 +- .../module/fortinet/firewall/test/fortinet.log-expected.json | 1 - 4 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3664320db88c..45a9acb0c11a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -228,6 +228,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix s3 input parsing json file without expand_event_list_from_field. {issue}19902[19902] {pull}19962[19962] - Fix millisecond timestamp normalization issues in CrowdStrike module {issue}20035[20035], {pull}20138[20138] - Fix support for message code 106100 in Cisco ASA and FTD. {issue}19350[19350] {pull}20245[20245] +- Fix `fortinet` setting `event.timezone` to the system one when no `tz` field present {pull}20273[20273] *Heartbeat* diff --git a/x-pack/filebeat/module/fortinet/firewall/config/firewall.yml b/x-pack/filebeat/module/fortinet/firewall/config/firewall.yml index 6af16945317c..1154d83947f2 100644 --- a/x-pack/filebeat/module/fortinet/firewall/config/firewall.yml +++ b/x-pack/filebeat/module/fortinet/firewall/config/firewall.yml @@ -24,7 +24,6 @@ tags: {{.tags | tojson}} publisher_pipeline.disable_host: {{ inList .tags "forwarded" }} processors: - - add_locale: ~ - add_fields: target: '' fields: diff --git a/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml b/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml index 60ada5b7f085..2aaf7065ec1d 100644 --- a/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml +++ b/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml @@ -178,4 +178,4 @@ processors: on_failure: - set: field: error.message - value: '{{ _ingest.on_failure_message }}' \ No newline at end of file + value: '{{ _ingest.on_failure_message }}' diff --git a/x-pack/filebeat/module/fortinet/firewall/test/fortinet.log-expected.json b/x-pack/filebeat/module/fortinet/firewall/test/fortinet.log-expected.json index 73ad332c40d5..bf1b5de3fd0b 100644 --- a/x-pack/filebeat/module/fortinet/firewall/test/fortinet.log-expected.json +++ b/x-pack/filebeat/module/fortinet/firewall/test/fortinet.log-expected.json @@ -96,7 +96,6 @@ "event.module": "fortinet", "event.outcome": "success", "event.start": "2020-06-24T01:16:08.000Z", - "event.timezone": "-02:00", "event.type": [ "connection", "end" From 2abf87f5370492b60ed2ccda9160da49ba116bc5 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 30 Jul 2020 16:31:38 +0200 Subject: [PATCH 2/3] Auditbeat: Allow multiple instances by grouping kprobes by PID (#20325) This updates the system/socket dataset to group installed kprobes by PID instead of using a generic `auditbeat` group. This allows multiple instances of Auditbeat to run with the system/socket dataset enabled (default) avoiding collision of kprobes. --- CHANGELOG.next.asciidoc | 1 + .../module/system/socket/socket_linux.go | 58 ++++++++++++++----- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 45a9acb0c11a..68dbaa6fa1c6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - auditd: Fix spelling of anomaly in `event.category`. - auditd: Fix typo in `event.action` of `removed-user-role-from`. {pull}19300[19300] - auditd: Fix typo in `event.action` of `used-suspicious-link`. {pull}19300[19300] +- system/socket: Fix kprobe grouping to allow running more than one instance. {pull}20325[20325] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/socket_linux.go b/x-pack/auditbeat/module/system/socket/socket_linux.go index 4c2cfd7e7828..78fdd8ae4cae 100644 --- a/x-pack/auditbeat/module/system/socket/socket_linux.go +++ b/x-pack/auditbeat/module/system/socket/socket_linux.go @@ -9,8 +9,11 @@ package socket import ( "context" "fmt" + "os" + "path/filepath" "sort" "strconv" + "strings" "sync/atomic" "syscall" "time" @@ -36,17 +39,18 @@ import ( ) const ( - moduleName = "system" - metricsetName = "socket" - fullName = moduleName + "/" + metricsetName - namespace = "system.audit.socket" - detailSelector = metricsetName + "detailed" - auditbeatGroup = "auditbeat" + moduleName = "system" + metricsetName = "socket" + fullName = moduleName + "/" + metricsetName + namespace = "system.audit.socket" + detailSelector = metricsetName + "detailed" + groupNamePrefix = "auditbeat_" // Magic value to detect clock-sync events generated by the metricset. clockSyncMagic uint64 = 0x42DEADBEEFABCDEF ) var ( + groupName = fmt.Sprintf("%s%d", groupNamePrefix, os.Getpid()) kernelVersion string eventCount uint64 ) @@ -290,7 +294,7 @@ func (m *MetricSet) Setup() (err error) { extra = WithFilterPort(22) } m.installer = newProbeInstaller(traceFS, - WithGroup(auditbeatGroup), + WithGroup(groupName), WithTemplates(m.templateVars), extra) defer func() { @@ -300,10 +304,18 @@ func (m *MetricSet) Setup() (err error) { }() // - // remove existing KProbes from Auditbeat + // remove dangling KProbes from terminated Auditbeat processes. + // Not a fatal error if they can't be removed. // - if err = m.installer.UninstallIf(isOwnProbe); err != nil { - return errors.Wrap(err, "unable to delete existing KProbes. Is Auditbeat already running?") + if err = m.installer.UninstallIf(isDeadAuditbeat); err != nil { + m.log.Debugf("Removing existing probes from terminated instances: %+v", err) + } + + // + // remove existing Auditbeat KProbes that match the current PID. + // + if err = m.installer.UninstallIf(isThisAuditbeat); err != nil { + return errors.Wrapf(err, "unable to delete existing KProbes for group %s", groupName) } // @@ -409,7 +421,7 @@ func (m *MetricSet) Cleanup() { } } if m.installer != nil { - if err := m.installer.UninstallIf(isOwnProbe); err != nil { + if err := m.installer.UninstallIf(isThisAuditbeat); err != nil { m.log.Warnf("Failed to remove KProbes on exit: %v", err) } } @@ -468,8 +480,28 @@ func triggerClockSync() { unix.Uname(&buf) } -func isOwnProbe(probe tracing.Probe) bool { - return probe.Group == auditbeatGroup +func isRunningAuditbeat(pid int) bool { + path := fmt.Sprintf("/proc/%d/exe", pid) + exePath, err := os.Readlink(path) + if err != nil { + // Not a running process + return false + } + exeName := filepath.Base(exePath) + return strings.HasPrefix(exeName, "auditbeat") +} + +func isDeadAuditbeat(probe tracing.Probe) bool { + if strings.HasPrefix(probe.Group, groupNamePrefix) && probe.Group != groupName { + if pid, err := strconv.Atoi(probe.Group[len(groupNamePrefix):]); err == nil && !isRunningAuditbeat(pid) { + return true + } + } + return false +} + +func isThisAuditbeat(probe tracing.Probe) bool { + return probe.Group == groupName } type mountPoint struct { From 77a8472b0a5f16d7f21763b4edb1a6c876e549db Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 11:33:36 -0400 Subject: [PATCH 3/3] [Elastic Agent] Add skeleton for client/server for agent control protocol (#20163) * Add protocl to control Elastic Agent. * Fix CI with protoc. * Remove CI changes. * Start on the control server code. * More client/server work. * More work. * Add test. * Fix vet issues. * Fix permissions on unix socket. Add comment to Windows npipe. --- .../elastic-agent/pkg/agent/control/addr.go | 20 ++ .../pkg/agent/control/addr_windows.go | 22 ++ .../pkg/agent/control/client/client.go | 188 ++++++++++++++++++ .../pkg/agent/control/client/dial.go | 26 +++ .../pkg/agent/control/client/dial_windows.go | 26 +++ .../pkg/agent/control/control_test.go | 53 +++++ .../pkg/agent/control/server/listener.go | 38 ++++ .../agent/control/server/listener_windows.go | 29 +++ .../pkg/agent/control/server/server.go | 106 ++++++++++ .../elastic-agent/pkg/agent/control/time.go | 10 + 10 files changed, 518 insertions(+) create mode 100644 x-pack/elastic-agent/pkg/agent/control/addr.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/addr_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/client.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/dial.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/control_test.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/listener.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/server.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/time.go diff --git a/x-pack/elastic-agent/pkg/agent/control/addr.go b/x-pack/elastic-agent/pkg/agent/control/addr.go new file mode 100644 index 000000000000..3416480a6a05 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/addr.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package control + +import ( + "fmt" + "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" +) + +// Address returns the address to connect to Elastic Agent daemon. +func Address() string { + data := paths.Data() + return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock")) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go new file mode 100644 index 000000000000..1123eec941bf --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package control + +import ( + "crypto/sha256" + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" +) + +// Address returns the address to connect to Elastic Agent daemon. +func Address() string { + data = paths.Data() + // entire string cannot be longer than 256 characters, this forces the + // length to always be 87 characters (but unique per data path) + return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go new file mode 100644 index 000000000000..bcd8eccdb82d --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -0,0 +1,188 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package client + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" +) + +// Status is the status of the Elastic Agent +type Status = proto.Status + +const ( + // Starting is when the it is still starting. + Starting Status = proto.Status_STARTING + // Configuring is when it is configuring. + Configuring Status = proto.Status_CONFIGURING + // Healthy is when it is healthy. + Healthy Status = proto.Status_HEALTHY + // Degraded is when it is degraded. + Degraded Status = proto.Status_DEGRADED + // Failed is when it is failed. + Failed Status = proto.Status_FAILED + // Stopping is when it is stopping. + Stopping Status = proto.Status_STOPPING + // Upgrading is when it is upgrading. + Upgrading Status = proto.Status_UPGRADING +) + +// Version is the current running version of the daemon. +type Version struct { + Version string + Commit string + BuildTime time.Time + Snapshot bool +} + +// ApplicationStatus is a status of an application inside of Elastic Agent. +type ApplicationStatus struct { + ID string + Name string + Status Status + Message string + Payload map[string]interface{} +} + +// AgentStatus is the current status of the Elastic Agent. +type AgentStatus struct { + Status Status + Message string + Applications []*ApplicationStatus +} + +// Client communicates to Elastic Agent through the control protocol. +type Client interface { + // Start starts the client. + Start(ctx context.Context) error + // Stop stops the client. + Stop() + // Version returns the current version of the running agent. + Version(ctx context.Context) (Version, error) + // Status returns the current status of the running agent. + Status(ctx context.Context) (*AgentStatus, error) + // Restart triggers restarting the current running daemon. + Restart(ctx context.Context) error + // Upgrade triggers upgrade of the current running daemon. + Upgrade(ctx context.Context, version string, sourceURI string) (string, error) +} + +// client manages the state and communication to the Elastic Agent. +type client struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + client proto.ElasticAgentClient + cfgLock sync.RWMutex + obsLock sync.RWMutex +} + +// New creates a client connection to Elastic Agent. +func New() Client { + return &client{} +} + +// Start starts the connection to Elastic Agent. +func (c *client) Start(ctx context.Context) error { + c.ctx, c.cancel = context.WithCancel(ctx) + conn, err := dialContext(ctx) + if err != nil { + return err + } + c.client = proto.NewElasticAgentClient(conn) + return nil +} + +// Stop stops the connection to Elastic Agent. +func (c *client) Stop() { + if c.cancel != nil { + c.cancel() + c.wg.Wait() + c.ctx = nil + c.cancel = nil + } +} + +// Version returns the current version of the running agent. +func (c *client) Version(ctx context.Context) (Version, error) { + res, err := c.client.Version(ctx, &proto.Empty{}) + if err != nil { + return Version{}, err + } + bt, err := time.Parse(control.TimeFormat(), res.BuildTime) + if err != nil { + return Version{}, err + } + return Version{ + Version: res.Version, + Commit: res.Commit, + BuildTime: bt, + Snapshot: res.Snapshot, + }, nil +} + +// Status returns the current status of the running agent. +func (c *client) Status(ctx context.Context) (*AgentStatus, error) { + res, err := c.client.Status(ctx, &proto.Empty{}) + if err != nil { + return nil, err + } + s := &AgentStatus{ + Status: res.Status, + Message: res.Message, + Applications: make([]*ApplicationStatus, len(res.Applications)), + } + for i, appRes := range res.Applications { + var payload map[string]interface{} + if appRes.Payload != "" { + err := json.Unmarshal([]byte(appRes.Payload), &payload) + if err != nil { + return nil, err + } + } + s.Applications[i] = &ApplicationStatus{ + ID: appRes.Id, + Name: appRes.Name, + Status: appRes.Status, + Message: appRes.Message, + Payload: payload, + } + } + return s, nil +} + +// Restart triggers restarting the current running daemon. +func (c *client) Restart(ctx context.Context) error { + res, err := c.client.Restart(ctx, &proto.Empty{}) + if err != nil { + return err + } + if res.Status == proto.ActionStatus_FAILURE { + return fmt.Errorf(res.Error) + } + return nil +} + +// Upgrade triggers upgrade of the current running daemon. +func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) { + res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{ + Version: version, + SourceURI: sourceURI, + }) + if err != nil { + return "", err + } + if res.Status == proto.ActionStatus_FAILURE { + return "", fmt.Errorf(res.Error) + } + return res.Version, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial.go b/x-pack/elastic-agent/pkg/agent/control/client/dial.go new file mode 100644 index 000000000000..56313b12c82a --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package client + +import ( + "context" + "net" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + + "google.golang.org/grpc" +) + +func dialContext(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) +} + +func dialer(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", addr) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go new file mode 100644 index 000000000000..58b36c180435 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package client + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "github.com/elastic/beats/v7/libbeat/api/npipe" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +func dialContext(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) +} + +func dialer(ctx context.Context, addr string) (net.Conn, error) { + return npipe.DialContext(arr)(ctx, "", "") +} diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go new file mode 100644 index 000000000000..13d32420258d --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -0,0 +1,53 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package control_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" +) + +func TestServerClient_Version(t *testing.T) { + srv := server.New(newErrorLogger(t)) + err := srv.Start() + require.NoError(t, err) + defer srv.Stop() + + c := client.New() + err = c.Start(context.Background()) + require.NoError(t, err) + defer c.Stop() + + ver, err := c.Version(context.Background()) + require.NoError(t, err) + + assert.Equal(t, client.Version{ + Version: release.Version(), + Commit: release.Commit(), + BuildTime: release.BuildTime(), + Snapshot: release.Snapshot(), + }, ver) +} + +func newErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg) + require.NoError(t, err) + return log +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go new file mode 100644 index 000000000000..2dd5d54a46fe --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -0,0 +1,38 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package server + +import ( + "net" + "os" + "path/filepath" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +func createListener() (net.Listener, error) { + path := strings.TrimPrefix(control.Address(), "unix://") + dir := filepath.Dir(path) + if _, err := os.Stat(dir); os.IsNotExist(err) { + err = os.MkdirAll(dir, 0755) + if err != nil { + return nil, err + } + } + lis, err := net.Listen("unix", path) + if err != nil { + return nil, err + } + err = os.Chmod(path, 0700) + if err != nil { + // failed to set permissions (close listener) + lis.Close() + return nil, err + } + return lis, err +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go new file mode 100644 index 000000000000..d2d2866b98a1 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package server + +import ( + "net" + "os/user" + + "github.com/elastic/beats/v7/libbeat/api/npipe" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +// createListener creates a named pipe listener on Windows +func createListener() (net.Listener, error) { + u, err := user.Current() + if err != nil { + return nil, err + } + sd, err := npipe.DefaultSD(u.Username) + if err != nil { + return nil, err + } + return npipe.NewListener(control.Address(), sd) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go new file mode 100644 index 000000000000..c9a750808fcd --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -0,0 +1,106 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "net" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" + + "google.golang.org/grpc" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +// Server is the daemon side of the control protocol. +type Server struct { + logger *logger.Logger + listener net.Listener + server *grpc.Server +} + +// New creates a new control protocol server. +func New(log *logger.Logger) *Server { + return &Server{ + logger: log, + } +} + +// Start starts the GRPC endpoint and accepts new connections. +func (s *Server) Start() error { + if s.server != nil { + // already started + return nil + } + + lis, err := createListener() + if err != nil { + return err + } + s.listener = lis + s.server = grpc.NewServer() + proto.RegisterElasticAgentServer(s.server, s) + + // start serving GRPC connections + go func() { + err := s.server.Serve(lis) + if err != nil { + s.logger.Errorf("error listening for GRPC: %s", err) + } + }() + + return nil +} + +// Stop stops the GRPC endpoint. +func (s *Server) Stop() { + if s.server != nil { + s.server.Stop() + s.server = nil + s.listener = nil + } +} + +// Version returns the currently running version. +func (s *Server) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) { + return &proto.VersionResponse{ + Version: release.Version(), + Commit: release.Commit(), + BuildTime: release.BuildTime().Format(control.TimeFormat()), + Snapshot: release.Snapshot(), + }, nil +} + +// Status returns the overall status of the agent. +func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusResponse, error) { + // not implemented + return &proto.StatusResponse{ + Status: proto.Status_HEALTHY, + Message: "not implemented", + Applications: nil, + }, nil +} + +// Restart performs re-exec. +func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartResponse, error) { + // not implemented + return &proto.RestartResponse{ + Status: proto.ActionStatus_FAILURE, + Error: "not implemented", + }, nil +} + +// Upgrade performs the upgrade operation. +func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*proto.UpgradeResponse, error) { + // not implemented + return &proto.UpgradeResponse{ + Status: proto.ActionStatus_FAILURE, + Version: "", + Error: "not implemented", + }, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/time.go b/x-pack/elastic-agent/pkg/agent/control/time.go new file mode 100644 index 000000000000..c87902bbc371 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/time.go @@ -0,0 +1,10 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package control + +// TimeFormat returns the time format shared between the protocol. +func TimeFormat() string { + return "2006-01-02 15:04:05 -0700 MST" +}