Skip to content

Commit

Permalink
Merge pull request raiden-network#2880 from ulope/sp/more_logs
Browse files Browse the repository at this point in the history
Improve startup time, record node's stdout / stderr
  • Loading branch information
ulope authored Oct 24, 2018
2 parents f543900 + c542fdc commit 0f7774c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 23 deletions.
1 change: 1 addition & 0 deletions raiden/ui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def _setup_matrix(config):
# fetch list of known servers from raiden-network/raiden-transport repo
available_servers_url = DEFAULT_MATRIX_KNOWN_SERVERS[config['environment_type']]
available_servers = get_matrix_servers(available_servers_url)
log.debug('Fetching available matrix servers', available_servers=available_servers)
config['transport']['matrix']['available_servers'] = available_servers

try:
Expand Down
4 changes: 3 additions & 1 deletion tools/scenario-player/scenario_player/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def main(
mailgun_api_key,
)
except ScenarioError as ex:
log.error('Run finished', result='scenario error')
log.exception('Run finished', result='scenario error')
send_notification_mail(
runner.notification_email,
f'Invalid scenario {scenario_file.name}',
Expand All @@ -167,6 +167,8 @@ def main(
while not ui_greenlet.dead:
gevent.sleep(1)
finally:
if runner.is_managed:
runner.node_controller.stop()
if not ui_greenlet.dead:
ui_greenlet.kill(ExitMainLoop)
ui_greenlet.join()
Expand Down
76 changes: 70 additions & 6 deletions tools/scenario-player/scenario_player/node_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import time
from datetime import timedelta
from enum import Enum
from pathlib import Path
from tarfile import TarFile
from zipfile import ZipFile
Expand All @@ -19,7 +20,8 @@
from eth_keyfile import create_keyfile_json
from eth_utils import to_checksum_address
from gevent import Greenlet
from gevent.pool import Pool
from gevent.pool import Group, Pool
from mirakuru import ProcessExitedWithError

from scenario_player.exceptions import ScenarioError
from scenario_player.runner import ScenarioRunner
Expand Down Expand Up @@ -63,6 +65,11 @@
}


class NodeState(Enum):
    """Lifecycle state of a managed node process."""

    # The node process is not running (initial state, or after stop / kill).
    STOPPED = 1
    # The node process has been started successfully.
    STARTED = 2


class RaidenReleaseKeeper:
def __init__(self, release_cache_dir: Path):
self._releases = {}
Expand Down Expand Up @@ -156,6 +163,10 @@ def __init__(self, runner: ScenarioRunner, index: int, raiden_version, options:
self._executor = None
self._port = None

self.state: NodeState = NodeState.STOPPED

self._output_files = {}

if options.pop('_clean', False):
shutil.rmtree(self._datadir)
self._datadir.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -189,22 +200,41 @@ def start(self):
address=self.address,
port=self._api_address.rpartition(':')[2],
)
self._output_files['stdout'] = self._stdout_file.open('at', 1)
self._output_files['stderr'] = self._stderr_file.open('at', 1)
for file in self._output_files.values():
file.write('--------- Starting ---------\n')

begin = time.monotonic()
ret = self.executor.start()
try:
ret = self.executor.start(**self._output_files)
except ProcessExitedWithError as ex:
raise ScenarioError(f'Failed to start Raiden node {self._index}') from ex
self.state = NodeState.STARTED
duration = str(timedelta(seconds=time.monotonic() - begin))
log.info('Node started', node=self._index, duration=duration)
return ret

def stop(self, timeout=600):
def stop(self, timeout=60):
log.info('Stopping node', node=self._index)
begin = time.monotonic()
self.state = NodeState.STOPPED
ret = self.executor.stop(timeout=timeout)
duration = str(timedelta(seconds=time.monotonic() - begin))
for file in self._output_files.values():
file.write('--------- Stopped ---------\n')
file.close()
self._output_files = {}
log.info('Node stopped', node=self._index, duration=duration)
return ret

def kill(self):
    """Forcefully kill the node process and close its captured output files.

    The state is set to ``NodeState.STOPPED`` before the process is killed,
    mirroring ``stop()``. The "Killed" marker is written only after the kill
    call has returned, so that a failing write to a log file can no longer
    prevent the process from being killed.

    Returns:
        Whatever ``self.executor.kill()`` returns.
    """
    log.info('Killing node', node=self._index)
    # Mark as stopped first so code polling ``state`` does not treat the
    # intentional kill as an unexpected node death.
    self.state = NodeState.STOPPED
    try:
        return self.executor.kill()
    finally:
        # Detach the handles before touching them so a failure below cannot
        # leave already-closed files registered on the instance.
        output_files, self._output_files = self._output_files, {}
        for file in output_files.values():
            try:
                file.write('--------- Killed ---------\n')
            finally:
                file.close()

@property
Expand Down Expand Up @@ -251,8 +281,6 @@ def _command(self):
self._password_file,
'--network-id',
self._runner.chain_id,
'--environment-type',
self._runner.environment_type.name.lower(),
'--sync-check', # FIXME: Disable sync check for private chains
'--gas-price',
self._options.get('gas-price', 'normal'),
Expand All @@ -273,6 +301,9 @@ def _command(self):
if option_name in self._options:
cmd.extend([f'--{option_name}', self._options[option_name]])

if 'environment-type' in self._options:
cmd.extend(['--environment-type', self._options['environment-type']])

# Ensure path instances are converted to strings
cmd = [str(c) for c in cmd]
return cmd
Expand Down Expand Up @@ -313,7 +344,15 @@ def _api_address(self):

@property
def _log_file(self):
return self._datadir.joinpath(f'run-{self._runner.run_number}.log')
return self._datadir.joinpath(f'run-{self._runner.run_number:03d}.log')

@property
def _stdout_file(self):
    """Path inside the node's data directory that receives stdout for the current run."""
    filename = f'run-{self._runner.run_number:03d}.stdout'
    return self._datadir / filename

@property
def _stderr_file(self):
    """Path inside the node's data directory that receives stderr for the current run."""
    filename = f'run-{self._runner.run_number:03d}.stderr'
    return self._datadir / filename


class NodeController:
Expand Down Expand Up @@ -355,6 +394,7 @@ def _start():
for runner in self._node_runners:
pool.start(Greenlet(runner.start))
pool.join(raise_error=True)
log.info('All nodes started')

starter = gevent.spawn(_start)
if wait:
Expand All @@ -365,11 +405,13 @@ def stop(self):
log.info('Stopping nodes')
stop_tasks = [gevent.spawn(runner.stop) for runner in self._node_runners]
gevent.joinall(stop_tasks, raise_error=True)
log.info('Nodes stopped')

def kill(self):
    """Kill every managed node concurrently and wait for all kills to finish.

    One greenlet is spawned per node runner; an exception raised in any of
    them is re-raised by ``gevent.joinall(..., raise_error=True)``.
    """
    log.info('Killing nodes')
    pending = []
    for node_runner in self._node_runners:
        pending.append(gevent.spawn(node_runner.kill))
    gevent.joinall(pending, raise_error=True)
    log.info('Nodes killed')

def initialize_nodes(self):
for runner in self._node_runners:
Expand All @@ -378,3 +420,25 @@ def initialize_nodes(self):
@property
def addresses(self):
    """Set of the ``address`` values of all node runners (duplicates collapse)."""
    unique_addresses = set()
    for node_runner in self._node_runners:
        unique_addresses.add(node_runner.address)
    return unique_addresses

def start_node_monitor(self):
    """Spawn greenlets that watch the node processes while the root task runs.

    One monitor greenlet per node runner polls the runner's executor every
    half second for as long as ``self._runner.root_task.done`` is false.
    Runners whose state is not ``NodeState.STARTED`` are skipped, so nodes
    that were stopped or killed on purpose are not reported.

    Returns:
        A greenlet that finishes once all monitors have finished and that
        re-raises the error of a failed monitor.

    Raises (inside the returned greenlet):
        ScenarioError: if ``check_subprocess()`` of a started node raises
            ``ProcessExitedWithError``.
    """
    def _monitor(runner: NodeRunner):
        while not self._runner.root_task.done:
            if runner.state is NodeState.STARTED:
                try:
                    runner.executor.check_subprocess()
                except ProcessExitedWithError as ex:
                    raise ScenarioError(
                        f'Raiden node {runner._index} died with non-zero exit status',
                    ) from ex
            gevent.sleep(.5)

    monitor_group = Group()
    for runner in self._node_runners:
        monitor_group.start(Greenlet(_monitor, runner))

    def _wait():
        try:
            # ``join`` returns False on timeout; keep polling so that a failed
            # monitor is noticed (and re-raised) within roughly half a second.
            while not monitor_group.join(.5, raise_error=True):
                pass
        finally:
            # Once one monitor has failed (or this greenlet is killed) the
            # remaining monitors serve no purpose; do not leave them polling.
            # A no-op when the group is already empty.
            monitor_group.kill()

    return gevent.spawn(_wait)
26 changes: 14 additions & 12 deletions tools/scenario-player/scenario_player/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from web3 import HTTPProvider, Web3

from raiden.accounts import Account
from raiden.constants import Environment
from raiden.network.rpc.client import JSONRPCClient
from scenario_player.exceptions import NodesUnreachableError, ScenarioError, TokenRegistrationError
from scenario_player.utils import (
Expand Down Expand Up @@ -142,10 +141,6 @@ def __init__(
)

self.chain_id = self.client.web3.net.version
# FIXME: Support main net type for test chains
self.environment_type = (
Environment.PRODUCTION if self.chain_id == 1 else Environment.DEVELOPMENT
)

balance = self.client.balance(account.address)
if balance < OWN_ACCOUNT_BALANCE_MIN:
Expand Down Expand Up @@ -238,13 +233,9 @@ def run_scenario(self):
elif balance > token_balance_min:
log.warning("Node is overfunded", address=address, balance=balance)

all_tx = mint_tx
if fund_tx:
all_tx.extend(fund_tx)

wait_for_txs(self.client, all_tx)
wait_for_txs(self.client, mint_tx + fund_tx)

if node_starter:
if node_starter is not None:
node_starter.get(block=True)

registered_tokens = set(
Expand All @@ -258,7 +249,18 @@ def run_scenario(self):
log.error("Couldn't register token with network", code=code, message=msg)
raise TokenRegistrationError(msg)

self.root_task()
# Start root task
root_task_greenlet = gevent.spawn(self.root_task)
greenlets = [root_task_greenlet]
if self.is_managed:
greenlets.append(self.node_controller.start_node_monitor())
try:
gevent.joinall(greenlets, raise_error=True)
except BaseException:
if not root_task_greenlet.dead:
# Make sure we kill the tasks if a node dies
root_task_greenlet.kill()
raise

def register_token(self, token_address, node):
try:
Expand Down
2 changes: 1 addition & 1 deletion tools/scenario-player/scenario_player/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __call__(self, *args, **kwargs):
self._start_time = time.monotonic()
try:
return self._run(*args, **kwargs)
except Exception as ex:
except BaseException as ex:
self.state = TaskState.ERRORED
log.exception('Task errored', task=self)
self.exception = ex
Expand Down
41 changes: 38 additions & 3 deletions tools/scenario-player/scenario_player/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import os
import platform
import subprocess
import time
import uuid
from collections import defaultdict, deque
Expand All @@ -14,8 +16,8 @@
import structlog
from eth_keyfile import decode_keyfile_json
from eth_utils import encode_hex, to_checksum_address
from mirakuru import TimeoutExpired
from mirakuru.base import IGNORED_ERROR_CODES
from mirakuru import AlreadyRunning, TimeoutExpired
from mirakuru.base import ENV_UUID, IGNORED_ERROR_CODES
from requests.adapters import HTTPAdapter
from web3 import HTTPProvider, Web3
from web3.gas_strategies.time_based import fast_gas_price_strategy, medium_gas_price_strategy
Expand Down Expand Up @@ -85,6 +87,39 @@ def convert(self, value, param, ctx):


class HTTPExecutor(mirakuru.HTTPExecutor):
def start(self, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Start the process, letting the caller choose where stdout / stderr go.

    Merged copy of the ``start()`` logic from the inheritance chain; the only
    intended difference is that ``stdout`` and ``stderr`` are passed through
    to ``subprocess.Popen`` instead of being fixed.

    Args:
        stdout: Value for the ``stdout`` argument of ``subprocess.Popen``.
        stderr: Value for the ``stderr`` argument of ``subprocess.Popen``.

    Returns:
        This executor instance.

    Raises:
        AlreadyRunning: if ``pre_start_check()`` reports that something is
            already running with the same configuration.
    """
    if self.pre_start_check():
        # Some other executor (or process) is running with same config:
        raise AlreadyRunning(self)

    if self.process is None:
        # Tag the child's environment with this executor's UUID.
        child_env = os.environ.copy()
        child_env[ENV_UUID] = self._uuid

        popen_kwargs = dict(
            shell=self._shell,
            stdin=subprocess.PIPE,
            stdout=stdout,
            stderr=stderr,
            universal_newlines=True,
            env=child_env,
        )
        if platform.system() != 'Windows':
            # Run the child in its own session on non-Windows platforms.
            popen_kwargs['preexec_fn'] = os.setsid

        # A shell receives the raw command string, otherwise the split parts.
        command = self.command if self._shell else self.command_parts
        self.process = subprocess.Popen(command, **popen_kwargs)

    self._set_timeout()

    self.wait_for(self.check_subprocess)
    return self

def stop(self, sig=None, timeout=10):
""" Copy paste job from `SimpleExecutor.stop()` to add the `timeout` parameter. """
if self.process is None:
Expand All @@ -109,7 +144,7 @@ def process_stopped():
try:
self.wait_for(process_stopped)
except TimeoutExpired:
# at this moment, process got killed,
log.warning('Timeout expired, killing process', process=self)
pass

self._kill_all_kids(sig)
Expand Down

0 comments on commit 0f7774c

Please sign in to comment.