Skip to content

Commit

Permalink
[Elastic Agent] Add skeleton for client/server for agent control prot…
Browse files Browse the repository at this point in the history
…ocol (elastic#20163)

* Add protocl to control Elastic Agent.

* Fix CI with protoc.

* Remove CI changes.

* Start on the control server code.

* More client/server work.

* More work.

* Add test.

* Fix vet issues.

* Fix permissions on unix socket. Add comment to Windows npipe.
  • Loading branch information
blakerouse authored Jul 30, 2020
1 parent 2abf87f commit 77a8472
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 0 deletions.
20 changes: 20 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package control

import (
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data := paths.Data()
return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock"))
}
22 changes: 22 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package control

import (
"crypto/sha256"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data = paths.Data()
// entire string cannot be longer than 256 characters, this forces the
// length to always be 87 characters (but unique per data path)
return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data))
}
188 changes: 188 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package client

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto"
)

// Status is the status of the Elastic Agent
type Status = proto.Status

const (
// Starting is when the it is still starting.
Starting Status = proto.Status_STARTING
// Configuring is when it is configuring.
Configuring Status = proto.Status_CONFIGURING
// Healthy is when it is healthy.
Healthy Status = proto.Status_HEALTHY
// Degraded is when it is degraded.
Degraded Status = proto.Status_DEGRADED
// Failed is when it is failed.
Failed Status = proto.Status_FAILED
// Stopping is when it is stopping.
Stopping Status = proto.Status_STOPPING
// Upgrading is when it is upgrading.
Upgrading Status = proto.Status_UPGRADING
)

// Version is the current running version of the daemon.
type Version struct {
Version string
Commit string
BuildTime time.Time
Snapshot bool
}

// ApplicationStatus is a status of an application inside of Elastic Agent.
type ApplicationStatus struct {
ID string
Name string
Status Status
Message string
Payload map[string]interface{}
}

// AgentStatus is the current status of the Elastic Agent.
type AgentStatus struct {
Status Status
Message string
Applications []*ApplicationStatus
}

// Client communicates to Elastic Agent through the control protocol.
type Client interface {
// Start starts the client.
Start(ctx context.Context) error
// Stop stops the client.
Stop()
// Version returns the current version of the running agent.
Version(ctx context.Context) (Version, error)
// Status returns the current status of the running agent.
Status(ctx context.Context) (*AgentStatus, error)
// Restart triggers restarting the current running daemon.
Restart(ctx context.Context) error
// Upgrade triggers upgrade of the current running daemon.
Upgrade(ctx context.Context, version string, sourceURI string) (string, error)
}

// client manages the state and communication to the Elastic Agent.
type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client proto.ElasticAgentClient
cfgLock sync.RWMutex
obsLock sync.RWMutex
}

// New creates a client connection to Elastic Agent.
func New() Client {
return &client{}
}

// Start starts the connection to Elastic Agent.
func (c *client) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx)
if err != nil {
return err
}
c.client = proto.NewElasticAgentClient(conn)
return nil
}

// Stop stops the connection to Elastic Agent.
func (c *client) Stop() {
if c.cancel != nil {
c.cancel()
c.wg.Wait()
c.ctx = nil
c.cancel = nil
}
}

// Version returns the current version of the running agent.
func (c *client) Version(ctx context.Context) (Version, error) {
res, err := c.client.Version(ctx, &proto.Empty{})
if err != nil {
return Version{}, err
}
bt, err := time.Parse(control.TimeFormat(), res.BuildTime)
if err != nil {
return Version{}, err
}
return Version{
Version: res.Version,
Commit: res.Commit,
BuildTime: bt,
Snapshot: res.Snapshot,
}, nil
}

// Status returns the current status of the running agent.
func (c *client) Status(ctx context.Context) (*AgentStatus, error) {
res, err := c.client.Status(ctx, &proto.Empty{})
if err != nil {
return nil, err
}
s := &AgentStatus{
Status: res.Status,
Message: res.Message,
Applications: make([]*ApplicationStatus, len(res.Applications)),
}
for i, appRes := range res.Applications {
var payload map[string]interface{}
if appRes.Payload != "" {
err := json.Unmarshal([]byte(appRes.Payload), &payload)
if err != nil {
return nil, err
}
}
s.Applications[i] = &ApplicationStatus{
ID: appRes.Id,
Name: appRes.Name,
Status: appRes.Status,
Message: appRes.Message,
Payload: payload,
}
}
return s, nil
}

// Restart triggers restarting the current running daemon.
func (c *client) Restart(ctx context.Context) error {
res, err := c.client.Restart(ctx, &proto.Empty{})
if err != nil {
return err
}
if res.Status == proto.ActionStatus_FAILURE {
return fmt.Errorf(res.Error)
}
return nil
}

// Upgrade triggers upgrade of the current running daemon.
func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) {
res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{
Version: version,
SourceURI: sourceURI,
})
if err != nil {
return "", err
}
if res.Status == proto.ActionStatus_FAILURE {
return "", fmt.Errorf(res.Error)
}
return res.Version, nil
}
26 changes: 26 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package client

import (
"context"
"net"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"google.golang.org/grpc"
)

func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", addr)
}
26 changes: 26 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package client

import (
"context"
"net"

"google.golang.org/grpc"

"github.com/elastic/beats/v7/libbeat/api/npipe"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
)

func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
return npipe.DialContext(arr)(ctx, "", "")
}
53 changes: 53 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package control_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t))
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()

c := client.New()
err = c.Start(context.Background())
require.NoError(t, err)
defer c.Stop()

ver, err := c.Version(context.Background())
require.NoError(t, err)

assert.Equal(t, client.Version{
Version: release.Version(),
Commit: release.Commit(),
BuildTime: release.BuildTime(),
Snapshot: release.Snapshot(),
}, ver)
}

func newErrorLogger(t *testing.T) *logger.Logger {
t.Helper()

loggerCfg := logger.DefaultLoggingConfig()
loggerCfg.Level = logp.ErrorLevel

log, err := logger.NewFromConfig("", loggerCfg)
require.NoError(t, err)
return log
}
38 changes: 38 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/server/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package server

import (
"net"
"os"
"path/filepath"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
)

func createListener() (net.Listener, error) {
path := strings.TrimPrefix(control.Address(), "unix://")
dir := filepath.Dir(path)
if _, err := os.Stat(dir); os.IsNotExist(err) {
err = os.MkdirAll(dir, 0755)
if err != nil {
return nil, err
}
}
lis, err := net.Listen("unix", path)
if err != nil {
return nil, err
}
err = os.Chmod(path, 0700)
if err != nil {
// failed to set permissions (close listener)
lis.Close()
return nil, err
}
return lis, err
}
Loading

0 comments on commit 77a8472

Please sign in to comment.