Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow message to match on multiple OS #245

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion napalm_logs/listener/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
try:
import kafka
HAS_KAFKA = True
except ImportError as err:
except ImportError:
HAS_KAFKA = False

# Import napalm-logs pkgs
Expand Down
2 changes: 1 addition & 1 deletion napalm_logs/listener/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
try:
import zmq
HAS_ZMQ = True
except ImportError as err:
except ImportError:
HAS_ZMQ = False

# Import napalm-logs pkgs
Expand Down
169 changes: 91 additions & 78 deletions napalm_logs/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,58 +130,70 @@ def _compile_prefixes(self):
# log.debug('Compiled prefixes')
# log.debug(self.compiled_prefixes)

def _identify_prefix(self, msg, data):
'''
Check the message again each OS prefix and if matched return the
message dict
'''
prefix_id = -1
for prefix in data:
msg_dict = {}
prefix_id += 1
match = None
if '__python_fun__' in prefix:
log.debug('Trying to match using the %s custom python profiler', prefix['__python_mod__'])
try:
match = prefix['__python_fun__'](msg)
except Exception:
log.error('Exception while parsing %s with the %s python profiler',
msg, prefix['__python_mod__'], exc_info=True)
else:
log.debug('Matching using YAML-defined profiler:')
log.debug(prefix['raw_prefix'])
match = prefix['prefix'].search(msg)
if not match:
log.debug('Match not found')
continue
if '__python_fun__' in prefix:
log.debug('%s matched using the custom python profiler %s', msg, prefix['__python_mod__'])
msg_dict = match # the output as-is from the custom function
else:
positions = prefix.get('prefix_positions', {})
values = prefix.get('values')
msg_dict = {}
for key in values.keys():
msg_dict[key] = match.group(positions.get(key))
# Remove whitespace from the start or end of the message
msg_dict['__prefix_id__'] = prefix_id
msg_dict['message'] = msg_dict['message'].strip()

# The pri has to be an int as it is retrived using regex '\<(\d+)\>'
if 'pri' in msg_dict:
msg_dict['facility'] = int(int(msg_dict['pri']) / 8)
msg_dict['severity'] = int(int(msg_dict['pri']) - (msg_dict['facility'] * 8))
return msg_dict

def _identify_os(self, msg):
'''
Using the prefix of the syslog message,
we are able to identify the operating system and then continue parsing.
'''
ret = {}
ret = []
for dev_os, data in self.compiled_prefixes.items():
# TODO Should we prevent attepmting to determine the OS for the blacklisted?
# [mircea] I think its good from a logging perspective to know at least that
# that the server found the matching and it tells that it won't be processed
# further. Later, we could potentially add an option to control this.
log.debug('Matching under %s', dev_os)
prefix_id = -1
for prefix in data:
prefix_id += 1
match = None
if '__python_fun__' in prefix:
log.debug('Trying to match using the %s custom python profiler', prefix['__python_mod__'])
try:
match = prefix['__python_fun__'](msg)
except Exception:
log.error('Exception while parsing %s with the %s python profiler',
msg, prefix['__python_mod__'], exc_info=True)
else:
log.debug('Matching using YAML-defined profiler:')
log.debug(prefix['raw_prefix'])
match = prefix['prefix'].search(msg)
if not match:
log.debug('Match not found')
continue
if '__python_fun__' in prefix:
log.debug('%s matched using the custom python profiler %s', msg, prefix['__python_mod__'])
ret = match # the output as-is from the custom function
else:
positions = prefix.get('prefix_positions', {})
values = prefix.get('values')
ret = {}
for key in values.keys():
ret[key] = match.group(positions.get(key))
# Remove whitespace from the start or end of the message
ret['__prefix_id__'] = prefix_id
ret['message'] = ret['message'].strip()

# The pri has to be an int as it is retrived using regex '\<(\d+)\>'
if 'pri' in ret:
ret['facility'] = int(int(ret['pri']) / 8)
ret['severity'] = int(int(ret['pri']) - (ret['facility'] * 8))
# TODO Should we stop searching and just return, or should we return all matches OS?
return dev_os, ret
log.debug('No prefix matched under %s', dev_os)
log.debug('No OS matched for: %s', msg)
return '', ret
msg_dict = self._identify_prefix(msg, data)
if msg_dict:
log.debug('Adding %s to list of matched OS', dev_os)
ret.append((dev_os, msg_dict))
else:
log.debug('No match found for %s', dev_os)
if not ret:
log.debug('Not matched any OS')
return ret

