Skip to content

Commit

Permalink
Merge pull request #8 in DEVOPS/orgalorg from limit-threading to master
Browse files Browse the repository at this point in the history
* commit '12ca7aa8dc9eaef38f0d839621105ccda88cbd63':
  bubble args from craateThreadsPool
  fix errornous typo
  move threads size debug
  rename pool to thread_pool
  limit threading capabilities & flag -d
  • Loading branch information
Селецкий Станислав committed Jun 23, 2016
2 parents bb6024f + 12ca7aa commit 87fbb42
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 69 deletions.
81 changes: 42 additions & 39 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,59 +49,62 @@ func runRemoteExecution(
errors := make(chan error, 0)
for _, node := range lockedNodes.nodes {
go func(node *distributedLockNode) {
tracef(
"%s",
hierr.Errorf(
command,
"%s starting command",
node.String(),
).Error(),
)
pool.run(func() {
tracef(
"%s",
hierr.Errorf(
command,
"%s starting command",
node.String(),
).Error(),
)

remoteNode, err := runRemoteExecutionNode(
node,
command,
logLock,
)
if err != nil {
errors <- err
return
}

if setupCallback != nil {
setupCallback(remoteNode)
}

remoteNode.command.SetStdout(remoteNode.stdout)
remoteNode.command.SetStderr(remoteNode.stderr)

err = remoteNode.command.Start()
if err != nil {
errors <- hierr.Errorf(
err,
`can't start remote command`,
remoteNode, err := runRemoteExecutionNode(
node,
command,
logLock,
)
if err != nil {
errors <- err
return
}

return
}
if setupCallback != nil {
setupCallback(remoteNode)
}

nodes.Set(node, remoteNode)
remoteNode.command.SetStdout(remoteNode.stdout)
remoteNode.command.SetStderr(remoteNode.stderr)

stdinsLock.Lock()
defer stdinsLock.Unlock()
err = remoteNode.command.Start()
if err != nil {
errors <- hierr.Errorf(
err,
`can't start remote command`,
)

stdins = append(stdins, remoteNode.stdin)
return
}

errors <- nil
nodes.Set(node, remoteNode)

stdinsLock.Lock()
defer stdinsLock.Unlock()

stdins = append(stdins, remoteNode.stdin)

errors <- nil
})
}(node)
}

for range lockedNodes.nodes {
for _, node := range lockedNodes.nodes {
err := <-errors
if err != nil {
return nil, hierr.Errorf(
err,
`can't run remote command on node`,
`remote execution failed on node: '%s'`,
node,
)
}
}
Expand Down
4 changes: 4 additions & 0 deletions debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func infof(format string, args ...interface{}) {
func warningf(format string, args ...interface{}) {
args = serializeErrors(args)

if verbose <= verbosityQuiet {
return
}

logger.Warningf(format, args...)
}

Expand Down
47 changes: 27 additions & 20 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,41 @@ func acquireDistributedLock(
failOnError: failOnError,
}

nodeIndex = int64(0)
errors = make(chan error, 0)
connectedCount = int64(0)
failedCount = int64(0)

errors = make(chan error, 0)

mutex = &sync.Mutex{}
)

for _, nodeAddress := range addresses {
go func(nodeAddress address) {
err := connectToNode(cluster, runnerFactory, nodeAddress, mutex)

if err != nil {
if noConnFail {
warningf("%s", err)
errors <- nil
} else {
errors <- err
}
pool.run(func() {
err := connectToNode(cluster, runnerFactory, nodeAddress, mutex)

return
}
if err != nil {
atomic.AddInt64(&failedCount, 1)

debugf(`%4d/%d connection established: %s`,
atomic.AddInt64(&nodeIndex, 1),
len(addresses),
nodeAddress,
)
if noConnFail {
warningf("%s", err)
errors <- nil
} else {
errors <- err
}

return
}

debugf(`%4d/%d (failed: %d) connection established: %s`,
atomic.AddInt64(&connectedCount, 1),
int64(len(addresses))-failedCount,
failedCount,
nodeAddress,
)

errors <- err
errors <- err
})
}(nodeAddress)
}

Expand Down Expand Up @@ -89,7 +96,7 @@ func connectToNode(
if err != nil {
return hierr.Errorf(
err,
`can't create runner for address: %s`,
`can't connect to address: %s`,
address,
)
}
Expand Down
21 changes: 19 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ Advanced options:
using '-g', they will be appended to shell
invocation.
[default: bash -c $'{}']
-d --threads <n> Set threads count which will be used for connection,
locking and execution commands.
[default: 16].
--json Output everything in line-by-line JSON format,
printing objects with fields:
* 'stream' = 'stdout' | 'stderr';
Expand All @@ -168,12 +171,12 @@ Timeout options:
--conn-timeout <t> Remote host connection timeout in milliseconds.
[default: 10000]
--send-timeout <t> Remote host connection data sending timeout in
milliseconds. [default: 10000]
milliseconds. [default: 60000]
NOTE: send timeout will be also used for the
heartbeat messages, that orgalorg and connected nodes
exchanges through synchronization process.
--recv-timeout <t> Remote host connection data receiving timeout in
milliseconds. [default: 10000]
milliseconds. [default: 60000]
--keep-alive <t> How long to keep connection keeped alive after session
ends. [default: 10000]
`
Expand All @@ -195,6 +198,8 @@ var (
logger = lorg.NewLog()
verbose = verbosityNormal
format = outputFormatText

pool *threadPool
)

var (
Expand Down Expand Up @@ -237,6 +242,16 @@ func main() {
exit(1)
}

poolSize, err := parseThreadPoolSize(args)
if err != nil {
errorf("%s", hierr.Errorf(
err,
`--threads given invalid value`,
))
}

pool = newThreadPool(poolSize)

switch {
case args["--upload"].(bool):
fallthrough
Expand Down Expand Up @@ -600,6 +615,8 @@ func connectAndLock(
)
}

debugf(`using %d threads`, pool.size)

debugf(`connecting to %d nodes`, len(addresses))

if lockFile == "" {
Expand Down
31 changes: 26 additions & 5 deletions remote_execution.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"io"

"github.com/seletskiy/hierr"
Expand All @@ -23,18 +24,34 @@ func (execution *remoteExecution) wait() error {
results := make(chan *remoteExecutionResult, 0)
for _, node := range execution.nodes {
go func(node *remoteExecutionNode) {
results <- &remoteExecutionResult{node, node.wait()}
pool.run(func() {
results <- &remoteExecutionResult{node, node.wait()}
})
}(node)
}

executionErrors := fmt.Errorf(
`can't run remote commands on %d nodes`,
len(execution.nodes),
)

erroneous := false

for range execution.nodes {
result := <-results
if result.err != nil {
return hierr.Errorf(
result.err,
`%s has finished with error`,
result.node.node.String(),
executionErrors = hierr.Push(
executionErrors,
hierr.Errorf(
result.err,
`%s has finished with error`,
result.node.node.String(),
),
)

erroneous = true

continue
}

tracef(
Expand All @@ -43,5 +60,9 @@ func (execution *remoteExecution) wait() error {
)
}

if erroneous {
return executionErrors
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ tests:not tests:ensure :orgalorg:with-key -o example.com -C whoami

tests:ensure :orgalorg:with-key -o example.com -w -C whoami

tests:assert-stderr-re "WARNING.*can't create runner.*example.com"
tests:assert-stderr-re "WARNING.*can't connect to address.*example.com"

:check-node-output() {
local container_ip="$2"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
tests:not tests:ensure :orgalorg:with-key -e -C echo 1 '&&' exit 1

:check-node-output() {
local container_ip="$2"

tests:assert-stdout "$container_ip 1"
tests:assert-stderr-re "$container_ip.*non-zero code: 1"
}

containers:do :check-node-output
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
tests:not tests:ensure :orgalorg:with-key --json -o example.com -C pwd

tests:assert-stderr-re '"stream":"stderr"'
tests:assert-stderr-re '"body":".*ERROR.*create runner for address'
tests:assert-stderr-re '"body":".*ERROR.*connect to address'
tests:not tests:assert-stderr '└─'
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
tests:not tests:ensure :orgalorg:with-key -o example.com -C pwd

tests:assert-stderr "└─ can't create runner"
tests:assert-stderr "└─ can't connect to address"
50 changes: 50 additions & 0 deletions thread_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"strconv"

"github.com/seletskiy/hierr"
)

type threadPool struct {
available chan struct{}

size int
}

func newThreadPool(size int) *threadPool {
available := make(chan struct{}, size)
for i := 0; i < size; i++ {
available <- struct{}{}
}

return &threadPool{
available,
size,
}
}

func (pool *threadPool) run(task func()) {
<-pool.available
defer func() {
pool.available <- struct{}{}
}()

task()
}

func parseThreadPoolSize(args map[string]interface{}) (int, error) {
var (
poolSizeRaw = args["--threads"].(string)
)

poolSize, err := strconv.Atoi(poolSizeRaw)
if err != nil {
return 0, hierr.Errorf(
err,
`can't parse threads count`,
)
}

return poolSize, nil
}

0 comments on commit 87fbb42

Please sign in to comment.