diff --git a/mapadroid/mitm_receiver/MITMReceiver.py b/mapadroid/mitm_receiver/MITMReceiver.py index 8d591b6a8..73d60c2c8 100644 --- a/mapadroid/mitm_receiver/MITMReceiver.py +++ b/mapadroid/mitm_receiver/MITMReceiver.py @@ -11,7 +11,6 @@ from flask import Flask, Response, request, send_file from gevent.pywsgi import WSGIServer -from mapadroid.mitm_receiver.MITMDataProcessor import MitmDataProcessor from mapadroid.mitm_receiver.MitmMapper import MitmMapper from mapadroid.utils import MappingManager from mapadroid.utils.authHelper import check_auth @@ -161,7 +160,8 @@ def __call__(self, *args, **kwargs): class MITMReceiver(Process): def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_manager: MappingManager, - db_wrapper, data_manager, storage_obj, name=None, enable_configmode: Optional[bool] = False): + db_wrapper, data_manager, storage_obj, data_queue: JoinableQueue, + name=None, enable_configmode: Optional[bool] = False): Process.__init__(self, name=name) self.__application_args = args_passed self.__mapping_manager = mapping_manager @@ -172,8 +172,7 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man self.__hopper_mutex = RLock() self._db_wrapper = db_wrapper self.__storage_obj = storage_obj - self._data_queue: JoinableQueue = JoinableQueue() - self.worker_threads = [] + self._data_queue: JoinableQueue = data_queue self.app = Flask("MITMReceiver") self.add_endpoint(endpoint='/get_addresses/', endpoint_name='get_addresses/', handler=self.get_addresses, @@ -218,27 +217,13 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man methods_passed=['GET']) self.add_endpoint(endpoint='/status/', endpoint_name='status/', handler=self.status, methods_passed=['GET']) - for i in range(self.__application_args.mitmreceiver_data_workers): - data_processor: MitmDataProcessor = MitmDataProcessor(self._data_queue, self.__application_args, - self.__mitm_mapper, db_wrapper, - name='MITMReceiver-%s' % str(i)) - data_processor.start() - self.worker_threads.append(data_processor) self.__mitmreceiver_startup_time: float = time.time() def shutdown(self): logger.info("MITMReceiver stop called...") - logger.info("Adding None to queue") - if self._data_queue: - for i in range(self.__application_args.mitmreceiver_data_workers): - self._data_queue.put(None) - logger.info("Trying to join workers...") - for worker_thread in self.worker_threads: - worker_thread.terminate() - worker_thread.join() - self._data_queue.close() - logger.info("Workers stopped...") + for i in range(self.__application_args.mitmreceiver_data_workers): + self._add_to_queue(None) def run(self): httpsrv = WSGIServer((self.__listen_ip, int( @@ -298,7 +283,11 @@ def __handle_proto_data_dict(self, origin: str, data: dict) -> None: timestamp_received_receiver=time.time(), key=proto_type, values_dict=data, location=location_of_data) origin_logger.debug2("Placing data received to data_queue") - self._data_queue.put((timestamp, data, origin)) + self._add_to_queue((timestamp, data, origin)) + + def _add_to_queue(self, data): + if self._data_queue: + self._data_queue.put(data) def get_latest(self, origin, data): injected_settings = self.__mitm_mapper.request_latest( @@ -327,9 +316,7 @@ def get_addresses(self, origin, data): def status(self, origin, data): origin_return: dict = {} - process_return: dict = {} data_return: dict = {} - process_count: int = 0 for origin in self.__mapping_manager.get_all_devicemappings().keys(): origin_return[origin] = {} origin_return[origin]['injection_status'] = self.__mitm_mapper.get_injection_status(origin) @@ -340,13 +327,7 @@ def status(self, origin, data): origin_return[origin][ 'last_possibly_moved'] = self.__mitm_mapper.get_last_timestamp_possible_moved(origin) - for process in self.worker_threads: - process_return['MITMReceiver-' + str(process_count)] = {} - process_return['MITMReceiver-' + str(process_count)]['queue_length'] = process.get_queue_items() - process_count += 1 - data_return['origin_status'] = origin_return - data_return['process_status'] = process_return return json.dumps(data_return) diff --git a/mapadroid/mitm_receiver/MitmDataProcessorManager.py b/mapadroid/mitm_receiver/MitmDataProcessorManager.py new file mode 100644 index 000000000..15a636e6b --- /dev/null +++ b/mapadroid/mitm_receiver/MitmDataProcessorManager.py @@ -0,0 +1,71 @@ +import time +import threading +from multiprocessing import JoinableQueue + +from mapadroid.db.DbWrapper import DbWrapper +from mapadroid.mitm_receiver.MitmMapper import MitmMapper +from mapadroid.mitm_receiver.SerializedMitmDataProcessor import SerializedMitmDataProcessor +from mapadroid.utils.logging import get_logger, LoggerEnums + +logger = get_logger(LoggerEnums.mitm) + + +class MitmDataProcessorManager(): + def __init__(self, args, mitm_mapper: MitmMapper, db_wrapper: DbWrapper): + self._worker_threads = [] + self._args = args + self._mitm_data_queue: JoinableQueue = JoinableQueue() + self._mitm_mapper: MitmMapper = mitm_mapper + self._db_wrapper: DbWrapper = db_wrapper + self._queue_check_thread = None + self._stop_queue_check_thread = False + + self._queue_check_thread = threading.Thread(target=self._queue_size_check, args=()) + self._queue_check_thread.daemon = True + self._queue_check_thread.start() + + def get_queue(self): + return self._mitm_data_queue + + def get_queue_size(self): + # for whatever reason, there's no actual implementation of qsize() + # on MacOS. There are better solutions for this but c'mon, who is + # running MAD on MacOS anyway? + try: + item_count = self._mitm_data_queue.qsize() + except NotImplementedError: + item_count = 0 + + return item_count + + def _queue_size_check(self): + while not self._stop_queue_check_thread: + item_count = self.get_queue_size() + if item_count > 50: + logger.warning("MITM data processing workers are falling behind! Queue length: {}", item_count) + + time.sleep(3) + + def launch_processors(self): + for i in range(self._args.mitmreceiver_data_workers): + data_processor: SerializedMitmDataProcessor = SerializedMitmDataProcessor( + self._mitm_data_queue, + self._args, + self._mitm_mapper, + self._db_wrapper, + name="SerialiedMitmDataProcessor-%s" % str(i)) + + data_processor.start() + self._worker_threads.append(data_processor) + + def shutdown(self): + self._stop_queue_check_thread = True + + logger.info("Stopping {} MITM data processors", len(self._worker_threads)) + for worker_thread in self._worker_threads: + worker_thread.terminate() + worker_thread.join() + logger.info("Stopped MITM datap rocessors") + + if self._mitm_data_queue is not None: + self._mitm_data_queue.close() diff --git a/mapadroid/mitm_receiver/MitmMapper.py b/mapadroid/mitm_receiver/MitmMapper.py index 3d6740b03..cda3d2065 100644 --- a/mapadroid/mitm_receiver/MitmMapper.py +++ b/mapadroid/mitm_receiver/MitmMapper.py @@ -4,16 +4,14 @@ from queue import Empty from threading import Thread, Event from typing import Dict + from mapadroid.db.DbStatsSubmit import DbStatsSubmit from mapadroid.mitm_receiver.PlayerStats import PlayerStats from mapadroid.utils.MappingManager import MappingManager from mapadroid.utils.collections import Location -from mapadroid.utils.walkerArgs import parse_args from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger - logger = get_logger(LoggerEnums.mitm) -args = parse_args() class MitmMapperManager(SyncManager): @@ -21,7 +19,7 @@ class MitmMapperManager(SyncManager): class MitmMapper(object): - def __init__(self, mapping_manager: MappingManager, db_stats_submit: DbStatsSubmit): + def __init__(self, args, mapping_manager: MappingManager, db_stats_submit: DbStatsSubmit): self.__mapping = {} self.__playerstats: Dict[str, PlayerStats] = {} self.__mapping_mutex = Lock() @@ -187,6 +185,11 @@ def get_injection_status(self, origin): return self.__injected.get(origin, False) def run_stats_collector(self, origin: str): + if not self.__application_args.game_stats: + pass + + origin_logger = get_origin_logger(logger, origin=origin) + origin_logger.debug2("Running stats collector") if self.__playerstats.get(origin, None) is not None: self.__playerstats.get(origin).stats_collector() diff --git a/mapadroid/mitm_receiver/MITMDataProcessor.py b/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py similarity index 72% rename from mapadroid/mitm_receiver/MITMDataProcessor.py rename to mapadroid/mitm_receiver/SerializedMitmDataProcessor.py index 16e7aad10..23f8cd3d1 100644 --- a/mapadroid/mitm_receiver/MITMDataProcessor.py +++ b/mapadroid/mitm_receiver/SerializedMitmDataProcessor.py @@ -5,11 +5,10 @@ from mapadroid.mitm_receiver.MitmMapper import MitmMapper from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger - logger = get_logger(LoggerEnums.mitm) -class MitmDataProcessor(Process): +class SerializedMitmDataProcessor(Process): def __init__(self, multi_proc_queue: Queue, application_args, mitm_mapper: MitmMapper, db_wrapper: DbWrapper, name=None): Process.__init__(self, name=name) @@ -19,59 +18,35 @@ def __init__(self, multi_proc_queue: Queue, application_args, mitm_mapper: MitmM self.__mitm_mapper: MitmMapper = mitm_mapper def run(self): - # build a private DbWrapper instance... - logger.info("Starting MITMDataProcessor") + logger.info("Starting serialized MITM data processor") while True: try: item = self.__queue.get() - - try: - items_left = self.__queue.qsize() - except NotImplementedError: - items_left = 0 - - logger.debug("MITM data processing worker retrieved data. Queue length left afterwards: {}", items_left) - if items_left > 50: - logger.warning("MITM data processing workers are falling behind! Queue length: {}", items_left) - if item is None: - logger.warning("Received none from queue of data") + logger.info("Received signal to stop MITM data processor") break self.process_data(item[0], item[1], item[2]) self.__queue.task_done() except KeyboardInterrupt: - logger.info("MITMDataProcessor received keyboard interrupt, stopping") + logger.info("Received keyboard interrupt, stopping MITM data processor") break - def get_queue_items(self): - try: - items_left = self.__queue.qsize() - except NotImplementedError: - items_left = 0 - return items_left - @logger.catch def process_data(self, received_timestamp, data, origin): origin_logger = get_origin_logger(logger, origin=origin) data_type = data.get("type", None) - raw = data.get("raw", False) origin_logger.debug2("Processing received data") processed_timestamp = datetime.fromtimestamp(received_timestamp) - if raw: - origin_logger.debug4("Received raw payload: {}", data["payload"]) - if data_type and not raw: - origin_logger.debug2("Running stats collector") - if self.__application_args.game_stats: - self.__mitm_mapper.run_stats_collector(origin) + if data_type and not data.get("raw", False): + self.__mitm_mapper.run_stats_collector(origin) origin_logger.debug4("Received data: {}", data) if data_type == 106: # process GetMapObject origin_logger.info("Processing GMO received. Received at {}", processed_timestamp) - if self.__application_args.weather: - self.__db_submit.weather(origin, data["payload"], received_timestamp) + self.__db_submit.weather(origin, data["payload"], received_timestamp) self.__db_submit.stops(origin, data["payload"]) self.__db_submit.gyms(origin, data["payload"]) diff --git a/start.py b/start.py index 54902e4e0..a90852029 100644 --- a/start.py +++ b/start.py @@ -18,6 +18,7 @@ from mapadroid.utils.MappingManager import MappingManager, MappingManagerManager from mapadroid.db.DbFactory import DbFactory from mapadroid.mitm_receiver.MitmMapper import MitmMapper, MitmMapperManager +from mapadroid.mitm_receiver.MitmDataProcessorManager import MitmDataProcessorManager from mapadroid.mitm_receiver.MITMReceiver import MITMReceiver from mapadroid.utils.madGlobals import terminate_mad from mapadroid.utils.rarity import Rarity @@ -206,7 +207,7 @@ def check_dependencies(): data_manager, configmode=args.config_mode) if args.only_routes: - logger.info('Running in route recalculation mode. MAD will exit once complete') + logger.info('Running in route recalculation mode. MAD will exit once complete') recalc_in_progress = True while recalc_in_progress: time.sleep(5) @@ -221,14 +222,20 @@ def check_dependencies(): MitmMapperManager.register('MitmMapper', MitmMapper) mitm_mapper_manager = MitmMapperManager() mitm_mapper_manager.start() - mitm_mapper = mitm_mapper_manager.MitmMapper(mapping_manager, db_wrapper.stats_submit) + mitm_mapper = mitm_mapper_manager.MitmMapper(args, mapping_manager, db_wrapper.stats_submit) + logger.info('Starting PogoDroid Receiver server on port {}'.format(str(args.mitmreceiver_port))) + + mitm_data_processor_manager = MitmDataProcessorManager(args, mitm_mapper, db_wrapper) + mitm_data_processor_manager.launch_processors() + mitm_receiver_process = MITMReceiver(args.mitmreceiver_ip, int(args.mitmreceiver_port), mitm_mapper, args, mapping_manager, db_wrapper, - data_manager, - storage_elem, + data_manager, storage_elem, + mitm_data_processor_manager.get_queue(), enable_configmode=args.config_mode) mitm_receiver_process.start() + logger.info('Starting websocket server on port {}'.format(str(args.ws_port))) ws_server = WebsocketServer(args=args, mitm_mapper=mitm_mapper, @@ -276,6 +283,7 @@ def check_dependencies(): 'storage_elem': storage_elem, 'webhook_worker': webhook_worker, 'ws_server': ws_server, + 'mitm_data_processor_manager': mitm_data_processor_manager } mad_plugins = PluginCollection('plugins', plugin_parts) mad_plugins.apply_all_plugins_on_value() @@ -338,11 +346,13 @@ def check_dependencies(): if mitm_receiver_process is not None: logger.info("Trying to stop receiver") mitm_receiver_process.shutdown() - logger.debug("MITM child threads successfully shutdown. Terminating parent thread") + logger.debug("MITM child threads successfully shutdown. Terminating parent thread") mitm_receiver_process.terminate() logger.debug("Trying to join MITMReceiver") mitm_receiver_process.join() logger.debug("MITMReceiver joined") + if mitm_data_processor_manager is not None: + mitm_data_processor_manager.shutdown() if device_updater is not None: device_updater.stop_updater() if t_whw is not None: