Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data processing for easier implementation of raw data processing #960

Merged
merged 7 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 10 additions & 29 deletions mapadroid/mitm_receiver/MITMReceiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from flask import Flask, Response, request, send_file
from gevent.pywsgi import WSGIServer

from mapadroid.mitm_receiver.MITMDataProcessor import MitmDataProcessor
from mapadroid.mitm_receiver.MitmMapper import MitmMapper
from mapadroid.utils import MappingManager
from mapadroid.utils.authHelper import check_auth
Expand Down Expand Up @@ -161,7 +160,8 @@ def __call__(self, *args, **kwargs):

class MITMReceiver(Process):
def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_manager: MappingManager,
db_wrapper, data_manager, storage_obj, name=None, enable_configmode: Optional[bool] = False):
db_wrapper, data_manager, storage_obj, data_queue: JoinableQueue,
name=None, enable_configmode: Optional[bool] = False):
Process.__init__(self, name=name)
self.__application_args = args_passed
self.__mapping_manager = mapping_manager
Expand All @@ -172,8 +172,7 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man
self.__hopper_mutex = RLock()
self._db_wrapper = db_wrapper
self.__storage_obj = storage_obj
self._data_queue: JoinableQueue = JoinableQueue()
self.worker_threads = []
self._data_queue: JoinableQueue = data_queue
self.app = Flask("MITMReceiver")
self.add_endpoint(endpoint='/get_addresses/', endpoint_name='get_addresses/',
handler=self.get_addresses,
Expand Down Expand Up @@ -218,27 +217,13 @@ def __init__(self, listen_ip, listen_port, mitm_mapper, args_passed, mapping_man
methods_passed=['GET'])
self.add_endpoint(endpoint='/status/', endpoint_name='status/', handler=self.status,
methods_passed=['GET'])
for i in range(self.__application_args.mitmreceiver_data_workers):
data_processor: MitmDataProcessor = MitmDataProcessor(self._data_queue, self.__application_args,
self.__mitm_mapper, db_wrapper,
name='MITMReceiver-%s' % str(i))
data_processor.start()
self.worker_threads.append(data_processor)

self.__mitmreceiver_startup_time: float = time.time()

def shutdown(self):
logger.info("MITMReceiver stop called...")
logger.info("Adding None to queue")
if self._data_queue:
for i in range(self.__application_args.mitmreceiver_data_workers):
self._data_queue.put(None)
logger.info("Trying to join workers...")
for worker_thread in self.worker_threads:
worker_thread.terminate()
worker_thread.join()
self._data_queue.close()
logger.info("Workers stopped...")
for i in range(self.__application_args.mitmreceiver_data_workers):
self._add_to_queue(None)

def run(self):
httpsrv = WSGIServer((self.__listen_ip, int(
Expand Down Expand Up @@ -298,7 +283,11 @@ def __handle_proto_data_dict(self, origin: str, data: dict) -> None:
timestamp_received_receiver=time.time(), key=proto_type, values_dict=data,
location=location_of_data)
origin_logger.debug2("Placing data received to data_queue")
self._data_queue.put((timestamp, data, origin))
self._add_to_queue((timestamp, data, origin))

def _add_to_queue(self, data):
if self._data_queue:
self._data_queue.put(data)

def get_latest(self, origin, data):
injected_settings = self.__mitm_mapper.request_latest(
Expand Down Expand Up @@ -327,9 +316,7 @@ def get_addresses(self, origin, data):

def status(self, origin, data):
origin_return: dict = {}
process_return: dict = {}
data_return: dict = {}
process_count: int = 0
for origin in self.__mapping_manager.get_all_devicemappings().keys():
origin_return[origin] = {}
origin_return[origin]['injection_status'] = self.__mitm_mapper.get_injection_status(origin)
Expand All @@ -340,13 +327,7 @@ def status(self, origin, data):
origin_return[origin][
'last_possibly_moved'] = self.__mitm_mapper.get_last_timestamp_possible_moved(origin)

for process in self.worker_threads:
process_return['MITMReceiver-' + str(process_count)] = {}
process_return['MITMReceiver-' + str(process_count)]['queue_length'] = process.get_queue_items()
process_count += 1

data_return['origin_status'] = origin_return
data_return['process_status'] = process_return

return json.dumps(data_return)

Expand Down
71 changes: 71 additions & 0 deletions mapadroid/mitm_receiver/MitmDataProcessorManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import time
import threading
from multiprocessing import JoinableQueue

from mapadroid.db.DbWrapper import DbWrapper
from mapadroid.mitm_receiver.MitmMapper import MitmMapper
from mapadroid.mitm_receiver.SerializedMitmDataProcessor import SerializedMitmDataProcessor
from mapadroid.utils.logging import get_logger, LoggerEnums

logger = get_logger(LoggerEnums.mitm)


class MitmDataProcessorManager:
    """Own the shared MITM data queue and the worker processes draining it.

    On construction a ``JoinableQueue`` is created and a daemon thread is
    started that periodically checks the queue length and logs a warning when
    it exceeds ``QUEUE_WARNING_THRESHOLD``. Worker processes are only created
    once ``launch_processors()`` is called.
    """

    # Queue length above which the monitor thread logs a warning.
    QUEUE_WARNING_THRESHOLD = 50
    # Seconds the monitor thread waits between two queue length checks.
    QUEUE_CHECK_INTERVAL = 3

    def __init__(self, args, mitm_mapper: MitmMapper, db_wrapper: DbWrapper):
        """Create the queue and start the queue length monitor thread.

        Args:
            args: Application arguments; ``mitmreceiver_data_workers`` is read
                from it to decide how many processors to launch. The object is
                also handed to every processor.
            mitm_mapper: Handed to every processor.
            db_wrapper: Handed to every processor.
        """
        self._worker_threads = []
        self._args = args
        self._mitm_data_queue: JoinableQueue = JoinableQueue()
        self._mitm_mapper: MitmMapper = mitm_mapper
        self._db_wrapper: DbWrapper = db_wrapper
        # The boolean flag is kept next to the event so that code inspecting
        # the flag keeps working; the event is what wakes the monitor thread.
        self._stop_queue_check_thread = False
        self._stop_event = threading.Event()

        self._queue_check_thread = threading.Thread(target=self._queue_size_check, args=())
        self._queue_check_thread.daemon = True
        self._queue_check_thread.start()

    def get_queue(self):
        """Return the queue the processors read from."""
        return self._mitm_data_queue

    def get_queue_size(self):
        """Return the queue's ``qsize()``, or 0 where it is not implemented."""
        # qsize() raises NotImplementedError on platforms lacking an
        # implementation (the original author names MacOS); report an empty
        # queue there instead of failing.
        try:
            item_count = self._mitm_data_queue.qsize()
        except NotImplementedError:
            item_count = 0

        return item_count

    def _queue_size_check(self):
        """Monitor loop: warn while the queue is longer than the threshold."""
        while not self._stop_queue_check_thread:
            item_count = self.get_queue_size()
            if item_count > self.QUEUE_WARNING_THRESHOLD:
                logger.warning("MITM data processing workers are falling behind! Queue length: {}", item_count)

            # Event.wait() returns True as soon as shutdown() sets the event,
            # so the loop ends right away instead of sleeping out the interval.
            if self._stop_event.wait(self.QUEUE_CHECK_INTERVAL):
                break

    def launch_processors(self):
        """Start ``args.mitmreceiver_data_workers`` processors on the queue."""
        for i in range(self._args.mitmreceiver_data_workers):
            # NOTE(review): the process name below is misspelled ("Serialied");
            # it is left untouched because the name is visible at runtime.
            data_processor: SerializedMitmDataProcessor = SerializedMitmDataProcessor(
                self._mitm_data_queue,
                self._args,
                self._mitm_mapper,
                self._db_wrapper,
                name="SerialiedMitmDataProcessor-%s" % str(i))

            data_processor.start()
            self._worker_threads.append(data_processor)

    def shutdown(self):
        """Stop the monitor thread, terminate all processors, close the queue."""
        self._stop_queue_check_thread = True
        self._stop_event.set()

        logger.info("Stopping {} MITM data processors", len(self._worker_threads))
        # Signal every worker before waiting on any of them so that the
        # processes wind down concurrently rather than one after another.
        for worker_thread in self._worker_threads:
            worker_thread.terminate()
        for worker_thread in self._worker_threads:
            worker_thread.join()
        logger.info("Stopped MITM data processors")

        if self._mitm_data_queue is not None:
            self._mitm_data_queue.close()
11 changes: 7 additions & 4 deletions mapadroid/mitm_receiver/MitmMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,22 @@
from queue import Empty
from threading import Thread, Event
from typing import Dict

from mapadroid.db.DbStatsSubmit import DbStatsSubmit
from mapadroid.mitm_receiver.PlayerStats import PlayerStats
from mapadroid.utils.MappingManager import MappingManager
from mapadroid.utils.collections import Location
from mapadroid.utils.walkerArgs import parse_args
from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger


logger = get_logger(LoggerEnums.mitm)
args = parse_args()


class MitmMapperManager(SyncManager):
    """SyncManager subclass that adds nothing of its own.

    It exists as a dedicated manager type on which ``MitmMapper`` can be
    registered (``MitmMapperManager.register('MitmMapper', MitmMapper)``).
    """
    pass


class MitmMapper(object):
def __init__(self, mapping_manager: MappingManager, db_stats_submit: DbStatsSubmit):
def __init__(self, args, mapping_manager: MappingManager, db_stats_submit: DbStatsSubmit):
self.__mapping = {}
self.__playerstats: Dict[str, PlayerStats] = {}
self.__mapping_mutex = Lock()
Expand Down Expand Up @@ -187,6 +185,11 @@ def get_injection_status(self, origin):
return self.__injected.get(origin, False)

def run_stats_collector(self, origin: str):
    """Run the stats collector of the PlayerStats registered for ``origin``.

    Nothing happens when ``game_stats`` is disabled in the application
    arguments, or when no PlayerStats entry exists for ``origin``.

    Args:
        origin: Key of the device in the player stats mapping.
    """
    if not self.__application_args.game_stats:
        # Callers invoke this method unconditionally, so the check has to
        # end the call here; a bare `pass` would fall through and collect
        # stats even though the feature is switched off.
        return

    origin_logger = get_origin_logger(logger, origin=origin)
    origin_logger.debug2("Running stats collector")
    player_stats = self.__playerstats.get(origin, None)
    if player_stats is not None:
        player_stats.stats_collector()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from mapadroid.mitm_receiver.MitmMapper import MitmMapper
from mapadroid.utils.logging import get_logger, LoggerEnums, get_origin_logger


logger = get_logger(LoggerEnums.mitm)


class MitmDataProcessor(Process):
class SerializedMitmDataProcessor(Process):
def __init__(self, multi_proc_queue: Queue, application_args, mitm_mapper: MitmMapper,
db_wrapper: DbWrapper, name=None):
Process.__init__(self, name=name)
Expand All @@ -19,59 +18,35 @@ def __init__(self, multi_proc_queue: Queue, application_args, mitm_mapper: MitmM
self.__mitm_mapper: MitmMapper = mitm_mapper

def run(self):
# build a private DbWrapper instance...
logger.info("Starting MITMDataProcessor")
logger.info("Starting serialized MITM data processor")
while True:
try:
item = self.__queue.get()

try:
items_left = self.__queue.qsize()
except NotImplementedError:
items_left = 0

logger.debug("MITM data processing worker retrieved data. Queue length left afterwards: {}", items_left)
if items_left > 50:
logger.warning("MITM data processing workers are falling behind! Queue length: {}", items_left)

if item is None:
logger.warning("Received none from queue of data")
logger.info("Received signal to stop MITM data processor")
break
self.process_data(item[0], item[1], item[2])
self.__queue.task_done()
except KeyboardInterrupt:
logger.info("MITMDataProcessor received keyboard interrupt, stopping")
logger.info("Received keyboard interrupt, stopping MITM data processor")
break

def get_queue_items(self):
try:
items_left = self.__queue.qsize()
except NotImplementedError:
items_left = 0
return items_left

@logger.catch
def process_data(self, received_timestamp, data, origin):
origin_logger = get_origin_logger(logger, origin=origin)
data_type = data.get("type", None)
raw = data.get("raw", False)
origin_logger.debug2("Processing received data")
processed_timestamp = datetime.fromtimestamp(received_timestamp)
if raw:
origin_logger.debug4("Received raw payload: {}", data["payload"])

if data_type and not raw:
origin_logger.debug2("Running stats collector")
if self.__application_args.game_stats:
self.__mitm_mapper.run_stats_collector(origin)
if data_type and not data.get("raw", False):
self.__mitm_mapper.run_stats_collector(origin)

origin_logger.debug4("Received data: {}", data)
if data_type == 106:
# process GetMapObject
origin_logger.info("Processing GMO received. Received at {}", processed_timestamp)

if self.__application_args.weather:
self.__db_submit.weather(origin, data["payload"], received_timestamp)
self.__db_submit.weather(origin, data["payload"], received_timestamp)

self.__db_submit.stops(origin, data["payload"])
self.__db_submit.gyms(origin, data["payload"])
Expand Down
20 changes: 15 additions & 5 deletions start.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from mapadroid.utils.MappingManager import MappingManager, MappingManagerManager
from mapadroid.db.DbFactory import DbFactory
from mapadroid.mitm_receiver.MitmMapper import MitmMapper, MitmMapperManager
from mapadroid.mitm_receiver.MitmDataProcessorManager import MitmDataProcessorManager
from mapadroid.mitm_receiver.MITMReceiver import MITMReceiver
from mapadroid.utils.madGlobals import terminate_mad
from mapadroid.utils.rarity import Rarity
Expand Down Expand Up @@ -206,7 +207,7 @@ def check_dependencies():
data_manager,
configmode=args.config_mode)
if args.only_routes:
logger.info('Running in route recalculation mode. MAD will exit once complete')
logger.info('Running in route recalculation mode. MAD will exit once complete')
recalc_in_progress = True
while recalc_in_progress:
time.sleep(5)
Expand All @@ -221,14 +222,20 @@ def check_dependencies():
MitmMapperManager.register('MitmMapper', MitmMapper)
mitm_mapper_manager = MitmMapperManager()
mitm_mapper_manager.start()
mitm_mapper = mitm_mapper_manager.MitmMapper(mapping_manager, db_wrapper.stats_submit)
mitm_mapper = mitm_mapper_manager.MitmMapper(args, mapping_manager, db_wrapper.stats_submit)

logger.info('Starting PogoDroid Receiver server on port {}'.format(str(args.mitmreceiver_port)))

mitm_data_processor_manager = MitmDataProcessorManager(args, mitm_mapper, db_wrapper)
mitm_data_processor_manager.launch_processors()

mitm_receiver_process = MITMReceiver(args.mitmreceiver_ip, int(args.mitmreceiver_port),
mitm_mapper, args, mapping_manager, db_wrapper,
data_manager,
storage_elem,
data_manager, storage_elem,
mitm_data_processor_manager.get_queue(),
enable_configmode=args.config_mode)
mitm_receiver_process.start()

logger.info('Starting websocket server on port {}'.format(str(args.ws_port)))
ws_server = WebsocketServer(args=args,
mitm_mapper=mitm_mapper,
Expand Down Expand Up @@ -276,6 +283,7 @@ def check_dependencies():
'storage_elem': storage_elem,
'webhook_worker': webhook_worker,
'ws_server': ws_server,
'mitm_data_processor_manager': mitm_data_processor_manager
}
mad_plugins = PluginCollection('plugins', plugin_parts)
mad_plugins.apply_all_plugins_on_value()
Expand Down Expand Up @@ -338,11 +346,13 @@ def check_dependencies():
if mitm_receiver_process is not None:
logger.info("Trying to stop receiver")
mitm_receiver_process.shutdown()
logger.debug("MITM child threads successfully shutdown. Terminating parent thread")
logger.debug("MITM child threads successfully shutdown. Terminating parent thread")
mitm_receiver_process.terminate()
logger.debug("Trying to join MITMReceiver")
mitm_receiver_process.join()
logger.debug("MITMReceiver joined")
if mitm_data_processor_manager is not None:
mitm_data_processor_manager.shutdown()
if device_updater is not None:
device_updater.stop_updater()
if t_whw is not None:
Expand Down