Skip to content

Commit

Permalink
extract remoteNodes map to type
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed Jun 10, 2016
1 parent 1bae4e3 commit 8649a2f
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,36 @@ import (
"github.com/seletskiy/hierr"
)

type remoteNodesMap map[*distributedLockNode]*remoteExecutionNode

type remoteNodes struct {
*sync.Mutex

nodes remoteNodesMap
}

func (nodes *remoteNodes) Set(
node *distributedLockNode,
remote *remoteExecutionNode,
) {
nodes.Lock()
defer nodes.Unlock()

nodes.nodes[node] = remote
}

func runRemoteExecution(
lockedNodes *distributedLock,
command string,
setupCallback func(*remoteExecutionNode),
) (*remoteExecution, error) {
var (
stdins = []io.WriteCloser{}
remoteNodes = map[*distributedLockNode]*remoteExecutionNode{}
stdins = []io.WriteCloser{}

logLock = &sync.Mutex{}
stdinsLock = &sync.Mutex{}

logMutex = &sync.Mutex{}
nodesMapMutex = &sync.Mutex{}
nodes = &remoteNodes{&sync.Mutex{}, remoteNodesMap{}}
)

errors := make(chan error, 0)
Expand All @@ -38,7 +57,7 @@ func runRemoteExecution(
remoteNode, err := runRemoteExecutionNode(
node,
command,
logMutex,
logLock,
)
if err != nil {
errors <- err
Expand All @@ -62,12 +81,12 @@ func runRemoteExecution(
return
}

nodesMapMutex.Lock()
{
stdins = append(stdins, remoteNode.stdin)
remoteNodes[node] = remoteNode
}
nodesMapMutex.Unlock()
nodes.Set(node, remoteNode)

stdinsLock.Lock()
defer stdinsLock.Unlock()

stdins = append(stdins, remoteNode.stdin)

errors <- nil
}(node)
Expand All @@ -86,14 +105,14 @@ func runRemoteExecution(
return &remoteExecution{
stdin: &multiWriteCloser{stdins},

nodes: remoteNodes,
nodes: nodes.nodes,
}, nil
}

func runRemoteExecutionNode(
node *distributedLockNode,
command string,
logMutex *sync.Mutex,
logLock *sync.Mutex,
) (*remoteExecutionNode, error) {
remoteCommand, err := node.runner.Command(command)
if err != nil {
Expand All @@ -107,16 +126,16 @@ func runRemoteExecutionNode(
var stderr io.WriteCloser
switch verbose {
case verbosityQuiet:
stdout = lineflushwriter.New(nopCloser{os.Stdout}, logMutex, false)
stderr = lineflushwriter.New(nopCloser{os.Stderr}, logMutex, false)
stdout = lineflushwriter.New(nopCloser{os.Stdout}, logLock, false)
stderr = lineflushwriter.New(nopCloser{os.Stderr}, logLock, false)

case verbosityNormal:
stdout = lineflushwriter.New(
prefixwriter.New(
nopCloser{os.Stdout},
node.address.domain+" ",
),
logMutex,
logLock,
true,
)

Expand All @@ -125,7 +144,7 @@ func runRemoteExecutionNode(
nopCloser{os.Stderr},
node.address.domain+" ",
),
logMutex,
logLock,
true,
)

Expand All @@ -137,7 +156,7 @@ func runRemoteExecutionNode(
newDebugWriter(logger),
node.String()+" {cmd} <stdout> ",
),
logMutex,
logLock,
false,
)

Expand All @@ -146,7 +165,7 @@ func runRemoteExecutionNode(
newDebugWriter(logger),
node.String()+" {cmd} <stderr> ",
),
logMutex,
logLock,
false,
)
}
Expand Down

0 comments on commit 8649a2f

Please sign in to comment.