Skip to content

Commit

Permalink
Merge pull request #245 from loverend/send_to_multiple_os
Browse files Browse the repository at this point in the history
Allow message to match on multiple OS
  • Loading branch information
mirceaulinic authored May 30, 2018
2 parents fa78472 + 15adfe3 commit ebc36d2
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 82 deletions.
2 changes: 1 addition & 1 deletion napalm_logs/listener/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
try:
import kafka
HAS_KAFKA = True
except ImportError as err:
except ImportError:
HAS_KAFKA = False

# Import napalm-logs pkgs
Expand Down
2 changes: 1 addition & 1 deletion napalm_logs/listener/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
try:
import zmq
HAS_ZMQ = True
except ImportError as err:
except ImportError:
HAS_ZMQ = False

# Import napalm-logs pkgs
Expand Down
169 changes: 91 additions & 78 deletions napalm_logs/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,58 +130,70 @@ def _compile_prefixes(self):
# log.debug('Compiled prefixes')
# log.debug(self.compiled_prefixes)

def _identify_prefix(self, msg, data):
'''
Check the message again each OS prefix and if matched return the
message dict
'''
prefix_id = -1
for prefix in data:
msg_dict = {}
prefix_id += 1
match = None
if '__python_fun__' in prefix:
log.debug('Trying to match using the %s custom python profiler', prefix['__python_mod__'])
try:
match = prefix['__python_fun__'](msg)
except Exception:
log.error('Exception while parsing %s with the %s python profiler',
msg, prefix['__python_mod__'], exc_info=True)
else:
log.debug('Matching using YAML-defined profiler:')
log.debug(prefix['raw_prefix'])
match = prefix['prefix'].search(msg)
if not match:
log.debug('Match not found')
continue
if '__python_fun__' in prefix:
log.debug('%s matched using the custom python profiler %s', msg, prefix['__python_mod__'])
msg_dict = match # the output as-is from the custom function
else:
positions = prefix.get('prefix_positions', {})
values = prefix.get('values')
msg_dict = {}
for key in values.keys():
msg_dict[key] = match.group(positions.get(key))
# Remove whitespace from the start or end of the message
msg_dict['__prefix_id__'] = prefix_id
msg_dict['message'] = msg_dict['message'].strip()

# The pri has to be an int as it is retrived using regex '\<(\d+)\>'
if 'pri' in msg_dict:
msg_dict['facility'] = int(int(msg_dict['pri']) / 8)
msg_dict['severity'] = int(int(msg_dict['pri']) - (msg_dict['facility'] * 8))
return msg_dict

def _identify_os(self, msg):
'''
Using the prefix of the syslog message,
we are able to identify the operating system and then continue parsing.
'''
ret = {}
ret = []
for dev_os, data in self.compiled_prefixes.items():
# TODO Should we prevent attepmting to determine the OS for the blacklisted?
# [mircea] I think its good from a logging perspective to know at least that
# that the server found the matching and it tells that it won't be processed
# further. Later, we could potentially add an option to control this.
log.debug('Matching under %s', dev_os)
prefix_id = -1
for prefix in data:
prefix_id += 1
match = None
if '__python_fun__' in prefix:
log.debug('Trying to match using the %s custom python profiler', prefix['__python_mod__'])
try:
match = prefix['__python_fun__'](msg)
except Exception:
log.error('Exception while parsing %s with the %s python profiler',
msg, prefix['__python_mod__'], exc_info=True)
else:
log.debug('Matching using YAML-defined profiler:')
log.debug(prefix['raw_prefix'])
match = prefix['prefix'].search(msg)
if not match:
log.debug('Match not found')
continue
if '__python_fun__' in prefix:
log.debug('%s matched using the custom python profiler %s', msg, prefix['__python_mod__'])
ret = match # the output as-is from the custom function
else:
positions = prefix.get('prefix_positions', {})
values = prefix.get('values')
ret = {}
for key in values.keys():
ret[key] = match.group(positions.get(key))
# Remove whitespace from the start or end of the message
ret['__prefix_id__'] = prefix_id
ret['message'] = ret['message'].strip()

# The pri has to be an int as it is retrived using regex '\<(\d+)\>'
if 'pri' in ret:
ret['facility'] = int(int(ret['pri']) / 8)
ret['severity'] = int(int(ret['pri']) - (ret['facility'] * 8))
# TODO Should we stop searching and just return, or should we return all matches OS?
return dev_os, ret
log.debug('No prefix matched under %s', dev_os)
log.debug('No OS matched for: %s', msg)
return '', ret
msg_dict = self._identify_prefix(msg, data)
if msg_dict:
log.debug('Adding %s to list of matched OS', dev_os)
ret.append((dev_os, msg_dict))
else:
log.debug('No match found for %s', dev_os)
if not ret:
log.debug('Not matched any OS')
return ret

