From e41e23280698db65ee1cfda634aa3c534bb4511a Mon Sep 17 00:00:00 2001 From: Sascha Ohms Date: Wed, 19 Aug 2020 17:16:34 +0200 Subject: [PATCH 1/5] initial refactor for data processing --- mapadroid/mitm_receiver/MITMReceiver.py | 28 ++---------- .../mitm_receiver/MitmDataProcessorManager.py | 44 +++++++++++++++++++ mapadroid/mitm_receiver/MitmMapper.py | 11 +++-- ...ssor.py => SerializedMitmDataProcessor.py} | 43 +++++------------- start.py | 21 ++++++--- 5 files changed, 81 insertions(+), 66 deletions(-) create mode 100644 mapadroid/mitm_receiver/MitmDataProcessorManager.py rename mapadroid/mitm_receiver/{MITMDataProcessor.py => SerializedMitmDataProcessor.py} (73%) diff --git a/mapadroid/mitm_receiver/MITMReceiver.py b/mapadroid/mitm_receiver/MITMReceiver.py index 2b9bcaa97..4de45612f 100644 --- a/mapadroid/mitm_receiver/MITMReceiver.py +++ b/mapadroid/mitm_receiver/MITMReceiver.py @@ -11,7 +11,6 @@ from flask import Flask, Response, request, send_file from gevent.pywsgi import WSGIServer -from mapadroid.mitm_receiver.MITMDataProcessor import MitmDataProcessor from mapadroid.mitm_receiver.MitmMapper import MitmMapper from mapadroid.utils import MappingManager from mapadroid.utils.authHelper import check_auth @@ -161,7 +160,8 @@ def __call__(self, *args, **kwargs): class MITMReceiver(Process): def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_manager: MappingManager, - db_wrapper, data_manager, storage_obj, name=None, enable_configmode: Optional[bool] = False): + db_wrapper, data_manager, storage_obj, data_queue: JoinableQueue, + name=None, enable_configmode: Optional[bool] = False): Process.__init__(self, name=name) self.__application_args = args_passed self.__mapping_manager = mapping_manager @@ -172,8 +172,7 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man self.__hopper_mutex = RLock() self._db_wrapper = db_wrapper self.__storage_obj = storage_obj - self._data_queue: JoinableQueue = JoinableQueue() 
- self.worker_threads = [] + self._data_queue: JoinableQueue = data_queue self.app = Flask("MITMReceiver") self.add_endpoint(endpoint='/get_addresses/', endpoint_name='get_addresses/', handler=self.get_addresses, @@ -218,25 +217,12 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man methods_passed=['GET']) self.add_endpoint(endpoint='/status/', endpoint_name='status/', handler=self.status, methods_passed=['GET']) - for i in range(self.__application_args.mitmreceiver_data_workers): - data_processor: MitmDataProcessor = MitmDataProcessor(self._data_queue, self.__application_args, - self.__mitm_mapper, db_wrapper, - name='MITMReceiver-%s' % str(i)) - data_processor.start() - self.worker_threads.append(data_processor) def shutdown(self): logger.info("MITMReceiver stop called...") - logger.info("Adding None to queue") if self._data_queue: for i in range(self.__application_args.mitmreceiver_data_workers): self._data_queue.put(None) - logger.info("Trying to join workers...") - for worker_thread in self.worker_threads: - worker_thread.terminate() - worker_thread.join() - self._data_queue.close() - logger.info("Workers stopped...") def run(self): httpsrv = WSGIServer((self.__listen_ip, int( @@ -318,9 +304,7 @@ def get_addresses(self, origin, data): def status(self, origin, data): origin_return: dict = {} - process_return: dict = {} data_return: dict = {} - process_count: int = 0 for origin in self.__mapping_manager.get_all_devicemappings().keys(): origin_return[origin] = {} origin_return[origin]['injection_status'] = self.__mitm_mapper.get_injection_status(origin) @@ -331,13 +315,7 @@ def status(self, origin, data): origin_return[origin][ 'last_possibly_moved'] = self.__mitm_mapper.get_last_timestamp_possible_moved(origin) - for process in self.worker_threads: - process_return['MITMReceiver-' + str(process_count)] = {} - process_return['MITMReceiver-' + str(process_count)]['queue_length'] = process.get_queue_items() - process_count += 1 - 
data_return['origin_status'] = origin_return - data_return['process_status'] = process_return return json.dumps(data_return) diff --git a/mapadroid/mitm_receiver/MitmDataProcessorManager.py b/mapadroid/mitm_receiver/MitmDataProcessorManager.py new file mode 100644 index 000000000..9a9c94f51 --- /dev/null +++ b/mapadroid/mitm_receiver/MitmDataProcessorManager.py @@ -0,0 +1,44 @@ +from multiprocessing import JoinableQueue + +from mapadroid.db.DbWrapper import DbWrapper +from mapadroid.mitm_receiver.MitmMapper import MitmMapper +from mapadroid.mitm_receiver.SerializedMitmDataProcessor import SerializedMitmDataProcessor +from mapadroid.utils.logging import get_logger, LoggerEnums + +logger = get_logger(LoggerEnums.mitm) + + +class MitmDataProcessorManager(): + def __init__(self, args, mitm_mapper: MitmMapper, db_wrapper: DbWrapper): + self._worker_threads = [] + self._args = args + self._mitm_data_queue: JoinableQueue = JoinableQueue() + self._mitm_mapper: MitmMapper = mitm_mapper + self._db_wrapper: DbWrapper = db_wrapper + + def get_queue(self): + return self._mitm_data_queue + + def launch_processors(self): + for i in range(self._args.mitmreceiver_data_workers): + data_processor: SerializedMitmDataProcessor = SerializedMitmDataProcessor( + self._mitm_data_queue, + self._args, + self._mitm_mapper, + self._db_wrapper, + name="SerialiedMitmDataProcessor-%s" % str(i)) + + data_processor.start() + self._worker_threads.append(data_processor) + + def shutdown(self): + logger.info("Stopping {} MITM data processors", len(self._worker_threads)) + for worker_thread in self._worker_threads: + worker_thread.terminate() + worker_thread.join() + logger.info("Stopped MITM datap rocessors") + + if self._mitm_data_queue is not None: + self._mitm_data_queue.close() + + diff --git a/mapadroid/mitm_receiver/MitmMapper.py b/mapadroid/mitm_receiver/MitmMapper.py index 3d6740b03..52829b346 100644 --- a/mapadroid/mitm_receiver/MitmMapper.py +++ b/mapadroid/mitm_receiver/MitmMapper.py @@ 
-4,24 +4,22 @@ from queue import Empty from threading import Thread, Event from typing import Dict + from mapadroid.db.DbStatsSubmit import DbStatsSubmit from mapadroid.mitm_receiver.PlayerStats import PlayerStats from mapadroid.utils.MappingManager import MappingManager from mapadroid.utils.collections import Location -from mapadroid.utils.walkerArgs import parse_args from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger logger = get_logger(LoggerEnums.mitm) -args = parse_args() - class MitmMapperManager(SyncManager): pass class MitmMapper(object): - def __init__(self, mapping_manager: MappingManager, db_stats_submit: DbStatsSubmit): + def __init__(self, args, mapping_manager: MappingManager, db_stats_submit: DbStatsSubmit): self.__mapping = {} self.__playerstats: Dict[str, PlayerStats] = {} self.__mapping_mutex = Lock() @@ -187,6 +185,11 @@ def get_injection_status(self, origin): return self.__injected.get(origin, False) def run_stats_collector(self, origin: str): + if not self.__application_args.game_stats: + pass + + origin_logger = get_origin_logger(logger, origin=origin) + origin_logger.debug2("Running stats collector") if self.__playerstats.get(origin, None) is not None: self.__playerstats.get(origin).stats_collector() diff --git a/mapadroid/mitm_receiver/MITMDataProcessor.py b/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py similarity index 73% rename from mapadroid/mitm_receiver/MITMDataProcessor.py rename to mapadroid/mitm_receiver/SerializedMitmDataProcessor.py index 16e7aad10..567775bd9 100644 --- a/mapadroid/mitm_receiver/MITMDataProcessor.py +++ b/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py @@ -5,11 +5,10 @@ from mapadroid.mitm_receiver.MitmMapper import MitmMapper from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger - logger = get_logger(LoggerEnums.mitm) -class MitmDataProcessor(Process): +class SerializedMitmDataProcessor(Process): def __init__(self, multi_proc_queue: Queue, 
application_args, mitm_mapper: MitmMapper, db_wrapper: DbWrapper, name=None): Process.__init__(self, name=name) @@ -19,59 +18,41 @@ def __init__(self, multi_proc_queue: Queue, application_args, mitm_mapper: MitmM self.__mitm_mapper: MitmMapper = mitm_mapper def run(self): - # build a private DbWrapper instance... - logger.info("Starting MITMDataProcessor") + logger.info("Starting serialized MITM data processor") while True: try: - item = self.__queue.get() - - try: - items_left = self.__queue.qsize() - except NotImplementedError: - items_left = 0 - - logger.debug("MITM data processing worker retrieved data. Queue length left afterwards: {}", items_left) + items_left = self.__queue.qsize() if items_left > 50: - logger.warning("MITM data processing workers are falling behind! Queue length: {}", items_left) + logger.warning("MITM data processors are falling behind! Queue length: {}", items_left) + else: + logger.debug("MITM data processor retrieved data. Queue length: {}", items_left) + item = self.__queue.get() if item is None: - logger.warning("Received none from queue of data") + logger.info("Received signal to stop MITM data processor") break self.process_data(item[0], item[1], item[2]) self.__queue.task_done() except KeyboardInterrupt: - logger.info("MITMDataProcessor received keyboard interrupt, stopping") + logger.info("Received keyboard interrupt, stopping MITM data processor") break - def get_queue_items(self): - try: - items_left = self.__queue.qsize() - except NotImplementedError: - items_left = 0 - return items_left - @logger.catch def process_data(self, received_timestamp, data, origin): origin_logger = get_origin_logger(logger, origin=origin) data_type = data.get("type", None) - raw = data.get("raw", False) origin_logger.debug2("Processing received data") processed_timestamp = datetime.fromtimestamp(received_timestamp) - if raw: - origin_logger.debug4("Received raw payload: {}", data["payload"]) - if data_type and not raw: - origin_logger.debug2("Running 
stats collector") - if self.__application_args.game_stats: - self.__mitm_mapper.run_stats_collector(origin) + if data_type and not data.get("raw", False): + self.__mitm_mapper.run_stats_collector(origin) origin_logger.debug4("Received data: {}", data) if data_type == 106: # process GetMapObject origin_logger.info("Processing GMO received. Received at {}", processed_timestamp) - if self.__application_args.weather: - self.__db_submit.weather(origin, data["payload"], received_timestamp) + self.__db_submit.weather(origin, data["payload"], received_timestamp) self.__db_submit.stops(origin, data["payload"]) self.__db_submit.gyms(origin, data["payload"]) diff --git a/start.py b/start.py index 54902e4e0..43d0b4832 100644 --- a/start.py +++ b/start.py @@ -5,7 +5,7 @@ print("MAD requires at least python 3.6! Your version: {}.{}" .format(py_version.major, py_version.minor)) sys.exit(1) -from multiprocessing import Process +from multiprocessing import JoinableQueue, Process from typing import Optional import calendar import datetime @@ -18,6 +18,7 @@ from mapadroid.utils.MappingManager import MappingManager, MappingManagerManager from mapadroid.db.DbFactory import DbFactory from mapadroid.mitm_receiver.MitmMapper import MitmMapper, MitmMapperManager +from mapadroid.mitm_receiver.MitmDataProcessorManager import MitmDataProcessorManager from mapadroid.mitm_receiver.MITMReceiver import MITMReceiver from mapadroid.utils.madGlobals import terminate_mad from mapadroid.utils.rarity import Rarity @@ -206,7 +207,7 @@ def check_dependencies(): data_manager, configmode=args.config_mode) if args.only_routes: - logger.info('Running in route recalculation mode. MAD will exit once complete') + logger.info('Running in route recalculation mode. 
MAD will exit once complete') recalc_in_progress = True while recalc_in_progress: time.sleep(5) @@ -221,14 +222,20 @@ def check_dependencies(): MitmMapperManager.register('MitmMapper', MitmMapper) mitm_mapper_manager = MitmMapperManager() mitm_mapper_manager.start() - mitm_mapper = mitm_mapper_manager.MitmMapper(mapping_manager, db_wrapper.stats_submit) + mitm_mapper = mitm_mapper_manager.MitmMapper(args, mapping_manager, db_wrapper.stats_submit) + logger.info('Starting PogoDroid Receiver server on port {}'.format(str(args.mitmreceiver_port))) + + mitm_data_processor_manager = MitmDataProcessorManager(args, mitm_mapper, db_wrapper) + mitm_data_processor_manager.launch_processors() + mitm_receiver_process = MITMReceiver(args.mitmreceiver_ip, int(args.mitmreceiver_port), mitm_mapper, args, mapping_manager, db_wrapper, - data_manager, - storage_elem, + data_manager, storage_elem, + mitm_data_processor_manager.get_queue(), enable_configmode=args.config_mode) mitm_receiver_process.start() + logger.info('Starting websocket server on port {}'.format(str(args.ws_port))) ws_server = WebsocketServer(args=args, mitm_mapper=mitm_mapper, @@ -338,11 +345,13 @@ def check_dependencies(): if mitm_receiver_process is not None: logger.info("Trying to stop receiver") mitm_receiver_process.shutdown() - logger.debug("MITM child threads successfully shutdown. Terminating parent thread") + logger.debug("MITM child threads successfully shutdown. 
Terminating parent thread") mitm_receiver_process.terminate() logger.debug("Trying to join MITMReceiver") mitm_receiver_process.join() logger.debug("MITMReceiver joined") + if mitm_data_processor_manager is not None: + mitm_data_processor_manager.shutdown() if device_updater is not None: device_updater.stop_updater() if t_whw is not None: From 736c6cf0865e0021bb51c3fa0f39d7780ed0d37c Mon Sep 17 00:00:00 2001 From: Sascha Ohms Date: Wed, 19 Aug 2020 18:05:22 +0200 Subject: [PATCH 2/5] allow overwriting the MITM data processor manager via plugins --- start.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/start.py b/start.py index 43d0b4832..362e9e9f0 100644 --- a/start.py +++ b/start.py @@ -231,7 +231,7 @@ def check_dependencies(): mitm_receiver_process = MITMReceiver(args.mitmreceiver_ip, int(args.mitmreceiver_port), mitm_mapper, args, mapping_manager, db_wrapper, - data_manager, storage_elem, + data_manager, storage_elem, mitm_data_processor_manager.get_queue(), enable_configmode=args.config_mode) mitm_receiver_process.start() @@ -283,6 +283,7 @@ def check_dependencies(): 'storage_elem': storage_elem, 'webhook_worker': webhook_worker, 'ws_server': ws_server, + 'mitm_data_processor_manager': mitm_data_processor_manager } mad_plugins = PluginCollection('plugins', plugin_parts) mad_plugins.apply_all_plugins_on_value() From a70df8a134dec12e11582bddb22ae670d46c6ee8 Mon Sep 17 00:00:00 2001 From: Sascha Ohms Date: Wed, 19 Aug 2020 18:17:10 +0200 Subject: [PATCH 3/5] fix syntax --- mapadroid/mitm_receiver/MitmDataProcessorManager.py | 2 -- mapadroid/mitm_receiver/MitmMapper.py | 2 +- start.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/mapadroid/mitm_receiver/MitmDataProcessorManager.py b/mapadroid/mitm_receiver/MitmDataProcessorManager.py index 9a9c94f51..0674bfa36 100644 --- a/mapadroid/mitm_receiver/MitmDataProcessorManager.py +++ b/mapadroid/mitm_receiver/MitmDataProcessorManager.py @@ -40,5 +40,3 @@ def shutdown(self): 
if self._mitm_data_queue is not None: self._mitm_data_queue.close() - - diff --git a/mapadroid/mitm_receiver/MitmMapper.py b/mapadroid/mitm_receiver/MitmMapper.py index 52829b346..cda3d2065 100644 --- a/mapadroid/mitm_receiver/MitmMapper.py +++ b/mapadroid/mitm_receiver/MitmMapper.py @@ -11,9 +11,9 @@ from mapadroid.utils.collections import Location from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger - logger = get_logger(LoggerEnums.mitm) + class MitmMapperManager(SyncManager): pass diff --git a/start.py b/start.py index 362e9e9f0..a90852029 100644 --- a/start.py +++ b/start.py @@ -5,7 +5,7 @@ print("MAD requires at least python 3.6! Your version: {}.{}" .format(py_version.major, py_version.minor)) sys.exit(1) -from multiprocessing import JoinableQueue, Process +from multiprocessing import Process from typing import Optional import calendar import datetime From e399af2a4d6570c6f3e2a585b1129e1c407488b8 Mon Sep 17 00:00:00 2001 From: Sascha Ohms Date: Sun, 23 Aug 2020 22:03:24 +0200 Subject: [PATCH 4/5] move mitm data queue check to separate thread --- mapadroid/mitm_receiver/MITMReceiver.py | 11 ++++--- .../mitm_receiver/MitmDataProcessorManager.py | 31 +++++++++++++++++++ .../SerializedMitmDataProcessor.py | 6 ---- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/mapadroid/mitm_receiver/MITMReceiver.py b/mapadroid/mitm_receiver/MITMReceiver.py index 4de45612f..f8d5826b7 100644 --- a/mapadroid/mitm_receiver/MITMReceiver.py +++ b/mapadroid/mitm_receiver/MITMReceiver.py @@ -220,9 +220,8 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man def shutdown(self): logger.info("MITMReceiver stop called...") - if self._data_queue: - for i in range(self.__application_args.mitmreceiver_data_workers): - self._data_queue.put(None) + for i in range(self.__application_args.mitmreceiver_data_workers): + self._add_to_queue(None) def run(self): httpsrv = WSGIServer((self.__listen_ip, int( @@ -275,7 +274,11 @@ 
def __handle_proto_data_dict(self, origin: str, data: dict) -> None: timestamp_received_receiver=time.time(), key=proto_type, values_dict=data, location=location_of_data) origin_logger.debug2("Placing data received to data_queue") - self._data_queue.put((timestamp, data, origin)) + self._add_to_queue((timestamp, data, origin)) + + def _add_to_queue(self, data): + if self._data_queue: + self._data_queue.put(data) def get_latest(self, origin, data): injected_settings = self.__mitm_mapper.request_latest( diff --git a/mapadroid/mitm_receiver/MitmDataProcessorManager.py b/mapadroid/mitm_receiver/MitmDataProcessorManager.py index 0674bfa36..3c6b0706b 100644 --- a/mapadroid/mitm_receiver/MitmDataProcessorManager.py +++ b/mapadroid/mitm_receiver/MitmDataProcessorManager.py @@ -1,3 +1,5 @@ +import time +import threading from multiprocessing import JoinableQueue from mapadroid.db.DbWrapper import DbWrapper @@ -15,10 +17,35 @@ def __init__(self, args, mitm_mapper: MitmMapper, db_wrapper: DbWrapper): self._mitm_data_queue: JoinableQueue = JoinableQueue() self._mitm_mapper: MitmMapper = mitm_mapper self._db_wrapper: DbWrapper = db_wrapper + self._queue_check_thread = None + self._stop_queue_check_thread = False + + self._queue_check_thread = threading.Thread(target=self._queue_size_check, args=()) + self._queue_check_thread.daemon = True + self._queue_check_thread.start() def get_queue(self): return self._mitm_data_queue + def get_queue_size(self): + # for whatever reason, there's no actual implementation of qsize() + # on MacOS. There are better solutions for this but c'mon, who is + # running MAD on MacOS anyway? + try: + item_count = self._mitm_data_queue.qsize() + except NotImplementedError: + item_count = 0 + + return item_count + + def _queue_size_check(self): + while not self._stop_queue_check_thread: + item_count = self.get_queue_size() + if item_count > 50: + logger.warning("MITM data processing workers are falling behind! 
Queue length: {}", item_count) + + time.sleep(3) + def launch_processors(self): for i in range(self._args.mitmreceiver_data_workers): data_processor: SerializedMitmDataProcessor = SerializedMitmDataProcessor( @@ -32,6 +59,10 @@ def launch_processors(self): self._worker_threads.append(data_processor) def shutdown(self): + self._stop_queue_check_thread = True + if self._queue_check_thread is not None: + self._queue_check_thread.terminate() + logger.info("Stopping {} MITM data processors", len(self._worker_threads)) for worker_thread in self._worker_threads: worker_thread.terminate() diff --git a/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py b/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py index 567775bd9..23f8cd3d1 100644 --- a/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py +++ b/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py @@ -21,12 +21,6 @@ def run(self): logger.info("Starting serialized MITM data processor") while True: try: - items_left = self.__queue.qsize() - if items_left > 50: - logger.warning("MITM data processors are falling behind! Queue length: {}", items_left) - else: - logger.debug("MITM data processor retrieved data. 
Queue length: {}", items_left) - item = self.__queue.get() if item is None: logger.info("Received signal to stop MITM data processor") From 9c168c5916bdab6dd4a1908b0e5c17ae8e31f3d6 Mon Sep 17 00:00:00 2001 From: Sascha Ohms Date: Mon, 24 Aug 2020 19:57:08 +0200 Subject: [PATCH 5/5] properly stop queue check thread --- mapadroid/mitm_receiver/MitmDataProcessorManager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/mapadroid/mitm_receiver/MitmDataProcessorManager.py b/mapadroid/mitm_receiver/MitmDataProcessorManager.py index 3c6b0706b..15a636e6b 100644 --- a/mapadroid/mitm_receiver/MitmDataProcessorManager.py +++ b/mapadroid/mitm_receiver/MitmDataProcessorManager.py @@ -60,8 +60,6 @@ def launch_processors(self): def shutdown(self): self._stop_queue_check_thread = True - if self._queue_check_thread is not None: - self._queue_check_thread.terminate() logger.info("Stopping {} MITM data processors", len(self._worker_threads)) for worker_thread in self._worker_threads: