Skip to content

Commit

Permalink
Merge pull request #4757 from hashicorp/f-driver-plugin-loader
Browse files Browse the repository at this point in the history
driver plugin loader
  • Loading branch information
nickethier authored Oct 11, 2018
2 parents 7120fc2 + 19e45d3 commit a69deeb
Show file tree
Hide file tree
Showing 42 changed files with 1,291 additions and 751 deletions.
2 changes: 2 additions & 0 deletions client/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestAllocations_GarbageCollectAll_ACL(t *testing.T) {
}

func TestAllocations_GarbageCollect(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
client := TestClient(t, func(c *config.Config) {
Expand Down Expand Up @@ -174,6 +175,7 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) {
}

func TestAllocations_Stats(t *testing.T) {
t.Skip("missing exec driver plugin implementation")
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
Expand Down
1 change: 1 addition & 0 deletions client/alloc_watcher_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// TestPrevAlloc_StreamAllocDir_TLS asserts ephemeral disk migrations still
// work when TLS is enabled.
func TestPrevAlloc_StreamAllocDir_TLS(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
const (
caFn = "../helper/tlsutil/testdata/global-ca.pem"
serverCertFn = "../helper/tlsutil/testdata/global-server.pem"
Expand Down
4 changes: 4 additions & 0 deletions client/allocdir/task_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
// TaskDir contains all of the paths relevant to a task. All paths are on the
// host system so drivers should mount/link into task containers as necessary.
type TaskDir struct {
// AllocDir is the path to the alloc directory on the host
AllocDir string

// Dir is the path to Task directory on the host
Dir string

Expand Down Expand Up @@ -50,6 +53,7 @@ func newTaskDir(logger hclog.Logger, allocDir, taskName string) *TaskDir {
logger = logger.Named("task_dir").With("task_name", taskName)

return &TaskDir{
AllocDir: allocDir,
Dir: taskDir,
SharedAllocDir: filepath.Join(allocDir, SharedAllocName),
LogDir: filepath.Join(allocDir, SharedAllocName, LogDirName),
Expand Down
49 changes: 28 additions & 21 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/loader"
)

// allocRunner is used to run all the tasks in a given allocation
Expand Down Expand Up @@ -78,6 +79,10 @@ type allocRunner struct {
// prevAllocWatcher allows waiting for a previous allocation to exit
// and if necessary migrate its alloc dir.
prevAllocWatcher allocwatcher.PrevAllocWatcher

// pluginSingletonLoader is a plugin loader that will returns singleton
// instances of the plugins.
pluginSingletonLoader loader.PluginCatalog
}

// NewAllocRunner returns a new allocation runner.
Expand All @@ -89,18 +94,19 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
}

ar := &allocRunner{
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
allocBroadcaster: cstructs.NewAllocBroadcaster(),
prevAllocWatcher: config.PrevAllocWatcher,
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
allocBroadcaster: cstructs.NewAllocBroadcaster(),
prevAllocWatcher: config.PrevAllocWatcher,
pluginSingletonLoader: config.PluginSingletonLoader,
}

// Create the logger based on the allocation ID
Expand All @@ -124,15 +130,16 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
for _, task := range tasks {
config := &taskrunner.Config{
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
Consul: ar.consulClient,
VaultClient: ar.vaultClient,
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
Consul: ar.consulClient,
VaultClient: ar.vaultClient,
PluginSingletonLoader: ar.pluginSingletonLoader,
}

// Create, but do not Run, the task runner
Expand Down
23 changes: 15 additions & 8 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,35 @@ import (
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/plugins/shared/catalog"
"github.com/hashicorp/nomad/plugins/shared/loader"
"github.com/hashicorp/nomad/plugins/shared/singleton"
"github.com/stretchr/testify/require"
)

// TestAllocRunner_AllocState_Initialized asserts that getting TaskStates via
// AllocState() are initialized even before the AllocRunner has run.
func TestAllocRunner_AllocState_Initialized(t *testing.T) {
t.Skip("missing exec driver plugin implementation")
t.Parallel()

alloc := mock.Alloc()
logger := testlog.HCLogger(t)

conf := &Config{
Alloc: alloc,
Logger: logger,
ClientConfig: config.TestClientConfig(),
StateDB: state.NoopDB{},
Consul: nil,
Vault: nil,
StateUpdater: nil,
PrevAllocWatcher: nil,
Alloc: alloc,
Logger: logger,
ClientConfig: config.TestClientConfig(),
StateDB: state.NoopDB{},
Consul: nil,
Vault: nil,
StateUpdater: nil,
PrevAllocWatcher: nil,
PluginSingletonLoader: &loader.MockCatalog{},
}

pluginLoader := catalog.TestPluginLoader(t)
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, pluginLoader)
ar, err := NewAllocRunner(conf)
require.NoError(t, err)

Expand Down
8 changes: 8 additions & 0 deletions client/allocrunner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/loader"
)

// Config holds the configuration for creating an allocation runner.
Expand Down Expand Up @@ -37,4 +38,11 @@ type Config struct {
// PrevAllocWatcher handles waiting on previous allocations and
// migrating their ephemeral disk when necessary.
PrevAllocWatcher allocwatcher.PrevAllocWatcher

// PluginLoader is used to load plugins.
PluginLoader loader.PluginCatalog

// PluginSingletonLoader is a plugin loader that will returns singleton
// instances of the plugins.
PluginSingletonLoader loader.PluginCatalog
}
4 changes: 2 additions & 2 deletions client/allocrunner/interfaces/task_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package interfaces
import (
"context"

"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -82,7 +82,7 @@ type DriverStats interface {

type TaskPoststartRequest struct {
// Exec hook (may be nil)
DriverExec driver.ScriptExecutor
DriverExec interfaces.ScriptExecutor

// Network info (may be nil)
DriverNetwork *cstructs.DriverNetwork
Expand Down
66 changes: 66 additions & 0 deletions client/allocrunner/taskrunner/driver_handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package taskrunner

import (
"context"
"time"

cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

// NewDriverHandle returns a handle for task operations on a specific task
func NewDriverHandle(driver drivers.DriverPlugin, taskID string, task *structs.Task, net *cstructs.DriverNetwork) *DriverHandle {
return &DriverHandle{
driver: driver,
net: net,
taskID: taskID,
task: task,
}
}

// DriverHandle encapsulates a driver plugin client and task identifier and exposes
// an api to perform driver operations on the task
type DriverHandle struct {
driver drivers.DriverPlugin
net *cstructs.DriverNetwork
task *structs.Task
taskID string
}

func (h *DriverHandle) ID() string {
return h.taskID
}

func (h *DriverHandle) WaitCh(ctx context.Context) (<-chan *drivers.ExitResult, error) {
return h.driver.WaitTask(ctx, h.taskID)
}

func (h *DriverHandle) Update(task *structs.Task) error {
return nil
}

func (h *DriverHandle) Kill() error {
return h.driver.StopTask(h.taskID, h.task.KillTimeout, h.task.KillSignal)
}

func (h *DriverHandle) Stats() (*cstructs.TaskResourceUsage, error) {
return h.driver.TaskStats(h.taskID)
}

func (h *DriverHandle) Signal(s string) error {
return h.driver.SignalTask(h.taskID, s)
}

func (h *DriverHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
command := append([]string{cmd}, args...)
res, err := h.driver.ExecTask(h.taskID, command, timeout)
if err != nil {
return nil, 0, err
}
return res.Stdout, res.ExitResult.ExitCode, res.ExitResult.Err
}

func (h *DriverHandle) Network() *cstructs.DriverNetwork {
return h.net
}
47 changes: 47 additions & 0 deletions client/allocrunner/taskrunner/interfaces/handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package interfaces

import (
"context"
"time"

cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

// DriverHandle wraps operations to a driver such that they are operated on a specific
// task
type DriverHandle interface {
// ID returns the task ID
ID() string

// WaitCh is used to return a channel used to wait for task completion
WaitCh(context.Context) (<-chan *drivers.ExitResult, error)

// Update is used to update the task if possible and update task related
// configurations.
Update(task *structs.Task) error

// Kill is used to stop the task
Kill() error

// Stats returns aggregated stats of the driver
Stats() (*cstructs.TaskResourceUsage, error)

// Signal is used to send a signal to the task
Signal(s string) error

// ScriptExecutor is an interface used to execute commands such as
// health check scripts in the a DriverHandle's context.
ScriptExecutor

// Network returns the driver's network or nil if the driver did not
// create a network.
Network() *cstructs.DriverNetwork
}

// ScriptExecutor is an interface that supports Exec()ing commands in the
// driver's context. Split out of DriverHandle to ease testing.
type ScriptExecutor interface {
Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error)
}
3 changes: 1 addition & 2 deletions client/allocrunner/taskrunner/interfaces/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package interfaces

import (
"context"
"os"

"github.com/hashicorp/nomad/nomad/structs"
)

type TaskLifecycle interface {
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
Signal(event *structs.TaskEvent, s os.Signal) error
Signal(event *structs.TaskEvent, signal string) error
Kill(ctx context.Context, event *structs.TaskEvent) error
}
Loading

0 comments on commit a69deeb

Please sign in to comment.