def start(self):
'''
Expand Down Expand Up @@ -241,47 +253,48 @@ def start(self):
msg = msg.encode('utf-8')
log.debug('[%s] Dequeued message from %s: %s', address, msg, time.time())
napalm_logs_server_messages_received.inc()
dev_os, msg_dict = self._identify_os(msg)
os_list = self._identify_os(msg)

if dev_os and dev_os in self.started_os_proc:
# Identified the OS and the corresponding process is started.
# Then send the message in the right queue
log.debug('Identified OS: %s', dev_os)
log.debug('Queueing message to %s', dev_os)
if six.PY3:
dev_os = bytes(dev_os, 'utf-8')
self.pub.send_multipart([dev_os,
umsgpack.packb((msg_dict, address))])
# self.os_pipes[dev_os].send((msg_dict, address))
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_device_queued.labels(device_os=dev_os).inc()
for dev_os, msg_dict in os_list:
if dev_os and dev_os in self.started_os_proc:
# Identified the OS and the corresponding process is started.
# Then send the message in the right queue
log.debug('Identified OS: %s', dev_os)
log.debug('Queueing message to %s', dev_os)
if six.PY3:
dev_os = bytes(dev_os, 'utf-8')
self.pub.send_multipart([dev_os,
umsgpack.packb((msg_dict, address))])
# self.os_pipes[dev_os].send((msg_dict, address))
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_device_queued.labels(device_os=dev_os).inc()

elif dev_os and dev_os not in self.started_os_proc:
# Identified the OS, but the corresponding process does not seem to be started.
log.info('Unable to queue the message to %s. Is the sub-process started?', dev_os)
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_failed_device_queuing.labels(device_os=dev_os).inc()
elif dev_os and dev_os not in self.started_os_proc:
# Identified the OS, but the corresponding process does not seem to be started.
log.info('Unable to queue the message to %s. Is the sub-process started?', dev_os)
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_failed_device_queuing.labels(device_os=dev_os).inc()

elif not dev_os and self.opts['_server_send_unknown']:
# OS not identified, but the user requested to publish the message as-is
log.debug('Unable to identify the OS, sending directly to the publishers')
to_publish = {
'ip': address,
'host': 'unknown',
'timestamp': int(time.time()),
'message_details': msg_dict,
'os': UNKNOWN_DEVICE_NAME,
'error': 'UNKNOWN',
'model_name': 'unknown'
}
self.publisher_pub.send(umsgpack.packb(to_publish))
napalm_logs_server_messages_unknown_queued.inc()
napalm_logs_server_messages_without_identified_os.inc()
elif not dev_os and self.opts['_server_send_unknown']:
# OS not identified, but the user requested to publish the message as-is
log.debug('Unable to identify the OS, sending directly to the publishers')
to_publish = {
'ip': address,
'host': 'unknown',
'timestamp': int(time.time()),
'message_details': msg_dict,
'os': UNKNOWN_DEVICE_NAME,
'error': 'UNKNOWN',
'model_name': 'unknown'
}
self.publisher_pub.send(umsgpack.packb(to_publish))
napalm_logs_server_messages_unknown_queued.inc()
napalm_logs_server_messages_without_identified_os.inc()

elif not dev_os and not self.opts['_server_send_unknown']:
# OS not identified and we are told to do nothing
log.debug('Unable to identify the OS')
napalm_logs_server_messages_without_identified_os.inc()
elif not dev_os and not self.opts['_server_send_unknown']:
# OS not identified and we are told to do nothing
log.debug('Unable to identify the OS')
napalm_logs_server_messages_without_identified_os.inc()

def stop(self):
log.info('Stopping server process')
Expand Down
2 changes: 1 addition & 1 deletion napalm_logs/transport/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
try:
import kafka
HAS_KAFKA = True
except ImportError as err:
except ImportError:
HAS_KAFKA = False

# Import napalm-logs pkgs
Expand Down
2 changes: 1 addition & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def startup_proc():
NL_BASE = NapalmLogs(disable_security=True,
address=NAPALM_LOGS_TEST_ADDR,
port=NAPALM_LOGS_TEST_PORT,
publisher=[{'zmq': {'send_raw': True, 'send_unknown': True}}],
publisher=[{'zmq': {'send_unknown': True}}],
listener=[{'udp': {}}],
publish_address=NAPALM_LOGS_TEST_PUB_ADDR,
publish_port=NAPALM_LOGS_TEST_PUB_PORT,
Expand Down

0 comments on commit ebc36d2

Please sign in to comment.