diff --git a/relay.cfg b/relay.cfg index 271ecd8..9194710 100644 --- a/relay.cfg +++ b/relay.cfg @@ -20,7 +20,15 @@ hostname = localhost # UDP port on which the relay server is listening for messages target_port = 30000 +# Timeout for messages received from the plane. If no messages are received +# during this timespan the host ip and port of the plane are reset. [s] +timeout = 600 + [iridium] +# Timeout for messages received from the plane. If no messages are received +# during this timespan the MQTT queue is cleared. [s] +timeout = 600 + # Address of the destination Rock7 HTTP gateway: # for simulator url = http://localhost:45678 diff --git a/relay.py b/relay.py index b61e9ec..b7c6695 100755 --- a/relay.py +++ b/relay.py @@ -16,29 +16,49 @@ LOGGER = logging.getLogger(__name__) class LteInterface(): - def __init__(self, rx_port): + def __init__(self, rx_port, timeout): self.__sock = None self.__rx_port = rx_port self.__host_ip = None + self.__tx_port = None self.__message_counter = 0 self.__bytes_counter = 0 - self.__last_time = time.clock() + self.__last_time = time.time() + self.__receive_time = time.time() + self.__timeout = timeout + self.__timeout_scheduler = None + self.__lock = Lock() self.on_message_callback = None + def __on_receive_timeout(self): + self.__lock.acquire() + if (time.time() - self.__receive_time > self.__timeout): + if self.__host_ip: + self.__host_ip = None + self.__tx_port = None + self.__message_counter = 0 + self.__bytes_counter = 0 + LOGGER.warn('No LTE message received for {0} seconds, resetting host ip.'.format(time.time() - self.__receive_time)) + self.__last_time = time.time() + self.__lock.release() + def on_receive(self, fd, events): (data, source_ip_port) = self.__sock.recvfrom(4096) + self.__lock.acquire() + self.__receive_time = time.time() self.__message_counter += 1 self.__bytes_counter += sys.getsizeof(data) - 37 if (self.__message_counter % 1000 == 0): - LOGGER.warn('Received LTE data #{0}, rate: {1} kB/s'.format(self.__message_counter, self.__bytes_counter / (1000.0 * (time.clock() - self.__last_time)))) - self.__last_time = time.clock() + LOGGER.warn('Received LTE data #{0}, rate: {1} kB/s'.format(self.__message_counter, self.__bytes_counter / (1000.0 * (time.time() - self.__last_time)))) + self.__last_time = time.time() self.__bytes_counter = 0 else: LOGGER.info('Received LTE data #%d', self.__message_counter) self.__host_ip = source_ip_port[0] self.__tx_port = source_ip_port[1] + self.__lock.release() self.on_message_callback(data) def send(self, data): @@ -54,9 +74,12 @@ def open(self): self.__sock.setblocking(False) tornado.ioloop.IOLoop.current().add_handler(self.__sock.fileno(), self.on_receive, tornado.ioloop.IOLoop.READ) self.__sock.bind(('', self.__rx_port)) # all available interfaces + self.__timeout_scheduler = tornado.ioloop.PeriodicCallback(self.__on_receive_timeout, 1000) + self.__timeout_scheduler.start() def close(self): LOGGER.warn('Closing UDP port') + self.__timeout_scheduler.stop() tornado.ioloop.IOLoop.current().remove_handler(self.__sock.fileno()) self.__sock.close() self.__sock = None @@ -115,14 +138,13 @@ def post(self): def post_message(self, data, idx): self.__lock.acquire() - LOGGER.info('Sending MT message # %i to Iridium', idx) + LOGGER.warn('Sending MT message # %i to Iridium', idx) self.__post_data['data'] = data.encode('hex') body = tornado.httputil.urlencode(self.__post_data) request = tornado.httpclient.HTTPRequest(self.__url, method='POST', body=body) self.__waiting_for_confirm[request] = (idx, data) self.__lock.release() self.__http_client.fetch(request, self.__on_message_sent) - def start(self): args = dict(cb=self.on_message_callback) @@ -130,8 +152,9 @@ def start(self): self.__http_server.listen(self.__port) LOGGER.warn('Starting iridum interface on %s', self.__url) + class MqttInterface(object): - def __init__(self, ip, port, user, pwd): + def __init__(self, ip, port, user, pwd, iridium_timeout): self.__broker_ip = ip self.__broker_port = port self.__broker_user = user @@ -141,9 +164,22 @@ def __init__(self, ip, port, user, pwd): self.__client_bad_connection_flag = False self.__publish_counter = 1 self.__iridium_counter = 0 + self.__iridium_timeout = iridium_timeout + self.__iridium_queue_cleared = False + self.__last_message_received = time.time() + self.__timeout_scheduler = None + self.__lock = Lock() self.lte_on_message_callback = None self.satcom_on_message_callback = None + def __on_receive_timeout(self): + self.__lock.acquire() + if (time.time() - self.__last_message_received > self.__iridium_timeout) and not self.__iridium_queue_cleared: + self.__client.publish('telem/SatCom_from_plane', None, qos=0, retain=True) + self.__iridium_queue_cleared = True + LOGGER.warn('Clear SatCom queue, no message from plane received for {0} seconds'.format(time.time() - self.__last_message_received)) + self.__lock.release() + def __connect(self): self.__client = mqtt.Client('relay_server') self.__client.on_connect = self.__on_connect @@ -208,15 +244,22 @@ def publish_lte_message(self, data): self.__publish_message('telem/LTE_from_plane', data, False) def publish_satcom_message(self, data): + self.__lock.acquire() + self.__iridium_queue_cleared = False + self.__last_message_received = time.time() self.__publish_message('telem/SatCom_from_plane', data, True) + self.__lock.release() def start(self): self.__connect() + self.__timeout_scheduler = tornado.ioloop.PeriodicCallback(self.__on_receive_timeout, 2500) + self.__timeout_scheduler.start() def stop(self): self.__client.loop_stop() self.__client.disconnect() self.__client = None + self.__timeout_scheduler.stop() LOGGER.warn('Stopped') @@ -234,8 +277,10 @@ def main(): user = credentials.get('mqtt', 'user') pwd = credentials.get('mqtt', 'password') rx_port = config.getint('lte', 'target_port') + lte_timeout = config.getint('lte', 'timeout') iridium_url = config.get('iridium', 'url') iridium_local_port = config.getint('iridium', 'local_port') + iridium_timeout = config.getint('iridium', 'timeout') rock7_credentials['imei'] = credentials.get('rockblock', 'imei') rock7_credentials['username'] = credentials.get('rockblock', 'username') rock7_credentials['password'] = credentials.get('rockblock', 'password') @@ -251,8 +296,8 @@ def main(): formatter = logging.Formatter(LOG_FORMAT) console.setFormatter(formatter) logging.getLogger('').addHandler(console) - mi = MqttInterface(host, port, user, pwd) - li = LteInterface(rx_port) + mi = MqttInterface(host, port, user, pwd, iridium_timeout) + li = LteInterface(rx_port, lte_timeout) ii = IridiumInterface(iridium_url, iridium_local_port, rock7_credentials) mi.lte_on_message_callback = li.send @@ -269,10 +314,13 @@ def main(): except KeyboardInterrupt: # start the stopping in a separate thread so that is not # stopped by the KeyboardInterrupt - a = Thread(target=mi.stop()) + a = Thread(target=mi.stop()) a.start() a.join() - + a = Thread(target=li.close()) + a.start() + a.join() + if __name__ == '__main__': main() diff --git a/udp2mqtt.py b/udp2mqtt.py index fe8afc7..86ec5a2 100755 --- a/udp2mqtt.py +++ b/udp2mqtt.py @@ -106,7 +106,7 @@ def __on_disconnect(self, client, userdata, rc): LOGGER.warn('Client disconnecting, reason: ' + str(rc)) def __callback_SatCom(self, client, userdata, msg): - LOGGER.info('MQTT received message from ' + msg.topic) + LOGGER.warn('MQTT received message from ' + msg.topic) self.satcom_on_message_callback(msg.payload) def __callback_LTE(self, client, userdata, msg): @@ -122,6 +122,7 @@ def publish_lte_message(self, data): self.__publish_message('telem/LTE_to_plane', data) def publish_satcom_message(self, data): + LOGGER.warn('Send SatCom message to plane') self.__publish_message('telem/SatCom_to_plane', data) def start(self): @@ -155,7 +156,7 @@ def main(): print(e) quit() - logging.basicConfig(filename='udp2mqtt.log', level=logging.INFO, format=LOG_FORMAT) + logging.basicConfig(filename='udp2mqtt.log', level=logging.WARN, format=LOG_FORMAT) console = logging.StreamHandler() console.setLevel(logging.WARN) formatter = logging.Formatter(LOG_FORMAT) @@ -179,7 +180,13 @@ def main(): except KeyboardInterrupt: # start the stopping in a separate thread so that is not # stopped by the KeyboardInterrupt - a = Thread(target=mi.stop()) + a = Thread(target=mi.stop()) + a.start() + a.join() + a = Thread(target=si.close()) + a.start() + a.join() + a = Thread(target=li.close()) a.start() a.join()