Skip to content

Commit

Permalink
commands support & parallel lock
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed May 27, 2016
1 parent 73c40bc commit 2d15b80
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 208 deletions.
51 changes: 29 additions & 22 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,40 @@ import (
"io"
"os"
"path/filepath"
"sync"
"syscall"

"github.com/seletskiy/hierr"
)

func startArchiveReceivers(
lockedNodes *distributedLock,
args map[string]interface{},
) (*archiveReceivers, error) {
var (
rootDir = args["--root"].(string)
)

rootDir string,
) (*remoteExecution, error) {
archiveReceiverCommandString := fmt.Sprintf(
`tar -x --verbose --directory="%s"`,
rootDir,
)

unpackers := []io.WriteCloser{}

nodes := []archiveReceiverNode{}
nodes := []remoteExecutionNode{}

logMutex := &sync.Mutex{}

for _, node := range lockedNodes.nodes {
debugf(hierr.Errorf(
archiveReceiverCommandString,
"%s starting archive receiver command",
node.String(),
).Error())
tracef(
"%s",
hierr.Errorf(
archiveReceiverCommandString,
"%s starting archive receiver command",
node.String(),
).Error(),
)

archiveReceiverCommand, err := node.runner.Command(
archiveReceiverCommandString,
)

if err != nil {
return nil, hierr.Errorf(
err,
Expand All @@ -57,21 +58,24 @@ func startArchiveReceivers(
unpackers = append(unpackers, stdin)

stdout := newLineFlushWriter(
logMutex,
newPrefixWriter(
newDebugWriter(logger),
fmt.Sprintf("%s {tar} <stdout> ", node.String()),
node.String()+" {tar} <stdout> ",
),
true,
)

archiveReceiverCommand.SetStdout(stdout)

stderr := newLineFlushWriter(
logMutex,
newPrefixWriter(
newDebugWriter(logger),
fmt.Sprintf("%s {tar} <stderr> ", node.String()),
node.String()+" {tar} <stderr> ",
),
true,
)

archiveReceiverCommand.SetStdout(stdout)
archiveReceiverCommand.SetStderr(stderr)

err = archiveReceiverCommand.Start()
Expand All @@ -82,13 +86,16 @@ func startArchiveReceivers(
)
}

nodes = append(nodes, archiveReceiverNode{
nodes = append(nodes, remoteExecutionNode{
node: node,
command: archiveReceiverCommand,

stdout: stdout,
stderr: stderr,
})
}

return &archiveReceivers{
return &remoteExecution{
stdin: multiWriteCloser{unpackers},
nodes: nodes,
}, nil
Expand All @@ -106,7 +113,7 @@ func archiveFilesToWriter(target io.Writer, files []string) error {
archive := tar.NewWriter(target)
for fileIndex, fileName := range files {
logger.Infof(
"(%d/%d) sending file: '%s'",
"%5d/%d sending file: '%s'",
fileIndex+1,
len(files),
fileName,
Expand All @@ -115,7 +122,7 @@ func archiveFilesToWriter(target io.Writer, files []string) error {
writeFileToArchive(fileName, archive, workDir)
}

debugf("closing archive stream, %d files sent", len(files))
tracef("closing archive stream, %d files sent", len(files))

err = archive.Close()
if err != nil {
Expand Down Expand Up @@ -168,7 +175,7 @@ func writeFileToArchive(
ModTime: fileInfo.ModTime(),
}

debugf(
tracef(
hierr.Errorf(
fmt.Sprintf(
"size: %d bytes; mode: %o; uid/gid: %d/%d; modtime: %s",
Expand Down
8 changes: 0 additions & 8 deletions archive_receiver_node.go

This file was deleted.

46 changes: 0 additions & 46 deletions archive_receivers.go

This file was deleted.

137 changes: 137 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"io"
"os"
"strings"
"sync"

"github.com/seletskiy/hierr"
)

func runCommand(
lockedNodes *distributedLock,
command []string,
verbosityLevel verbosity,
) error {
commandString := joinCommand(command)

remoteCommands := map[*distributedLockNode]*remoteExecutionNode{}

logMutex := &sync.Mutex{}

for _, node := range lockedNodes.nodes {
tracef(
"%s",
hierr.Errorf(
commandString,
"%s starting command",
node.String(),
).Error(),
)

remoteCommand, err := node.runner.Command(
commandString,
)
if err != nil {
return hierr.Errorf(
err,
`can't create remote command`,
)
}

var stdout io.WriteCloser
var stderr io.WriteCloser
switch verbosityLevel {
default:
stdout = newLineFlushWriter(
logMutex,
newPrefixWriter(
os.Stdout,
node.address.domain+" ",
),
true,
)

stderr = newLineFlushWriter(
logMutex,
newPrefixWriter(
os.Stderr,
node.address.domain+" ",
),
true,
)

case verbosityQuiet:
stdout = newLineFlushWriter(logMutex, os.Stdout, false)
stderr = newLineFlushWriter(logMutex, os.Stderr, false)

case verbosityDebug:
stdout = newLineFlushWriter(
logMutex,
newPrefixWriter(
newDebugWriter(logger),
node.String()+" {cmd} <stdout> ",
),
false,
)

stderr = newLineFlushWriter(
logMutex,
newPrefixWriter(
newDebugWriter(logger),
node.String()+" {cmd} <stderr> ",
),
false,
)
}

remoteCommand.SetStdout(stdout)
remoteCommand.SetStderr(stderr)

err = remoteCommand.Start()
if err != nil {
return hierr.Errorf(
err,
`can't start remote command`,
)
}

remoteCommands[node] = &remoteExecutionNode{
node: node,
command: remoteCommand,

stdout: stdout,
stderr: stderr,
}
}

for node, remoteCommand := range remoteCommands {
err := remoteCommand.command.Wait()
_ = remoteCommand.stdout.Close()
_ = remoteCommand.stderr.Close()
if err != nil {
return hierr.Errorf(
err,
`%s can't wait for remote command to finish: '%s'`,
node.String(),
commandString,
)
}
}

return nil
}

func joinCommand(command []string) string {
escapedParts := []string{}

for _, part := range command {
part = strings.Replace(part, `"`, `\"`, -1)
part = strings.Replace(part, `\`, `\\`, -1)

escapedParts = append(escapedParts, part)
}

return strings.Join(escapedParts, " ")
}
53 changes: 0 additions & 53 deletions concurrency_test.go

This file was deleted.

Loading

0 comments on commit 2d15b80

Please sign in to comment.