Skip to content

Commit

Permalink
Sign and verify work submissions
Browse files Browse the repository at this point in the history
Remote work submissions can be digitally signed by the sender. The
target node will verify the signature of the work command before
starting the work unit.

A pair of RSA public and private keys are created offline and
distributed to the nodes. The public key should be on the node receiving
work (PKIX format). The private key should be on the node submitting
work (PKCS1 format).
  • Loading branch information
fosterseth committed Sep 9, 2021
1 parent 7cb98c7 commit ed12f53
Show file tree
Hide file tree
Showing 22 changed files with 441 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ TESTCMD = -run $(RUNTEST)
endif

test:
@go test ./... -p 1 -parallel=16 $(TESTCMD) -count=1 -v
@go test ./... -p 1 -parallel=16 $(TESTCMD) -count=1

testloop: receptor
@i=1; while echo "------ $$i" && \
Expand Down
6 changes: 3 additions & 3 deletions docs/source/tls.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ foo.yml
- tls-server:
name: myserver
cert: foo.crt
key: foo.key
cert: /full/path/foo.crt
key: /full/path/foo.key
requireclientcert: true
clientcas: ca.crt
clientcas: /full/path/ca.crt
- tcp-listener:
port: 2222
Expand Down
33 changes: 33 additions & 0 deletions docs/source/workceptor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,39 @@ For local work, transitioning from Pending to Running occurs the moment the ``co

For remote work, transitioning from Pending to Running occurs when the status reported from the remote node has a Running state.

Signed work
^^^^^^^^^^^^^^^^^^

Remote work submissions can be digitally signed by the sender. The target node will verify the signature of the work command before starting the work unit.

A pair of RSA public and private keys are created offline and distributed to the nodes. The public key should be on the node receiving work (PKIX format). The private key should be on the node submitting work (PKCS1 format).

in `bar.yml`

.. code-block:: yaml
# PKIX
- verifyingkeypublic: /full/path/signworkpublic.pem
- work-command:
workType: echopayload
command: bash
params: "-c \"while read -r line; do echo ${line^^}; sleep 5; done\""
verifysignature: true
in `foo.yml`

.. code-block:: yaml
# PKCS1
- signingkeyprivate: /full/path/signworkprivate.pem
Use the "--signwork" parameter to sign the work.

.. code::
$ receptorctl --socket /tmp/foo.sock work submit echoint --node bar --no-payload --signwork
Units on disk
^^^^^^^^^^^^^^^^^^

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.9
github.com/ghjm/cmdline v0.1.0
github.com/golang-jwt/jwt/v4 v4.0.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/gorilla/websocket v1.4.2
github.com/jupp0r/go-priority-queue v0.0.0-20160601094913-ab1073853bde
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down
51 changes: 48 additions & 3 deletions pkg/certificates/ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func LoadFromPEMFile(filename string) ([]interface{}, error) {
var block *pem.Block
for len(content) > 0 {
block, content = pem.Decode(content)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block")
}
switch block.Type {
case "CERTIFICATE":
var cert *x509.Certificate
Expand All @@ -68,6 +71,12 @@ func LoadFromPEMFile(filename string) ([]interface{}, error) {
return nil, err
}
results = append(results, key)
case "PUBLIC KEY":
key, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}
results = append(results, key)
default:
return nil, fmt.Errorf("unknown block type %s", block.Type)
}
Expand Down Expand Up @@ -112,13 +121,32 @@ func SaveToPEMFile(filename string, data []interface{}) error {

continue
}
var key *rsa.PrivateKey
key, ok = elem.(*rsa.PrivateKey)
var keyPrivate *rsa.PrivateKey
keyPrivate, ok = elem.(*rsa.PrivateKey)
if ok {
keyPEM := new(bytes.Buffer)
err = pem.Encode(keyPEM, &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
Bytes: x509.MarshalPKCS1PrivateKey(keyPrivate),
})
if err != nil {
return err
}
content = append(content, keyPEM.String())

