Skip to content

Commit

Permalink
Reset host ip address and iridium mqtt queue after a timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
acfloria committed Sep 23, 2019
1 parent f070ee7 commit 5635040
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
8 changes: 8 additions & 0 deletions relay.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@ hostname = localhost
# UDP port on which the relay server is listening for messages
target_port = 30000

# Timeout for messages received from the plane. If no messages are received
# during this timespan the host ip and port of the plane are reset. [s]
timeout = 600

[iridium]
# Timeout for messages received from the plane. If no messages are received
# during this timespan the MQTT queue is cleared. [s]
timeout = 600

# Address of the destination Rock7 HTTP gateway:
# for simulator
url = http://localhost:45678
Expand Down
70 changes: 59 additions & 11 deletions relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,49 @@
LOGGER = logging.getLogger(__name__)

class LteInterface():
def __init__(self, rx_port):
def __init__(self, rx_port, timeout):
self.__sock = None
self.__rx_port = rx_port
self.__host_ip = None
self.__tx_port = None
self.__message_counter = 0
self.__bytes_counter = 0
self.__last_time = time.clock()
self.__last_time = time.time()
self.__receive_time = time.time()
self.__timeout = timeout
self.__timeout_scheduler = None
self.__lock = Lock()
self.on_message_callback = None

def __on_receive_timeout(self):
self.__lock.acquire()
if (time.time() - self.__receive_time > self.__timeout):
if self.__host_ip:
self.__host_ip = None
self.__tx_port = None
self.__message_counter = 0
self.__bytes_counter = 0
LOGGER.warn('No LTE message received for {0} seconds, resetting host ip.'.format(time.time() - self.__receive_time))
self.__last_time = time.time()
self.__lock.release()

def on_receive(self, fd, events):
(data, source_ip_port) = self.__sock.recvfrom(4096)
self.__lock.acquire()
self.__receive_time = time.time()

self.__message_counter += 1
self.__bytes_counter += sys.getsizeof(data) - 37
if (self.__message_counter % 1000 == 0):
LOGGER.warn('Received LTE data #{0}, rate: {1} kB/s'.format(self.__message_counter, self.__bytes_counter / (1000.0 * (time.clock() - self.__last_time))))
self.__last_time = time.clock()
LOGGER.warn('Received LTE data #{0}, rate: {1} kB/s'.format(self.__message_counter, self.__bytes_counter / (1000.0 * (time.time() - self.__last_time))))
self.__last_time = time.time()
self.__bytes_counter = 0
else:
LOGGER.info('Received LTE data #%d', self.__message_counter)

self.__host_ip = source_ip_port[0]
self.__tx_port = source_ip_port[1]
self.__lock.release()
self.on_message_callback(data)

def send(self, data):
Expand All @@ -54,9 +74,12 @@ def open(self):
self.__sock.setblocking(False)
tornado.ioloop.IOLoop.current().add_handler(self.__sock.fileno(), self.on_receive, tornado.ioloop.IOLoop.READ)
self.__sock.bind(('', self.__rx_port)) # all available interfaces
self.__timeout_scheduler = tornado.ioloop.PeriodicCallback(self.__on_receive_timeout, 1000)
self.__timeout_scheduler.start()

def close(self):
LOGGER.warn('Closing UDP port')
self.__timeout_scheduler.stop()
tornado.ioloop.IOLoop.current().remove_handler(self.__sock.fileno())
self.__sock.close()
self.__sock = None
Expand Down Expand Up @@ -115,23 +138,23 @@ def post(self):

def post_message(self, data, idx):
self.__lock.acquire()
LOGGER.info('Sending MT message # %i to Iridium', idx)
LOGGER.warn('Sending MT message # %i to Iridium', idx)
self.__post_data['data'] = data.encode('hex')
body = tornado.httputil.urlencode(self.__post_data)
request = tornado.httpclient.HTTPRequest(self.__url, method='POST', body=body)
self.__waiting_for_confirm[request] = (idx, data)
self.__lock.release()
self.__http_client.fetch(request, self.__on_message_sent)


def start(self):
args = dict(cb=self.on_message_callback)
self.__http_server = tornado.web.Application([(r"/", self.PostHandler, args)])
self.__http_server.listen(self.__port)
LOGGER.warn('Starting iridum interface on %s', self.__url)


class MqttInterface(object):
def __init__(self, ip, port, user, pwd):
def __init__(self, ip, port, user, pwd, iridium_timeout):
self.__broker_ip = ip
self.__broker_port = port
self.__broker_user = user
Expand All @@ -141,9 +164,22 @@ def __init__(self, ip, port, user, pwd):
self.__client_bad_connection_flag = False
self.__publish_counter = 1
self.__iridium_counter = 0
self.__iridium_timeout = iridium_timeout
self.__iridium_queue_cleared = False
self.__last_message_received = time.time()
self.__timeout_scheduler = None
self.__lock = Lock()
self.lte_on_message_callback = None
self.satcom_on_message_callback = None