def start(self):
'''
Expand Down Expand Up @@ -241,47 +253,48 @@ def start(self):
msg = msg.encode('utf-8')
log.debug('[%s] Dequeued message from %s: %s', address, msg, time.time())
napalm_logs_server_messages_received.inc()
dev_os, msg_dict = self._identify_os(msg)
os_list = self._identify_os(msg)

if dev_os and dev_os in self.started_os_proc:
# Identified the OS and the corresponding process is started.
# Then send the message in the right queue
log.debug('Identified OS: %s', dev_os)
log.debug('Queueing message to %s', dev_os)
if six.PY3:
dev_os = bytes(dev_os, 'utf-8')
self.pub.send_multipart([dev_os,
umsgpack.packb((msg_dict, address))])
# self.os_pipes[dev_os].send((msg_dict, address))
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_device_queued.labels(device_os=dev_os).inc()
for dev_os, msg_dict in os_list:
if dev_os and dev_os in self.started_os_proc:
# Identified the OS and the corresponding process is started.
# Then send the message in the right queue
log.debug('Identified OS: %s', dev_os)
log.debug('Queueing message to %s', dev_os)
if six.PY3:
dev_os = bytes(dev_os, 'utf-8')
self.pub.send_multipart([dev_os,
umsgpack.packb((msg_dict, address))])
# self.os_pipes[dev_os].send((msg_dict, address))
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_device_queued.labels(device_os=dev_os).inc()

elif dev_os and dev_os not in self.started_os_proc:
# Identified the OS, but the corresponding process does not seem to be started.
log.info('Unable to queue the message to %s. Is the sub-process started?', dev_os)
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_failed_device_queuing.labels(device_os=dev_os).inc()
elif dev_os and dev_os not in self.started_os_proc:
# Identified the OS, but the corresponding process does not seem to be started.
log.info('Unable to queue the message to %s. Is the sub-process started?', dev_os)
napalm_logs_server_messages_with_identified_os.labels(device_os=dev_os).inc()
napalm_logs_server_messages_failed_device_queuing.labels(device_os=dev_os).inc()

elif not dev_os and self.opts['_server_send_unknown']:
# OS not identified, but the user requested to publish the message as-is
log.debug('Unable to identify the OS, sending directly to the publishers')
to_publish = {
'ip': address,
'host': 'unknown',
'timestamp': int(time.time()),
'message_details': msg_dict,
'os': UNKNOWN_DEVICE_NAME,
'error': 'UNKNOWN',
'model_name': 'unknown'
}
self.publisher_pub.send(umsgpack.packb(to_publish))
napalm_logs_server_messages_unknown_queued.inc()
napalm_logs_server_messages_without_identified_os.inc()
elif not dev_os and self.opts['_server_send_unknown']:
# OS not identified, but the user requested to publish the message as-is
log.debug('Unable to identify the OS, sending directly to the publishers')
to_publish = {
'ip': address,
'host': 'unknown',
'timestamp': int(time.time()),
'message_details': msg_dict,
'os': UNKNOWN_DEVICE_NAME,
'error': 'UNKNOWN',
'model_name': 'unknown'
}
self.publisher_pub.send(umsgpack.packb(to_publish))
napalm_logs_server_messages_unknown_queued.inc()
napalm_logs_server_messages_without_identified_os.inc()

elif not dev_os and not self.opts['_server_send_unknown']:
# OS not identified and we are told to do nothing
log.debug('Unable to identify the OS')
napalm_logs_server_messages_without_identified_os.inc()
elif not dev_os and not self.opts['_server_send_unknown']:
# OS not identified and we are told to do nothing
log.debug('Unable to identify the OS')
napalm_logs_server_messages_without_identified_os.inc()

def stop(self):
log.info('Stopping server process')
Expand Down
2 changes: 1 addition & 1 deletion napalm_logs/transport/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
try:
import kafka
HAS_KAFKA = True
except ImportError as err:
except ImportError:
HAS_KAFKA = False

# Import napalm-logs pkgs
Expand Down
2 changes: 1 addition & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def startup_proc():
NL_BASE = NapalmLogs(disable_security=True,
address=NAPALM_LOGS_TEST_ADDR,
port=NAPALM_LOGS_TEST_PORT,
publisher=[{'zmq': {'send_raw': True, 'send_unknown': True}}],
publisher=[{'zmq': {'send_unknown': True}}],
listener=[{'udp': {}}],
publish_address=NAPALM_LOGS_TEST_PUB_ADDR,
publish_port=NAPALM_LOGS_TEST_PUB_PORT,
Expand Down