continue
}
var keyPublic *rsa.PublicKey
keyPublic, ok = elem.(*rsa.PublicKey)
if ok {
keyPEM := new(bytes.Buffer)
keyPublicBytes, err := x509.MarshalPKIXPublicKey(keyPublic)
if err != nil {
return err
}
err = pem.Encode(keyPEM, &pem.Block{
Type: "PUBLIC KEY",
Bytes: keyPublicBytes,
})
if err != nil {
return err
Expand Down Expand Up @@ -183,6 +211,23 @@ func LoadPrivateKey(filename string) (*rsa.PrivateKey, error) {
return key, nil
}

// LoadPublicKey loads a single RSA public key from a file.
func LoadPublicKey(filename string) (*rsa.PublicKey, error) {
data, err := LoadFromPEMFile(filename)
if err != nil {
return nil, err
}
if len(data) != 1 {
return nil, fmt.Errorf("public key file should contain exactly one item")
}
key, ok := data[0].(*rsa.PublicKey)
if !ok {
return nil, fmt.Errorf("public key file does not contain public key data")
}

return key, nil
}

// CA contains internal data for a certificate authority.
type CA struct {
Certificate *x509.Certificate
Expand Down
24 changes: 18 additions & 6 deletions pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Netceptor struct {
seenUpdateExpireTime time.Duration
maxForwardingHops byte
maxConnectionIdleTime time.Duration
workCommands []string
workCommands []WorkCommand
epoch uint64
sequence uint64
connLock *sync.RWMutex
Expand Down Expand Up @@ -221,14 +221,19 @@ const (
ConnTypeStreamTLS = 2
)

type WorkCommand struct {
WorkType string
Secure bool
}

// ServiceAdvertisement is the data associated with a service advertisement.
type ServiceAdvertisement struct {
NodeID string
Service string
Time time.Time
ConnType byte
Tags map[string]string
WorkCommands []string
WorkCommands string
}

// serviceAdvertisementFull is the whole message from the network.
Expand Down Expand Up @@ -563,7 +568,10 @@ func (s *Netceptor) Status() Status {
adCopy := *ad
if adCopy.NodeID == s.nodeID {
adCopy.Time = time.Now()
adCopy.WorkCommands = s.workCommands
if len(s.workCommands) > 0 {
wCBytes, _ := json.Marshal(s.workCommands)
adCopy.WorkCommands = string(wCBytes)
}
}
serviceAds = append(serviceAds, &adCopy)
}
Expand Down Expand Up @@ -688,7 +696,10 @@ func (s *Netceptor) sendServiceAds() {
}
if svcType, ok := sa.Tags["type"]; ok {
if svcType == "Control Service" {
sa.WorkCommands = s.workCommands
if len(s.workCommands) > 0 {
wCBytes, _ := json.Marshal(s.workCommands)
sa.WorkCommands = string(wCBytes)
}
}
}
ads = append(ads, sa)
Expand Down Expand Up @@ -867,11 +878,12 @@ func (s *Netceptor) GetServerTLSConfig(name string) (*tls.Config, error) {
}

// AddWorkCommand records a work command so it can be included in service announcements.
func (s *Netceptor) AddWorkCommand(command string) error {
func (s *Netceptor) AddWorkCommand(command string, secure bool) error {
if command == "" {
return fmt.Errorf("must provide a name")
}
s.workCommands = append(s.workCommands, command)
wC := WorkCommand{WorkType: command, Secure: secure}
s.workCommands = append(s.workCommands, wC)

return nil
}
Expand Down
49 changes: 47 additions & 2 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ type commandCfg struct {
Command string `required:"true" description:"Command to run to process units of work"`
Params string `description:"Command-line parameters"`
AllowRuntimeParams bool `description:"Allow users to add more parameters" default:"false"`
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}

func (cfg commandCfg) newWorker(w *Workceptor, unitID string, workType string) WorkUnit {
Expand All @@ -309,7 +310,7 @@ func (cfg commandCfg) newWorker(w *Workceptor, unitID string, workType string) W

// Run runs the action.
func (cfg commandCfg) Run() error {
err := MainInstance.RegisterWorker(cfg.WorkType, cfg.newWorker)
err := MainInstance.RegisterWorker(cfg.WorkType, cfg.newWorker, cfg.VerifySignature)

return err
}
Expand Down Expand Up @@ -339,7 +340,51 @@ func (cfg commandRunnerCfg) Run() error {
return nil
}

type signingKeyPrivateCfg struct {
Filename string `description:"Private key to sign work submissions" barevalue:"yes" default:""`
}

type verifyingKeyPublicCfg struct {
Filename string `description:"Public key to verify signed work submissions" barevalue:"yes" default:""`
}

func filenameExists(filename string) error {
if _, err := os.Stat(filename); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("%s does not exist", filename)
}

return err
}

return nil
}

func (cfg signingKeyPrivateCfg) Prepare() error {
return filenameExists(cfg.Filename)
}

func (cfg verifyingKeyPublicCfg) Prepare() error {
return filenameExists(cfg.Filename)
}

func (cfg signingKeyPrivateCfg) Run() error {
MainInstance.signingkey = cfg.Filename

return nil
}

func (cfg verifyingKeyPublicCfg) Run() error {
MainInstance.verifyingkey = cfg.Filename

return nil
}

func init() {
cmdline.RegisterConfigTypeForApp("receptor-workers",
"signingkeyprivate", "Private key to sign work submissions", signingKeyPrivateCfg{}, cmdline.Singleton)
cmdline.RegisterConfigTypeForApp("receptor-workers",
"verifyingkeypublic", "Public key to verify work submissions", verifyingKeyPublicCfg{}, cmdline.Singleton)
cmdline.RegisterConfigTypeForApp("receptor-workers",
"work-command", "Run a worker using an external command", commandCfg{}, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
Expand Down Expand Up @@ -371,5 +416,5 @@ func (c Command) setup(wc *Workceptor) error {
return cw
}

return wc.RegisterWorker(c.WorkType, factory)
return wc.RegisterWorker(c.WorkType, factory, false)
}
49 changes: 45 additions & 4 deletions pkg/workceptor/controlsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ func intFromMap(config map[string]interface{}, name string) (int64, error) {
return 0, fmt.Errorf("field %s value %s is not convertible to an int", name, value)
}

func boolFromMap(config map[string]interface{}, name string) (bool, error) {
value, ok := config[name]
if !ok {
return false, fmt.Errorf("field %s missing", name)
}
valueBoolStr, ok := value.(string)
if !ok {
return false, fmt.Errorf("field %s must be a string", name)
}
if valueBoolStr == "true" {
return true, nil
}
if valueBoolStr == "false" {
return false, nil
}

return false, fmt.Errorf("field %s value %s is not convertible to an bool", name, value)
}

func (t *workceptorCommandType) InitFromJSON(config map[string]interface{}) (controlsvc.ControlCommand, error) {
subCmd, err := strFromMap(config, "subcommand")
if err != nil {
Expand Down Expand Up @@ -188,9 +207,27 @@ func (c *workceptorCommand) ControlFunc(nc *netceptor.Netceptor, cfo controlsvc.
if err != nil {
ttl = ""
}
signWork, err := boolFromMap(c.params, "signwork")
if err != nil {
signWork = false
}
signature, err := strFromMap(c.params, "signature")
if err != nil {
signature = ""
}
workParams := make(map[string]string)
nonParams := []string{"command", "subcommand", "node", "worktype", "tlsclient", "ttl", "signwork", "signature"}
inNonParams := func(p string) bool {
for _, nonparam := range nonParams {
if p == nonparam {
return true
}
}

return false
}
for k, v := range c.params {
if k == "command" || k == "subcommand" || k == "node" || k == "worktype" || k == "tlsclient" || k == "ttl" {
if ok := inNonParams(k); ok {
continue
}
vStr, ok := v.(string)
Expand All @@ -199,14 +236,18 @@ func (c *workceptorCommand) ControlFunc(nc *netceptor.Netceptor, cfo controlsvc.
}
workParams[k] = vStr
}
isLocalHost := strings.EqualFold(workNode, "localhost")
var worker WorkUnit
if workNode == nc.NodeID() || strings.EqualFold(workNode, "localhost") {
if workNode == nc.NodeID() || isLocalHost {
if ttl != "" {
return nil, fmt.Errorf("ttl option is intended for remote work only")
}
worker, err = c.w.AllocateUnit(workType, workParams)
if signWork {
return nil, fmt.Errorf("signwork option is intended for remote work only")
}
worker, err = c.w.AllocateUnit(workType, signature, isLocalHost, workParams)
} else {
worker, err = c.w.AllocateRemoteUnit(workNode, workType, tlsclient, ttl, workParams)
worker, err = c.w.AllocateRemoteUnit(workNode, workType, tlsclient, ttl, signWork, workParams)
}
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/workceptor/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type WorkUnit interface {
Restart() error
Cancel() error
Release(force bool) error
SetSignWork(signWork bool)
}

// NewWorkerFunc represents a factory of WorkUnit instances.
Expand Down
Loading

0 comments on commit ed12f53

Please sign in to comment.