diff --git a/dexbot/controllers/main_controller.py b/dexbot/controllers/main_controller.py index ebe422e5a..7995c6362 100644 --- a/dexbot/controllers/main_controller.py +++ b/dexbot/controllers/main_controller.py @@ -19,7 +19,7 @@ def __init__(self, bitshares_instance, config): self.config = config self.worker_manager = None - # Configure logging + # Configure per_worker logging data_dir = user_data_dir(APP_NAME, AUTHOR) filename = os.path.join(data_dir, 'dexbot.log') formatter = logging.Formatter( @@ -35,6 +35,14 @@ def __init__(self, bitshares_instance, config): logger.info("DEXBot {} on python {} {}".format(VERSION, sys.version[:6], sys.platform), extra={ 'worker_name': 'NONE', 'account': 'NONE', 'market': 'NONE'}) + # Configure root logger + logger = logging.getLogger("dexbot") + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh = logging.FileHandler(filename) + fh.setFormatter(formatter) + logger.addHandler(fh) + logger.setLevel(logging.INFO) + # Configure orders logging initialize_orders_log() diff --git a/dexbot/helper.py b/dexbot/helper.py index 4f778b4b7..9682f2f80 100644 --- a/dexbot/helper.py +++ b/dexbot/helper.py @@ -3,6 +3,7 @@ import shutil import errno import logging +import pkgutil from appdirs import user_data_dir from dexbot import APP_NAME, AUTHOR @@ -77,6 +78,15 @@ def initialize_orders_log(): logger.info("worker_name;ID;operation_type;base_asset;base_amount;quote_asset;quote_amount;timestamp") +def iter_namespace(ns_pkg): + # https://packaging.python.org/guides/creating-and-discovering-plugins/ + # Specifying the second argument (prefix) to iter_modules makes the + # returned name an absolute name instead of a relative one. This allows + # import_module to work without having to do additional modification to + # the name. 
+ return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".") + + try: # Unfortunately setuptools is only "kinda-sorta" a standard module # it's available on pretty much any modern Python system, but some embedded Pythons may not have it diff --git a/dexbot/plugin.py b/dexbot/plugin.py new file mode 100644 index 000000000..85889f0a3 --- /dev/null +++ b/dexbot/plugin.py @@ -0,0 +1,81 @@ +import asyncio +import threading +import importlib +import logging + +import dexbot.plugins +from dexbot.helper import iter_namespace + +from bitshares import BitShares + +log = logging.getLogger(__name__) + +class PluginInfrastructure(threading.Thread): + """ Run plugins as asyncio tasks + + :param dict config: dexbot config + + PluginInfrastructure class is needed to be able to run asyncio plugins while having synchronous core. After + switching to asyncio-aware main thread we may continue to use all plugins without refactoring them. + """ + + def __init__(self, config): + super().__init__() + + self.bitshares = BitShares(node=config['node'], num_retries=-1) + self.config = config + self.loop = None + self.need_stop = False + self.plugins = [] + + def run(self): + log.debug('Starting PluginInfrastructure thread') + self.init_plugins() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop.create_task(self.run_plugins()) + self.loop.create_task(self.stop_handler()) + self.loop.run_forever() + + def init_plugins(self): + """ Initialize plugin instances + """ + plugins = {name: importlib.import_module(name) for finder, name, ispkg in iter_namespace(dexbot.plugins)} + + for name, plugin in plugins.items(): + self.plugins.append(plugin.Plugin(config=self.config, bitshares_instance=self.bitshares)) + + async def run_plugins(self): + """ Run each discovered plugin by calling Plugin.main() + """ + # Schedule every plugin as asyncio Task; use ensure_future() for python3.6 compatibility + tasks = [asyncio.ensure_future(plugin.main()) for plugin in 
self.plugins] +        try: +            # Wait until all plugins are finished, but catch exceptions immediately as they occur +            await asyncio.gather(*tasks, return_exceptions=False) +        except asyncio.CancelledError: +            # Note: task.cancel() will not propagate this exception here, so it will appear only on current task cancel +            log.debug('Stopping run_plugins()') +        except Exception: +            log.exception('Task finished with exception:') + +    async def stop_handler(self): +        """ Watch for self.need_stop flag to cancel tasks and stop the thread + +        With this solution it's easier to achieve correct tasks stopping. self.loop.call_soon_threadsafe() requires +        additional wrapping to stop tasks or catch exceptions. +        """ +        while True: +            if self.need_stop: +                log.debug('Stopping event loop') +                tasks = [task for task in asyncio.Task.all_tasks() if task is not asyncio.tasks.Task.current_task()] +                # Cancel all tasks +                list(map(lambda task: task.cancel(), tasks)) +                # Wait for tasks to finish +                results = await asyncio.gather(*tasks, return_exceptions=True) +                log.debug('Finished awaiting cancelled tasks, results: {0}'.format(results)) +                # Stop the event loop +                self.loop.stop() +                return +            else: +                await asyncio.sleep(1) diff --git a/dexbot/plugins/__init__.py b/dexbot/plugins/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dexbot/plugins/dummy.py.example b/dexbot/plugins/dummy.py.example new file mode 100644 index 000000000..70a42285d --- /dev/null +++ b/dexbot/plugins/dummy.py.example @@ -0,0 +1,31 @@ +import asyncio +import logging + +log = logging.getLogger(__name__) + + +class Plugin: +    """ Example plugin class + +    Plugin must have main() method to run. 
main() is expected to be an asyncio coroutine + """ + + def __init__(self, config=None, bitshares_instance=None): + pass + + async def do_stuff(self): + log.info('Doing some stuff') + await asyncio.sleep(10) + log.info('Stuff done') + + async def boom(self): + raise Exception('Boom!') + + async def main(self): + try: + while True: + await self.do_stuff() + await asyncio.sleep(5) + await self.boom() + except asyncio.CancelledError: + log.info('Stopping correctly') diff --git a/dexbot/storage.py b/dexbot/storage.py index 96203c9a8..a68a8ab5e 100644 --- a/dexbot/storage.py +++ b/dexbot/storage.py @@ -163,37 +163,59 @@ def __init__(self): self.lock = threading.Lock() self.event = threading.Event() + # Daemon thread means it will be abruptly killed on parent thread shutdown self.daemon = True self.start() def run(self): + # Continuously iterate over task queue and execute tasks for func, args, token in iter(self.task_queue.get, None): if token is not None: args = args+(token,) func(*args) def _get_result(self, token): + """ Get task result from all results by token + + This function is invoked from self.execute() when caller thread executes a method which needs to return some + result back to caller. On each loop iteration results are being examined for specified task token. If it is + not yet available, the loop execution suspended until DatabaseWorker thread will set self.event flag + indicating some task processing is completed. _get_result() is called from another thread while queue + processing is performed inside the current thread. 
+ """ while True: with self.lock: if token in self.results: + # Find and return task results by token return_value = self.results[token] del self.results[token] return return_value else: + # Suspend loop execution and wait for flag self.event.clear() + # Block loop execution waiting Event flag set() from DatabaseWorker thread self.event.wait() def _set_result(self, token, result): + """ Associate query results with task token + """ with self.lock: self.results[token] = result self.event.set() def execute(self, func, *args): + """ Create queue task and return task result + """ + # Token is an unique task identifier token = str(uuid.uuid4) + # Schedule query execution into DatabaseWorker thread queue self.task_queue.put((func, args, token)) + # Return results when they will be available return self._get_result(token) def execute_noreturn(self, func, *args): + """ Create queue task without returning a result + """ self.task_queue.put((func, args, None)) def set_item(self, category, key, value): diff --git a/dexbot/ui.py b/dexbot/ui.py index 4408f1f0f..aa974fcf0 100644 --- a/dexbot/ui.py +++ b/dexbot/ui.py @@ -51,7 +51,13 @@ def new_func(ctx, *args, **kwargs): # Set the root logger with basic format ch = logging.StreamHandler() ch.setFormatter(formatter1) + fh = logging.FileHandler('dexbot.log') + fh.setFormatter(formatter1) + + # Root logger also logs into stream and file with respect of cli-defined verbosity logging.getLogger("dexbot").addHandler(ch) + logging.getLogger("dexbot").addHandler(fh) + logging.getLogger("dexbot").setLevel(getattr(logging, verbosity.upper())) logging.getLogger("").handlers = [] # GrapheneAPI logging diff --git a/dexbot/worker.py b/dexbot/worker.py index ccdaf126c..1bd138e49 100644 --- a/dexbot/worker.py +++ b/dexbot/worker.py @@ -6,6 +6,7 @@ import copy import dexbot.errors as errors +from dexbot.plugin import PluginInfrastructure from dexbot.strategies.base import StrategyBase from bitshares import BitShares @@ -173,6 +174,8 @@ def 
add_worker(self, worker_name, config): self.update_notify() def run(self): + self.plugins_thread = PluginInfrastructure(self.config) + self.plugins_thread.start() self.init_workers(self.config) self.update_notify() self.notify.listen() @@ -206,6 +209,9 @@ def stop(self, worker_name=None, pause=False): self.workers[worker].pause() self.workers = [] + # Notify plugins to stop + self.plugins_thread.need_stop = True + # Update other workers if len(self.workers) > 0: self.update_notify()