From 0c90d6e987b98070ece0b9c0c5b2b2fc2307af91 Mon Sep 17 00:00:00 2001 From: Luke Overend Date: Tue, 29 May 2018 11:35:51 +0100 Subject: [PATCH 1/2] Allow message to match on multiple OS This fixes #242 Currently when a matching OS is found we do not check remaining OS. This mean that if 2 OS have message prefixes that match each other's logs then you cannot be sure that the message will be forwarded onto the correct `device` process. I have updated so the message is now forwarded onto all matching os, not just the first match. To allow the tests to work correctly I have removed `'send_raw': True` from the config test. --- napalm_logs/server.py | 169 +++++++++++++++++++++++------------------- tests/test_config.py | 2 +- 2 files changed, 92 insertions(+), 79 deletions(-) diff --git a/napalm_logs/server.py b/napalm_logs/server.py index cbc29e22..d6fc9c0c 100644 --- a/napalm_logs/server.py +++ b/napalm_logs/server.py @@ -130,58 +130,70 @@ def _compile_prefixes(self): # log.debug('Compiled prefixes') # log.debug(self.compiled_prefixes) + def _identify_prefix(self, msg, data): + ''' + Check the message again each OS prefix and if matched return the + message dict + ''' + prefix_id = -1 + for prefix in data: + msg_dict = {} + prefix_id += 1 + match = None + if '__python_fun__' in prefix: + log.debug('Trying to match using the %s custom python profiler', prefix['__python_mod__']) + try: + match = prefix['__python_fun__'](msg) + except Exception: + log.error('Exception while parsing %s with the %s python profiler', + msg, prefix['__python_mod__'], exc_info=True) + else: + log.debug('Matching using YAML-defined profiler:') + log.debug(prefix['raw_prefix']) + match = prefix['prefix'].search(msg) + if not match: + log.debug('Match not found') + continue + if '__python_fun__' in prefix: + log.debug('%s matched using the custom python profiler %s', msg, prefix['__python_mod__']) + msg_dict = match # the output as-is from the custom function + else: + positions = prefix.get('prefix_positions', {}) + values = prefix.get('values') + msg_dict = {} + for key in values.keys(): + msg_dict[key] = match.group(positions.get(key)) + # Remove whitespace from the start or end of the message + msg_dict['__prefix_id__'] = prefix_id + msg_dict['message'] = msg_dict['message'].strip() + + # The pri has to be an int as it is retrived using regex '\<(\d+)\>' + if 'pri' in msg_dict: + msg_dict['facility'] = int(int(msg_dict['pri']) / 8) + msg_dict['severity'] = int(int(msg_dict['pri']) - (msg_dict['facility'] * 8)) + return msg_dict + def _identify_os(self, msg): ''' Using the prefix of the syslog message, we are able to identify the operating system and then continue parsing. ''' - ret = {} + ret = [] for dev_os, data in self.compiled_prefixes.items(): # TODO Should we prevent attepmting to determine the OS for the blacklisted? # [mircea] I think its good from a logging perspective to know at least that # that the server found the matching and it tells that it won't be processed # further. Later, we could potentially add an option to control this. log.debug('Matching under %s', dev_os) - prefix_id = -1 - for prefix in data: - prefix_id += 1 - match = None - if '__python_fun__' in prefix: - log.debug('Trying to match using the %s custom python profiler', prefix['__python_mod__']) - try: - match = prefix['__python_fun__'](msg) - except Exception: - log.error('Exception while parsing %s with the %s python profiler', - msg, prefix['__python_mod__'], exc_info=True) - else: - log.debug('Matching using YAML-defined profiler:') - log.debug(prefix['raw_prefix']) - match = prefix['prefix'].search(msg) - if not match: - log.debug('Match not found') - continue - if '__python_fun__' in prefix: - log.debug('%s matched using the custom python profiler %s', msg, prefix['__python_mod__']) - ret = match # the output as-is from the custom function - else: - positions = prefix.get('prefix_positions', {}) - values = prefix.get('values') - ret = {} - for key in values.keys(): - ret[key] = match.group(positions.get(key)) - # Remove whitespace from the start or end of the message - ret['__prefix_id__'] = prefix_id - ret['message'] = ret['message'].strip() - - # The pri has to be an int as it is retrived using regex '\<(\d+)\>' - if 'pri' in ret: - ret['facility'] = int(int(ret['pri']) / 8) - ret['severity'] = int(int(ret['pri']) - (ret['facility'] * 8)) - # TODO Should we stop searching and just return, or should we return all matches OS? - return dev_os, ret - log.debug('No prefix matched under %s', dev_os) - log.debug('No OS matched for: %s', msg) - return '', ret + msg_dict = self._identify_prefix(msg, data) + if msg_dict: + log.debug('Adding %s to list of matched OS', dev_os) + ret.append((dev_os, msg_dict)) + else: + log.debug('No match found for %s', dev_os) + if not ret: + log.debug('Not matched any OS') + return ret def start(self): ''' @@ -241,47 +253,48 @@ def start(self): msg = msg.encode('utf-8') log.debug('[%s] Dequeued message from %s: %s', address, msg, time.time()) napalm_logs_server_messages_received.inc() - dev_os, msg_dict = self._identify_os(msg) + os_list = self._identify_os(msg) - if dev_os and dev_os in self.started_os_proc: - # Identified the OS and the corresponding process is started. - # Then send the message in the right queue - log.debug('Identified OS: %s', dev_os) - log.debug('Queueing message to %s', dev_os) - if six.PY3: - dev_os = bytes(dev_os, 'utf-8') - self.pub.send_multipart([dev_os, - umsgpack.packb((msg_dict, address))]) - # self.os_pipes[dev_os].send((msg_dict, address)) - napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc() - napalm_logs_server_messages_device_queued.labels(device_os=dev_os).inc() + for dev_os, msg_dict in os_list: + if dev_os and dev_os in self.started_os_proc: + # Identified the OS and the corresponding process is started. + # Then send the message in the right queue + log.debug('Identified OS: %s', dev_os) + log.debug('Queueing message to %s', dev_os) + if six.PY3: + dev_os = bytes(dev_os, 'utf-8') + self.pub.send_multipart([dev_os, + umsgpack.packb((msg_dict, address))]) + # self.os_pipes[dev_os].send((msg_dict, address)) + napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc() + napalm_logs_server_messages_device_queued.labels(device_os=dev_os).inc() - elif dev_os and dev_os not in self.started_os_proc: - # Identified the OS, but the corresponding process does not seem to be started. - log.info('Unable to queue the message to %s. Is the sub-process started?', dev_os) - napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc() - napalm_logs_server_messages_failed_device_queuing.labels(device_os=dev_os).inc() + elif dev_os and dev_os not in self.started_os_proc: + # Identified the OS, but the corresponding process does not seem to be started. + log.info('Unable to queue the message to %s. Is the sub-process started?', dev_os) + napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc() + napalm_logs_server_messages_failed_device_queuing.labels(device_os=dev_os).inc() - elif not dev_os and self.opts['_server_send_unknown']: - # OS not identified, but the user requested to publish the message as-is - log.debug('Unable to identify the OS, sending directly to the publishers') - to_publish = { - 'ip': address, - 'host': 'unknown', - 'timestamp': int(time.time()), - 'message_details': msg_dict, - 'os': UNKNOWN_DEVICE_NAME, - 'error': 'UNKNOWN', - 'model_name': 'unknown' - } - self.publisher_pub.send(umsgpack.packb(to_publish)) - napalm_logs_server_messages_unknown_queued.inc() - napalm_logs_server_messages_without_identified_os.inc() + elif not dev_os and self.opts['_server_send_unknown']: + # OS not identified, but the user requested to publish the message as-is + log.debug('Unable to identify the OS, sending directly to the publishers') + to_publish = { + 'ip': address, + 'host': 'unknown', + 'timestamp': int(time.time()), + 'message_details': msg_dict, + 'os': UNKNOWN_DEVICE_NAME, + 'error': 'UNKNOWN', + 'model_name': 'unknown' + } + self.publisher_pub.send(umsgpack.packb(to_publish)) + napalm_logs_server_messages_unknown_queued.inc() + napalm_logs_server_messages_without_identified_os.inc() - elif not dev_os and not self.opts['_server_send_unknown']: - # OS not identified and we are told to do nothing - log.debug('Unable to identify the OS') - napalm_logs_server_messages_without_identified_os.inc() + elif not dev_os and not self.opts['_server_send_unknown']: + # OS not identified and we are told to do nothing + log.debug('Unable to identify the OS') + napalm_logs_server_messages_without_identified_os.inc() def stop(self): log.info('Stopping server process') diff --git a/tests/test_config.py b/tests/test_config.py index 1dd6e4e4..be3449fc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -47,7 +47,7 @@ def startup_proc(): NL_BASE = NapalmLogs(disable_security=True, address=NAPALM_LOGS_TEST_ADDR, port=NAPALM_LOGS_TEST_PORT, - publisher=[{'zmq': {'send_raw': True, 'send_unknown': True}}], + publisher=[{'zmq': {'send_unknown': True}}], listener=[{'udp': {}}], publish_address=NAPALM_LOGS_TEST_PUB_ADDR, publish_port=NAPALM_LOGS_TEST_PUB_PORT, From 15adfe302c0cf3f38ff54930303305021ec8b3cd Mon Sep 17 00:00:00 2001 From: Luke Overend Date: Tue, 29 May 2018 15:29:21 +0100 Subject: [PATCH 2/2] Remove unused variable `err` This fixes the following: ``` =================================== FAILURES =================================== _______________________________________ _______________________________________ napalm_logs/listener/kafka.py:17:1: W0612 local variable 'err' is assigned to but never used [pyflakes] _______________________________________ _______________________________________ napalm_logs/listener/zeromq.py:16:1: W0612 local variable 'err' is assigned to but never used [pyflakes] _______________________________________ _______________________________________ napalm_logs/transport/kafka.py:15:1: W0612 local variable 'err' is assigned to but never used [pyflakes] ``` --- napalm_logs/listener/kafka.py | 2 +- napalm_logs/listener/zeromq.py | 2 +- napalm_logs/transport/kafka.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/napalm_logs/listener/kafka.py b/napalm_logs/listener/kafka.py index 6ad9d2f8..fcd81cbe 100644 --- a/napalm_logs/listener/kafka.py +++ b/napalm_logs/listener/kafka.py @@ -14,7 +14,7 @@ try: import kafka HAS_KAFKA = True -except ImportError as err: +except ImportError: HAS_KAFKA = False # Import napalm-logs pkgs diff --git a/napalm_logs/listener/zeromq.py b/napalm_logs/listener/zeromq.py index 9f235423..ddb7cb53 100644 --- a/napalm_logs/listener/zeromq.py +++ b/napalm_logs/listener/zeromq.py @@ -13,7 +13,7 @@ try: import zmq HAS_ZMQ = True -except ImportError as err: +except ImportError: HAS_ZMQ = False # Import napalm-logs pkgs diff --git a/napalm_logs/transport/kafka.py b/napalm_logs/transport/kafka.py index 6a40f8d8..f9614ecd 100644 --- a/napalm_logs/transport/kafka.py +++ b/napalm_logs/transport/kafka.py @@ -12,7 +12,7 @@ try: import kafka HAS_KAFKA = True -except ImportError as err: +except ImportError: HAS_KAFKA = False # Import napalm-logs pkgs