def __on_receive_timeout(self):
self.__lock.acquire()
if (time.time() - self.__last_message_received > self.__iridium_timeout) and not self.__iridium_queue_cleared:
self.__client.publish('telem/SatCom_from_plane', None, qos=0, retain=True)
self.__iridium_queue_cleared = True
LOGGER.warn('Clear SatCom queue, no message from plane received for {0} seconds'.format(time.time() - self.__last_message_received))
self.__lock.release()

def __connect(self):
self.__client = mqtt.Client('relay_server')
self.__client.on_connect = self.__on_connect
Expand Down Expand Up @@ -208,15 +244,22 @@ def publish_lte_message(self, data):
self.__publish_message('telem/LTE_from_plane', data, False)

def publish_satcom_message(self, data):
self.__lock.acquire()
self.__iridium_queue_cleared = False
self.__last_message_received = time.time()
self.__publish_message('telem/SatCom_from_plane', data, True)
self.__lock.release()

def start(self):
self.__connect()
self.__timeout_scheduler = tornado.ioloop.PeriodicCallback(self.__on_receive_timeout, 2500)
self.__timeout_scheduler.start()

def stop(self):
self.__client.loop_stop()
self.__client.disconnect()
self.__client = None
self.__timeout_scheduler.stop()
LOGGER.warn('Stopped')


Expand All @@ -234,8 +277,10 @@ def main():
user = credentials.get('mqtt', 'user')
pwd = credentials.get('mqtt', 'password')
rx_port = config.getint('lte', 'target_port')
lte_timeout = config.getint('lte', 'timeout')
iridium_url = config.get('iridium', 'url')
iridium_local_port = config.getint('iridium', 'local_port')
iridium_timeout = config.getint('iridium', 'timeout')
rock7_credentials['imei'] = credentials.get('rockblock', 'imei')
rock7_credentials['username'] = credentials.get('rockblock', 'username')
rock7_credentials['password'] = credentials.get('rockblock', 'password')
Expand All @@ -251,8 +296,8 @@ def main():
formatter = logging.Formatter(LOG_FORMAT)
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
mi = MqttInterface(host, port, user, pwd)
li = LteInterface(rx_port)
mi = MqttInterface(host, port, user, pwd, iridium_timeout)
li = LteInterface(rx_port, lte_timeout)
ii = IridiumInterface(iridium_url, iridium_local_port, rock7_credentials)

mi.lte_on_message_callback = li.send
Expand All @@ -269,10 +314,13 @@ def main():
except KeyboardInterrupt:
# start the stopping in a separate thread so that is not
# stopped by the KeyboardInterrupt
a = Thread(target=mi.stop())
a = Thread(target=mi.stop())
a.start()
a.join()

a = Thread(target=li.close())
a.start()
a.join()


if __name__ == '__main__':
main()
13 changes: 10 additions & 3 deletions udp2mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __on_disconnect(self, client, userdata, rc):
LOGGER.warn('Client disconnecting, reason: ' + str(rc))

def __callback_SatCom(self, client, userdata, msg):
LOGGER.info('MQTT received message from ' + msg.topic)
LOGGER.warn('MQTT received message from ' + msg.topic)
self.satcom_on_message_callback(msg.payload)

def __callback_LTE(self, client, userdata, msg):
Expand All @@ -122,6 +122,7 @@ def publish_lte_message(self, data):
self.__publish_message('telem/LTE_to_plane', data)

def publish_satcom_message(self, data):
LOGGER.warn('Send SatCom message to plane')
self.__publish_message('telem/SatCom_to_plane', data)

def start(self):
Expand Down Expand Up @@ -155,7 +156,7 @@ def main():
print(e)
quit()

logging.basicConfig(filename='udp2mqtt.log', level=logging.INFO, format=LOG_FORMAT)
logging.basicConfig(filename='udp2mqtt.log', level=logging.WARN, format=LOG_FORMAT)
console = logging.StreamHandler()
console.setLevel(logging.WARN)
formatter = logging.Formatter(LOG_FORMAT)
Expand All @@ -179,7 +180,13 @@ def main():
except KeyboardInterrupt:
# start the stopping in a separate thread so that is not
# stopped by the KeyboardInterrupt
a = Thread(target=mi.stop())
a = Thread(target=mi.stop())
a.start()
a.join()
a = Thread(target=si.close())
a.start()
a.join()
a = Thread(target=li.close())
a.start()
a.join()

Expand Down

0 comments on commit 5635040

Please sign in to comment.