From dcaede59714d40a97febb391aab2263eb9e8ef66 Mon Sep 17 00:00:00 2001
From: Shyam Kumar
Date: Tue, 9 Jan 2024 21:47:10 -0800
Subject: [PATCH 1/5] SONiC FM (Fault Mgmt) infrastructure - Base version

This adds a generic FM infrastructure to SONiC for fault analysis and
handling. It broadly comprises the following three entities:

1. A fault (event) publisher daemon, which formulates fault events and
   populates them into the EVENT_TABLE in redisDB
2. A fault manager daemon, which fetches events from redisDB, parses them
   against the schema (sonic-event.yang), and looks up the fault type and
   severity in the fault_policy.json file to determine the fault action
3. The fault_policy.json file, which comprises generic and platform-specific
   F-A (Fault-Action) blocks, i.e. for a particular fault type and severity,
   the action(s) needed to recover the system from the fault. It abstracts
   platform/HWSKU fault-handling nuances from the open-source NOS
   (e.g. SONiC)

Signed-off-by: Shyam Kumar
---
 sonic-faultmgrd/scripts/fault_policy.json |  29 ++
 sonic-faultmgrd/scripts/faultmgrd         | 331 ++++++++++++++++++
 sonic-faultmgrd/scripts/faultpubd         | 389 ++++++++++++++++++++++
 3 files changed, 749 insertions(+)
 create mode 100755 sonic-faultmgrd/scripts/fault_policy.json
 create mode 100755 sonic-faultmgrd/scripts/faultmgrd
 create mode 100644 sonic-faultmgrd/scripts/faultpubd

diff --git a/sonic-faultmgrd/scripts/fault_policy.json b/sonic-faultmgrd/scripts/fault_policy.json
new file mode 100755
index 000000000..b0cb74530
--- /dev/null
+++ b/sonic-faultmgrd/scripts/fault_policy.json
@@ -0,0 +1,29 @@
+{
+    "chassis": [
+        {
+            "name": "Cisco-8102-C64",
+            "faults": [
+                {
+                    "type": "CUSTOM_EVPROFILE_CHANGE",
+                    "severity": "MAJOR",
+                    "action": ["syslog"]
+                },
+                {
+                    "type": "TEMPERATURE_EXCEEDED",
+                    "severity": "CRITICAL",
+                    "action": ["syslog", "obfl", "reload"]
+                },
+                {
+                    "type": "FANS MISSING",
+                    "severity": "CRITICAL",
+                    "action": ["syslog", "obfl", "shutdown"]
+                },
+                {
+                    "type": "EVENTS_MONITORING",
+                    "severity": "WARNING",
+                    "action": ["syslog"]
+                }
+            ]
+        }
+    ]
+}
diff --git a/sonic-faultmgrd/scripts/faultmgrd b/sonic-faultmgrd/scripts/faultmgrd
new file mode 100755
index 000000000..2fcc5097f
--- /dev/null
+++ b/sonic-faultmgrd/scripts/faultmgrd
@@ -0,0 +1,331 @@
+#!/usr/bin/env python3
+
+import os
+import re
+import yaml
+import argparse
+import json
+
+import redis
+from cisco.pacific.thermal.thermal_zone import thermal_zone
+from sonic_py_common import daemon_base, multi_asic, logger
+from swsscommon import swsscommon
+
+import sys
+import syslog
+import time
+import threading
+from enum import Enum
+import subprocess
+import uuid
+
+#####################################
+## Host microservice fault_manager ##
+#####################################
+
+SYSLOG_IDENTIFIER = 'faultMgrd'
+helper_logger = logger.Logger(SYSLOG_IDENTIFIER)
+
+ASYNC_CONN_WAIT = 0.3
+# In case of no event from publisher(s), event subscriber's
+# max time (in ms) to wait before checking for the next event
+RECEIVE_TIMEOUT = 1000
+rc_of_events_received = -1
+redisDB_event_count = 0
+publish_cnt = 0
+json_data = {}
+
+#########################################################################
+# 1. Presently, events (faults) are published by sonic-events*.yang files
+# source location: /usr/local/yang-models/
+# 2. In the near future, there will be a single events SCHEMA yang file
+# named: sonic-event.yang
+# 3. Subscribe to event-framework to receive events
+# 4. 
Look for the events periodically; as and when they are received, +# parse and process them; formulate an event (fault) entry locally +# 5. Add the event data as an entry in the new EVENT_TABLE of chassis +# redis-DB +# 6. Then fetch the event entry from this EVENT_TABLE, perform a lookup +# on event's id and severity in fault__policy info file (table) +# to find a match. +# 7. Once a match is found, take the action(s) as specified by the match +######################################################################### + +class faultManager(): + # Interval to run this microservice + FM_INTERVAL = 15 + # Policy file defining fault policies and their respective actions + FM_POLICY_FILE = '/usr/share/sonic/platform/fault_policy.json' + # microservice identifier + + _thermal_zone = None + _interval = 15 + + _card_type = None + _pi_slot_id = None + + _redis_chassis_db = None + _state_db = {} + _temperature_info_tbl = {} + _system_health_info_tbl = {} + TEMPERATURE_INFO_TABLE = "TEMPERATURE_INFO" + SYSTEM_HEALTH_INFO_TABLE = "SYSTEM_HEALTH_INFO" + + EVENT_ENTRY_KEY = 'EVENT_TABLE|{}' + + REDIS_DB_SERVERS = { + 'local': {'host': 'localhost', 'port': 6379}, + 'chassis': {'host': 'redis_chassis.server', 'port': 6380}} + + def access_redis_db(self): + ''' + Connect to all required tables for all corresponding namespace databases + ''' + for namespace in self.namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # Connet to redis STATE_DB to fetch data from its tables + self._state_db[asic_id] = daemon_base.db_connect('STATE_DB', namespace) + self._temperature_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.TEMPERATURE_INFO_TABLE) + self._system_health_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.SYSTEM_HEALTH_INFO_TABLE) + + def determine_fault_action_policies(self): + global json_data + try: + # Load, parse fault_policy json file + helper_logger.log_notice("INIT data from JSON file: {}".format(json_data)) + with open('./fault_policy.json', 'r') as f: + json_data = json.load(f) + helper_logger.log_notice("data from JSON file: {}".format(json_data)) + helper_logger.log_notice("chassis name: {}".format(json_data['chassis'])) + for entry in json_data['chassis']: + helper_logger.log_notice("chassis entry: {}".format(entry)) + helper_logger.log_notice("each entry name: {}".format(entry['name'])) + helper_logger.log_notice("each entry faults list: {}".format(entry['faults'])) + for fault_entry in entry['faults']: + helper_logger.log_notice("TRAVERSING through fault entry: {}".format(fault_entry)) + helper_logger.log_notice("TRAVERSING inside each fault entry TYPE : {}".format(fault_entry['type'])) + helper_logger.log_notice("TRAVERSING inside each fault entry SEVERITY: {}".format(fault_entry['severity'])) + helper_logger.log_notice("TRAVERSING inside each fault entry ACTION: {}".format(fault_entry['action'])) + + #TODO: correct this error name + except redis.exceptions.ConnectionError as exc: + helper_logger.log_notice('issue with opening JSON policy file or parsing its contents: {}'.format(exc)) + + def cleanup_DB_events(self): + redisDB_event_count = 75 + while redisDB_event_count >= 0: + #self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format('*'))) + self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format(redisDB_event_count))) + helper_logger.log_notice('done deleting redisDB_event_count:{} entry from chassis redisDB'.format(redisDB_event_count)) + redisDB_event_count = redisDB_event_count - 1 + + def 
__init__(self, id): + """ + Initializer of faultManager microservice(process) + """ + super(faultManager, self).__init__() + + self.stop_event = threading.Event() + self.wait_time = self.FM_INTERVAL + + self._thermal_zone = thermal_zone(cfg_filename=os.path.join(os.path.sep, 'opt', 'cisco', 'etc', 'thermal_zone.yaml')) + helper_logger.log_notice("thermal_zone: {}".format(self._thermal_zone)) + self._interval = self._thermal_zone.interval + helper_logger.log_notice("interval: {}".format(self._interval)) + + # Select redis chassis server details, card type and slot id from chassis bootstrap details + bootstrap = self._thermal_zone.platform.bootstrap + helper_logger.log_notice("bootstrap: {}".format(bootstrap)) + + try: + self._card_type = bootstrap['card_type'].split(':')[0] + slot_id = bootstrap['slot_id'] + # Distributed platforms host a specific chassis DB server on the RP + # Fixed platforms do not need this, and just use the default local host + if bootstrap['platform_type'] == 'distributed': + # Convert slot_id from pd to pi + self._pi_slot_id = bootstrap['chassis']['{}_pd_to_pi'.format(self._card_type.lower())][slot_id] + server = self.REDIS_DB_SERVERS['chassis'] + else: + self._pi_slot_id = slot_id + server = self.REDIS_DB_SERVERS['local'] + except KeyError as exc: + raise Exception('Insufficient platform bootstrap data for DB key: {}'.format(exc)) + self._redis_chassis_db = redis.Redis( + host=server['host'], + port=server['port'], + decode_responses=True, + db=swsscommon.CHASSIS_STATE_DB) + + # Fetch namespaces based on number of ASICs on the chassis or the board + # For instance, on a multi-asic chassis or board (RP,LC): + # - Detected namespaces: ['asic0', 'asic1', 'asic2']; num_asics 3; asic_id_list [0, 1, 2] + # For single-asic chassis or board, + # - Detected namespaces: ['']; num_asics 1; asic_id_list [] + # Load the namespace details first from the database_global.json file + if multi_asic.is_multi_asic(): + swsscommon.SonicDBConfig.initializeGlobalConfig() + self.namespaces = multi_asic.get_front_end_namespaces() + + # Initialize redisDB accessor method + self.access_redis_db() + + # Fault-Action policies + self.determine_fault_action_policies() + + #TODO : ensure if this intial cleanup is neededfault_action_policy_info + # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB + self.cleanup_DB_events() + + def stop_fault_manager_algorithm(self): + ''' + Stop fault manager algorithm + ''' + global redisDB_event_count + self._algorithm_running = False + + # Delete the EVENT_TABLE contents from chassis_redis_db + # cleanup all 'events' i.e. 
EVENT_TABLE entries in chassis redisDB + helper_logger.log_notice('redisDB_event_count:{} at almost done stage'.format(redisDB_event_count)) + self.cleanup_DB_events() + + def stop(self): + ''' + Stop fault manager instance + ''' + self.stop_fault_manager_algorithm() + self._running = False + + def deinitialize(self): + ''' + Destroy fault manager instance + ''' + self.stop() + self._thermal_zone = None + self._redis_chassis_db = None + self._card_type = None + self._pi_slot_id = None + + self._state_db = {} + self._temperature_info_tbl, self._system_health_info_tbl = {}, {} + + def map_dict_fvm(self, s, d): + for k, v in s.items(): + d[k] = v + + def map_db_event_to_action(self, json_data): + # processing events from redisDB table + ''' + Retrieve events (faults) data from the redisDB and parse them against fault_policy JSON file + ''' + try: + for key in self._redis_chassis_db.keys(pattern=self.EVENT_ENTRY_KEY.format('*')): + helper_logger.log_notice("KEY fetched: {} from EVENT_TABLE of chassis redisDB".format(key)) + event_data = {} + evdata = {} + data = self._redis_chassis_db.hgetall(key) + evdata['id'] = int(data['id']) + evdata['resource'] = data['resource'] + evdata['text'] = data['text'] + evdata['time-created'] = data['time-created'] + evdata['type-id'] = data['type-id'] + evdata['severity'] = data['severity'] + helper_logger.log_notice("Updating evdata[id]:{}".format(evdata['id'])) + #helper_logger.log_notice("KEY evdata[id]:{}".format(key[evdata['id']])) + event_data[evdata['id']] = evdata + helper_logger.log_notice("Received event: {} from chassis redisDB EVENT_TABLE".format(event_data[evdata['id']])) + helper_logger.log_notice(" event_data DICT from chassis redisDB EVENT_TABLE: {}".format(event_data)) + + # Iterate through the fault_action_policy_info dictionary to find a match for the received event (fault). + # This dictionary is derived from fault_action_policy_info json file. 
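+            # For reference, json_data mirrors fault_policy.json (added earlier
+            # in this patch), e.g.:
+            #   {"chassis": [{"name": "Cisco-8102-C64",
+            #                 "faults": [{"type": "TEMPERATURE_EXCEEDED",
+            #                             "severity": "CRITICAL",
+            #                             "action": ["syslog", "obfl", "reload"]},
+            #                            ...]}]}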
+ for entry in json_data['chassis']: + for fault_entry in entry['faults']: + if ((fault_entry['type'] == evdata['type-id']) and (fault_entry['severity'] == evdata['severity'])): + helper_logger.log_notice("KEYS MATCHED at fault_seq_id: {}".format(fault_entry)) + helper_logger.log_notice("Action to be taken: {}".format(fault_entry['action'])) + break + except redis.exceptions.ConnectionError as exc: + helper_logger.log_notice('state DB currently unavailable: {}'.format(exc)) + + def analyze_db_events(self, event_obj, cnt): + # get events from event redisDB, analyze them + # Map each event against each of the fault policy + # once a match is found, take the needed action(s) + + global json_data + iter = 0 + + while True: + # perform lookup for the received event(fault) match in fault_action_policy_info dictionary + # (which is derived from fault_policy.json file) + # key is EVENT_TABLE| from the 'key' above + iter += 1 + helper_logger.log_notice('Map redisDB fault events to actions: Iter: {}'.format(iter)) + self.map_db_event_to_action(json_data) + # wait sometime before scanning for the events again in eventDB + time.sleep(25) + + def map_event_to_action(self, cnt): + # Initialising event consumer object + event_consume = threading.Event() + + # Start events' consumer thread to consume events from eventDB + thread_consume = threading.Thread(target=self.analyze_db_events, args=(event_consume, cnt)) + thread_consume.start() + helper_logger.log_notice("analyze_db_events THREAD started") + event_consume.wait(1) + event_consume.clear() + helper_logger.log_notice("event_consume clear through") + + def start_fault_manager_algorithm(self, cnt): + ''' + Start fault management algorithm + ''' + self._algorithm_running = True + helper_logger.log_notice("Entered start_fault_manager_algorithm...") + if self._algorithm_running and self._thermal_zone: + try: + helper_logger.log_notice("Initiating FM algorithm sub-tasks") + helper_logger.log_notice("Task1: spawning map_event_to_action THREAD") + self.map_event_to_action(cnt) + helper_logger.log_notice("Task2: main THREAD returned to its NORMAL course") + except: + self.stop_fault_manager_algorithm() + raise + + # primary logic to run fault management service + def run(self, cnt): + """ + Run main logic of this fault management service + :return: + """ + try: + if self: + helper_logger.log_notice("FM start_fault_manager_algorithm starting up...") + self.start_fault_manager_algorithm(cnt) + return True + except Exception as e: + helper_logger.log_error('Caught exception while executing FM run_policy() - {}'.format(repr(e))) + return False + +def main(): + + helper_logger.log_notice("FM (Fault Management) service starting up...") + + parser=argparse.ArgumentParser( + description="Check events published, receive and parse them") + parser.add_argument('-n', "--cnt", default=0, type=int, + help="count of events to receive") + args = parser.parse_args() + + # Instantiate an object of class faultManager + fault_mgr = faultManager(SYSLOG_IDENTIFIER) + + if not fault_mgr.run(args.cnt): + helper_logger.log_notice("Shutting down FM service with exit code ...") + fault_mgr.deinitialize() + helper_logger.log_notice("FM service exiting") + sys.exit(1) + +if __name__ == '__main__': + main() diff --git a/sonic-faultmgrd/scripts/faultpubd b/sonic-faultmgrd/scripts/faultpubd new file mode 100644 index 000000000..2c240c9f3 --- /dev/null +++ b/sonic-faultmgrd/scripts/faultpubd @@ -0,0 +1,389 @@ +#!/usr/bin/env python3 + +import os +import re +import yaml +import argparse + 
+import redis +from cisco.pacific.thermal.thermal_zone import thermal_zone +from sonic_py_common import daemon_base, multi_asic, logger +from swsscommon import swsscommon +from swsscommon.swsscommon import event_receive_op_t, event_receive +from swsscommon.swsscommon import events_init_subscriber, events_deinit_subscriber + +import sys +import syslog +import time +import threading +from enum import Enum +import subprocess +import uuid + +######################################################### +## fault_publisher: microservice running at Linux host ## +######################################################### + +SYSLOG_IDENTIFIER = 'faultPubd' +helper_logger = logger.Logger(SYSLOG_IDENTIFIER) + +ASYNC_CONN_WAIT = 0.3 +# In case of no event from publisher(s), event subscriber's +# max time (in ms) to wait before checking for the next event +RECEIVE_TIMEOUT = 1000 +rc_of_events_received = -1 +redisDB_event_count = 0 +publish_cnt = 0 + +# sonic-event.yang is the source of these tags (yet to be committed) +EVENTS_PUBLISHER_SOURCE = "sonic-event" +EVENTS_PUBLISHER_TAG = "EVENT" +EVENTS_PUBLISHER_ID = "{}:{}".format(EVENTS_PUBLISHER_SOURCE, EVENTS_PUBLISHER_TAG) +exp_event_params = { + "id": "0", + "resource": "fault-manager", + "text": "device, component FM test", + "time-created": "De19", + "type-id": "CUSTOM_EVPROFILE_CHANGE", + "severity": "MAJOR", +} + +######################################################################### +# 1. Presently, events (faults) are published by sonic-events*.yang files +# source location: /usr/local/yang-models/ +# 2. In near future, there would be a single events SCHEMA yang file +# named: sonic-event.yang +# 3. Subscribe to event-framework to recieve events +# 4. Look for the events periodically; as and when they are received, +# parse and process them; formulate an event (fault) entry locally +# 5. Add the event data as an entry in the new EVENT_TABLE of chassis +# redis-DB +# 6. Then fetch the event entry from this EVENT_TABLE, perform a lookup +# on event's id and severity in fault_action_policy_info file (table) +# to find a match. +# 7. 
Once a match is found, take the action(s) as specified by the match +######################################################################### + +class faultPublisher(): + # Interval to run this microservice + FM_INTERVAL = 15 + # microservice identifier + + _thermal_zone = None + _interval = 15 + + _card_type = None + _pi_slot_id = None + + _redis_chassis_db = None + _state_db = {} + _temperature_info_tbl = {} + _system_health_info_tbl = {} + TEMPERATURE_INFO_TABLE = "TEMPERATURE_INFO" + SYSTEM_HEALTH_INFO_TABLE = "SYSTEM_HEALTH_INFO" + + EVENT_ENTRY_KEY = 'EVENT_TABLE|{}' + + REDIS_DB_SERVERS = { + 'local': {'host': 'localhost', 'port': 6379}, + 'chassis': {'host': 'redis_chassis.server', 'port': 6380}} + + def access_redis_db(self): + ''' + Connect to all required tables for all corresponding namespace databases + ''' + for namespace in self.namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # Connet to redis STATE_DB to fetch data from its tables + self._state_db[asic_id] = daemon_base.db_connect('STATE_DB', namespace) + self._temperature_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.TEMPERATURE_INFO_TABLE) + self._system_health_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.SYSTEM_HEALTH_INFO_TABLE) + + def cleanup_DB_events(self): + redisDB_event_count = 75 + while redisDB_event_count >= 0: + #self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format('*'))) + self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format(redisDB_event_count))) + helper_logger.log_notice('done deleting redisDB_event_count:{} entry from chassis redisDB'.format(redisDB_event_count)) + redisDB_event_count = redisDB_event_count - 1 + + def __init__(self, id): + """ + Initializer of faultPublisher microservice(process) + """ + super(faultPublisher, self).__init__() + + self.stop_event = threading.Event() + self.wait_time = self.FM_INTERVAL + + self._thermal_zone = thermal_zone(cfg_filename=os.path.join(os.path.sep, 'opt', 'cisco', 'etc', 'thermal_zone.yaml')) + helper_logger.log_notice("thermal_zone: {}".format(self._thermal_zone)) + self._interval = self._thermal_zone.interval + helper_logger.log_notice("interval: {}".format(self._interval)) + + # Select redis chassis server details, card type and slot id from chassis bootstrap details + bootstrap = self._thermal_zone.platform.bootstrap + helper_logger.log_notice("bootstrap: {}".format(bootstrap)) + + try: + self._card_type = bootstrap['card_type'].split(':')[0] + slot_id = bootstrap['slot_id'] + # Distributed platforms host a specific chassis DB server on the RP + # Fixed platforms do not need this, and just use the default local host + if bootstrap['platform_type'] == 'distributed': + # Convert slot_id from pd to pi + self._pi_slot_id = bootstrap['chassis']['{}_pd_to_pi'.format(self._card_type.lower())][slot_id] + server = self.REDIS_DB_SERVERS['chassis'] + else: + self._pi_slot_id = slot_id + server = self.REDIS_DB_SERVERS['local'] + except KeyError as exc: + raise Exception('Insufficient platform bootstrap data for DB key: {}'.format(exc)) + self._redis_chassis_db = redis.Redis( + host=server['host'], + port=server['port'], + decode_responses=True, + db=swsscommon.CHASSIS_STATE_DB) + + # Fetch namespaces based on number of ASICs on the chassis or the board + # For instance, on a multi-asic chassis or board (RP,LC): + # - Detected namespaces: ['asic0', 'asic1', 'asic2']; num_asics 3; asic_id_list [0, 1, 2] + # For single-asic chassis or board, + # - Detected 
namespaces: ['']; num_asics 1; asic_id_list [] + # Load the namespace details first from the database_global.json file + if multi_asic.is_multi_asic(): + swsscommon.SonicDBConfig.initializeGlobalConfig() + self.namespaces = multi_asic.get_front_end_namespaces() + + # Initialize redisDB accessor method + self.access_redis_db() + + #TODO : ensure if this intial cleanup is neededfault_action_policy_info + # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB + self.cleanup_DB_events() + + def stop_fault_publisher_algorithm(self): + ''' + Stop fault publisher algorithm + ''' + global redisDB_event_count + self._algorithm_running = False + + # Delete the EVENT_TABLE contents from chassis_redis_db + # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB + helper_logger.log_notice('redisDB_event_count:{} at almost done stage'.format(redisDB_event_count)) + self.cleanup_DB_events() + + def stop(self): + ''' + Stop fault publisher instance + ''' + self.stop_fault_publisher_algorithm() + self._running = False + + def deinitialize(self): + ''' + Destroy fault publisher instance + ''' + self.stop() + self._thermal_zone = None + self._redis_chassis_db = None + self._card_type = None + self._pi_slot_id = None + + self._state_db = {} + self._temperature_info_tbl, self._system_health_info_tbl = {}, {} + + def map_dict_fvm(self, s, d): + for k, v in s.items(): + d[k] = v + + def populate_events_to_redisDB(self, cnt): + # Populate redis (chassis) DB with fault event entries + # redis only stores string data, NoneType must be converted to 'None' string + global redisDB_event_count + event_id = 50 + while cnt > 0: + helper_logger.log_notice("Received event_id{}".format(event_id)) + data = {'id': str(event_id), 'resource': 'fault-manager', 'text': 'device, component FM test', 'time-created': 'Dec52023', 'type-id': 'CUSTOM_EVPROFILE_CHANGE', 'severity': 'CRITICAL', 'action': 'reload'} + key = self.EVENT_ENTRY_KEY.format('{}'.format(event_id)) + self._redis_chassis_db.hset(key, mapping=data) + redisDB_event_count += 1 + event_id += 1 + cnt = cnt - 1 + + def parse_rcvd_events(self, event_obj, cnt): + global rc_of_events_received + global redisDB_event_count + + # subscribe to the events (from the event-pubisher framework) + # EVENTS_PUBLISHER_SOURCE: sonic-events-host, sonic-events-swss, sonic-event etc. + # are 'module' under their respective sonic-events-*yang or sonic-event.yang file. + # EVENTS_PUBLISHER_TAG: event-disk, if-state, EVENT etc. are 'container' under their + # respective sonic-events-*yang or sonic-event.yang file. + sh = events_init_subscriber(False, RECEIVE_TIMEOUT, None) + helper_logger.log_notice("events_init_subscriber HANDLE: {}".format(sh)) + + # Sleep ASYNC_CONN_WAIT to ensure async connectivity is complete. + #time.sleep(ASYNC_CONN_WAIT) + + #exp_params = dict(exp_event_params) + + # Signal main thread that subscriber is ready to receive + event_obj.set() + + evt_rcvd_cntr = 0 + evt_rcvd_key_matched = 0 + + helper_logger.log_notice("event_receive counter init'ed to {}".format(evt_rcvd_cntr)) + + while True: + p = event_receive_op_t() + rc = event_receive(sh, p) + + helper_logger.log_notice("event_receive()'s iteration:{} rc:{}".format(evt_rcvd_cntr, rc)) + evt_rcvd_cntr = evt_rcvd_cntr + 1 + if rc > 0: + helper_logger.log_notice("rc:{} denotes no event reveived within timeout! continue to look for next event".format(rc)) + continue + elif rc != 0: + helper_logger.log_notice("rc:{} denotes failure! 
abort event_receive()".format(rc)) + break + + helper_logger.log_notice("Events publisher id - exp:{} vs rcvd:{}".format(EVENTS_PUBLISHER_ID, p.key)) + if p.key != EVENTS_PUBLISHER_ID: + # received a different EVENTS_PUBLISHER_ID than expected + helper_logger.log_notice("Events publisher id mismatch! continue to look for next event") + continue + else: + # Events publisher identier (source and tag) matched + helper_logger.log_notice("Events publisher id MATCHED. proceed...") + + # In order to process the received event, ensure event payload has mandatory fields in it + if "id" not in p.params or "type-id" not in p.params or "severity" not in p.params: + helper_logger.log_notice("key or mandatory fields missing in received event!") + continue + + rcvd_event_params = {} + self.map_dict_fvm(p.params, rcvd_event_params) + + event_dict = {} + for k, v in exp_event_params.items(): + if k in rcvd_event_params: + # add the key,value to event_dict dictionary which would then be populated as + # EVENT_TABLE entry into redisDB via hset + event_dict[k] = rcvd_event_params[k] + if (rcvd_event_params[k] != v): + helper_logger.log_notice("key:{} value rcvd:{} != exp:{}".format(k, rcvd_event_params[k], v)) + rc = -1 + else: + helper_logger.log_notice("key:{} value rcvd:{} MATCHES exp:{}".format(k, rcvd_event_params[k], v)) + if k == 'id': + # add 'id' field as key to the EVENT_TABLE entry + key = self.EVENT_ENTRY_KEY.format('{}'.format(int(rcvd_event_params[k]))) + helper_logger.log_notice(":event_dict KEY is:{} k:{} v:{}".format(key, k, rcvd_event_params[k])) + helper_logger.log_notice("dict updated:{}".format(event_dict)) + else: + helper_logger.log_notice("key:{} is missing in rcvd_event_params".format(k)) + rc = -1 + + if (rc != 0): + helper_logger.log_notice("rc:{} denotes missing key or params mismatch for the rcvd event! ignore it".format(rc)) + + if p.missed_cnt != 0: + helper_logger.log_notice("Expect missed_cnt {} == 0 {}/{}".format(p.missed_cnt, evt_rcvd_cntr, cnt)) + break + + if p.publish_epoch_ms == 0: + helper_logger.log_notice("Expect publish_epoch_ms != 0 {}/{}".format(evt_rcvd_cntr, cnt)) + break + + # populate event_dict entry as EVENT_TABLE entry into redisDB + self._redis_chassis_db.hset(key, mapping=event_dict) + redisDB_event_count += 1 + helper_logger.log_notice("In total, parsed {} event entries and populated them all to redisDB".format(redisDB_event_count)) + + if (evt_rcvd_cntr == cnt): + rc_of_events_received = 0 + else: + helper_logger.log_notice("received events further parsing aborted {}/{}".format(evt_rcvd_cntr, cnt)) + + # Signal main thread that subscriber thread is done + event_obj.set() + events_deinit_subscriber(sh) + + helper_logger.log_notice("Received {}/{}".format(evt_rcvd_cntr, cnt)) + + def receive_events(self, cnt): + # Initialising event subscriber object + event_sub = threading.Event() + + # Start events' subscriber thread + thread_sub = threading.Thread(target=self.parse_rcvd_events, args=(event_sub, cnt)) + thread_sub.start() + helper_logger.log_notice("parse_rcvd_events THREAD started") + + # Wait until subscriber thread completes the async subscription + # Any event published prior to that could get lost! + # Subscriber would wait for ASYNC_CONN_WAIT. Wait additional 200ms + # for signal from test_receiver as ready. 
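+        # Net effect: wait up to ASYNC_CONN_WAIT (0.3s) + 0.2s = 0.5s for the
+        # subscriber thread to signal readiness via event_obj.set().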
+ event_sub.wait(ASYNC_CONN_WAIT + 0.2) + helper_logger.log_notice("WAITED {} time prior to event_sub clear".format(ASYNC_CONN_WAIT+0.2)) + event_sub.clear() + helper_logger.log_notice("event_sub clear through") + + def start_fault_publisher_algorithm(self, cnt): + ''' + Start fault publisher algorithm + ''' + self._algorithm_running = True + helper_logger.log_notice("Entered start_fault_publisher_algorithm...") + if self._algorithm_running and self._thermal_zone: + try: + helper_logger.log_notice("Initiating FP algorithm sub-tasks") + helper_logger.log_notice("Task1: Adding {} FP test events to chassis_server_db...".format(cnt)) + self.populate_events_to_redisDB(cnt) + helper_logger.log_notice("Task2: Spawning parse_rcvd_events THREAD") + self.receive_events(cnt) + helper_logger.log_notice("Task3: main THREAD returned to its NORMAL course") + except: + # TODO + self.stop_fault_publisher_algorithm() + raise + + # primary logic to run FDR (Fault Detection & Reporting) as fault publishing service + def run(self, cnt): + """ + Run main logic of this fault publisher service + :return: + """ + try: + if self: + helper_logger.log_notice("FP start_fault_publisher_algorithm starting up...") + self.start_fault_publisher_algorithm(cnt) + return True + except Exception as e: + helper_logger.log_error('Caught exception while executing FM run_policy() - {}'.format(repr(e))) + return False + +def main(): + + helper_logger.log_notice("FDR (fault publisher) service starting up...") + + parser=argparse.ArgumentParser( + description="Check events published, receive and parse them") + parser.add_argument('-n', "--cnt", default=0, type=int, + help="count of events to receive") + args = parser.parse_args() + + # Instantiate an object of class faultPublisher + fault_pub = faultPublisher(SYSLOG_IDENTIFIER) + + if not fault_pub.run(args.cnt): + helper_logger.log_notice("Shutting down FDR service with exit code ...") + fault_pub.deinitialize() + helper_logger.log_notice("FDR service exiting") + sys.exit(1) + +if __name__ == '__main__': + main() From ae249154aecf3526c085f3901d3c8f0c6831e12b Mon Sep 17 00:00:00 2001 From: Shyam Kumar Date: Thu, 1 Feb 2024 15:05:09 -0800 Subject: [PATCH 2/5] Spawn Fault Publisher & Manager as micro-services - Added faultmgrd micro-service and timer service - Added faultpubd micro-service and timer service Signed-off-by: Shyam Kumar --- .../sonic-host-services-data.faultmgrd.service | 16 ++++++++++++++++ .../sonic-host-services-data.faultmgrd.timer | 11 +++++++++++ .../sonic-host-services-data.faultpubd.service | 16 ++++++++++++++++ .../sonic-host-services-data.faultpubd.timer | 11 +++++++++++ 4 files changed, 54 insertions(+) create mode 100644 sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.service create mode 100644 sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.timer create mode 100644 sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service create mode 100644 sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer diff --git a/sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.service b/sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.service new file mode 100644 index 000000000..4b003aa76 --- /dev/null +++ b/sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.service @@ -0,0 +1,16 @@ +[Unit] +Description=Fault Manager daemon +Requires=updategraph.service +After=updategraph.service +BindsTo=sonic.target +After=sonic.target +After=database-chassis.service + +[Service] +Type=simple 
+ExecStart=/usr/local/bin/faultmgrd
+Restart=always
+RestartSec=30
+
+[Install]
+WantedBy=sonic.target
diff --git a/sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.timer b/sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.timer
new file mode 100644
index 000000000..dfd76ec72
--- /dev/null
+++ b/sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.timer
@@ -0,0 +1,11 @@
+[Unit]
+Description=Delays faultmgrd daemon until SONiC, other services have started
+PartOf=faultmgrd.service
+
+[Timer]
+OnUnitActiveSec=0 sec
+OnBootSec=1min 30 sec
+Unit=faultmgrd.service
+
+[Install]
+WantedBy=timers.target sonic.target
diff --git a/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service b/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service
new file mode 100644
index 000000000..f09b56e42
--- /dev/null
+++ b/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=Fault Publisher daemon
+Requires=updategraph.service
+After=updategraph.service
+BindsTo=sonic.target
+After=sonic.target
+After=database-chassis.service
+
+[Service]
+Type=simple
+ExecStart=/usr/local/bin/faultpubd
+Restart=always
+RestartSec=30
+
+[Install]
+WantedBy=sonic.target
diff --git a/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer b/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer
new file mode 100644
index 000000000..5aad8bd3c
--- /dev/null
+++ b/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer
@@ -0,0 +1,11 @@
+[Unit]
+Description=Delays faultpubd daemon until SONiC, other services have started
+PartOf=faultpubd.service
+
+[Timer]
+OnUnitActiveSec=0 sec
+OnBootSec=1min 30 sec
+Unit=hostcfgd.service
+
+[Install]
+WantedBy=timers.target sonic.target
From fbcb980ff104941a60fbcad17131a02428e75625 Mon Sep 17 00:00:00 2001
From: Shyam Kumar
Date: Thu, 8 Feb 2024 11:09:09 -0800
Subject: [PATCH 3/5] Updated FM infra sys reboot call with reboot cause

- Determined the reboot cause from the fault entry
- Passed the reboot cause as an argument to the system 'reboot' invocation
- Updated the mechanism to fetch the chassis type (fixed or modular)
- Removed the faultpubd micro-service and moved it out, as the SONiC FM HLD
  focuses on faultmgrd

Signed-off-by: Shyam Kumar
---
 sonic-faultmgrd/scripts/faultpubd             | 389 ------------------
 ...sonic-host-services-data.faultpubd.service |  16 -
 .../sonic-host-services-data.faultpubd.timer  |  11 -
 3 files changed, 416 deletions(-)
 delete mode 100644 sonic-faultmgrd/scripts/faultpubd
 delete mode 100644 sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service
 delete mode 100644 sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer

diff --git a/sonic-faultmgrd/scripts/faultpubd b/sonic-faultmgrd/scripts/faultpubd
deleted file mode 100644
index 2c240c9f3..000000000
--- a/sonic-faultmgrd/scripts/faultpubd
+++ /dev/null
@@ -1,389 +0,0 @@
-#!/usr/bin/env python3
-
-import os
-import re
-import yaml
-import argparse
-
-import redis
-from cisco.pacific.thermal.thermal_zone import thermal_zone
-from sonic_py_common import daemon_base, multi_asic, logger
-from swsscommon import swsscommon
-from swsscommon.swsscommon import event_receive_op_t, event_receive
-from swsscommon.swsscommon import events_init_subscriber, events_deinit_subscriber
-
-import sys
-import syslog
-import time
-import threading
-from enum import Enum
-import subprocess
-import uuid
-
-#########################################################
-## fault_publisher: microservice running at Linux host ##
-######################################################### - -SYSLOG_IDENTIFIER = 'faultPubd' -helper_logger = logger.Logger(SYSLOG_IDENTIFIER) - -ASYNC_CONN_WAIT = 0.3 -# In case of no event from publisher(s), event subscriber's -# max time (in ms) to wait before checking for the next event -RECEIVE_TIMEOUT = 1000 -rc_of_events_received = -1 -redisDB_event_count = 0 -publish_cnt = 0 - -# sonic-event.yang is the source of these tags (yet to be committed) -EVENTS_PUBLISHER_SOURCE = "sonic-event" -EVENTS_PUBLISHER_TAG = "EVENT" -EVENTS_PUBLISHER_ID = "{}:{}".format(EVENTS_PUBLISHER_SOURCE, EVENTS_PUBLISHER_TAG) -exp_event_params = { - "id": "0", - "resource": "fault-manager", - "text": "device, component FM test", - "time-created": "De19", - "type-id": "CUSTOM_EVPROFILE_CHANGE", - "severity": "MAJOR", -} - -######################################################################### -# 1. Presently, events (faults) are published by sonic-events*.yang files -# source location: /usr/local/yang-models/ -# 2. In near future, there would be a single events SCHEMA yang file -# named: sonic-event.yang -# 3. Subscribe to event-framework to recieve events -# 4. Look for the events periodically; as and when they are received, -# parse and process them; formulate an event (fault) entry locally -# 5. Add the event data as an entry in the new EVENT_TABLE of chassis -# redis-DB -# 6. Then fetch the event entry from this EVENT_TABLE, perform a lookup -# on event's id and severity in fault_action_policy_info file (table) -# to find a match. -# 7. Once a match is found, take the action(s) as specified by the match -######################################################################### - -class faultPublisher(): - # Interval to run this microservice - FM_INTERVAL = 15 - # microservice identifier - - _thermal_zone = None - _interval = 15 - - _card_type = None - _pi_slot_id = None - - _redis_chassis_db = None - _state_db = {} - _temperature_info_tbl = {} - _system_health_info_tbl = {} - TEMPERATURE_INFO_TABLE = "TEMPERATURE_INFO" - SYSTEM_HEALTH_INFO_TABLE = "SYSTEM_HEALTH_INFO" - - EVENT_ENTRY_KEY = 'EVENT_TABLE|{}' - - REDIS_DB_SERVERS = { - 'local': {'host': 'localhost', 'port': 6379}, - 'chassis': {'host': 'redis_chassis.server', 'port': 6380}} - - def access_redis_db(self): - ''' - Connect to all required tables for all corresponding namespace databases - ''' - for namespace in self.namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - # Connet to redis STATE_DB to fetch data from its tables - self._state_db[asic_id] = daemon_base.db_connect('STATE_DB', namespace) - self._temperature_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.TEMPERATURE_INFO_TABLE) - self._system_health_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.SYSTEM_HEALTH_INFO_TABLE) - - def cleanup_DB_events(self): - redisDB_event_count = 75 - while redisDB_event_count >= 0: - #self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format('*'))) - self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format(redisDB_event_count))) - helper_logger.log_notice('done deleting redisDB_event_count:{} entry from chassis redisDB'.format(redisDB_event_count)) - redisDB_event_count = redisDB_event_count - 1 - - def __init__(self, id): - """ - Initializer of faultPublisher microservice(process) - """ - super(faultPublisher, self).__init__() - - self.stop_event = threading.Event() - self.wait_time = self.FM_INTERVAL - - self._thermal_zone = 
thermal_zone(cfg_filename=os.path.join(os.path.sep, 'opt', 'cisco', 'etc', 'thermal_zone.yaml')) - helper_logger.log_notice("thermal_zone: {}".format(self._thermal_zone)) - self._interval = self._thermal_zone.interval - helper_logger.log_notice("interval: {}".format(self._interval)) - - # Select redis chassis server details, card type and slot id from chassis bootstrap details - bootstrap = self._thermal_zone.platform.bootstrap - helper_logger.log_notice("bootstrap: {}".format(bootstrap)) - - try: - self._card_type = bootstrap['card_type'].split(':')[0] - slot_id = bootstrap['slot_id'] - # Distributed platforms host a specific chassis DB server on the RP - # Fixed platforms do not need this, and just use the default local host - if bootstrap['platform_type'] == 'distributed': - # Convert slot_id from pd to pi - self._pi_slot_id = bootstrap['chassis']['{}_pd_to_pi'.format(self._card_type.lower())][slot_id] - server = self.REDIS_DB_SERVERS['chassis'] - else: - self._pi_slot_id = slot_id - server = self.REDIS_DB_SERVERS['local'] - except KeyError as exc: - raise Exception('Insufficient platform bootstrap data for DB key: {}'.format(exc)) - self._redis_chassis_db = redis.Redis( - host=server['host'], - port=server['port'], - decode_responses=True, - db=swsscommon.CHASSIS_STATE_DB) - - # Fetch namespaces based on number of ASICs on the chassis or the board - # For instance, on a multi-asic chassis or board (RP,LC): - # - Detected namespaces: ['asic0', 'asic1', 'asic2']; num_asics 3; asic_id_list [0, 1, 2] - # For single-asic chassis or board, - # - Detected namespaces: ['']; num_asics 1; asic_id_list [] - # Load the namespace details first from the database_global.json file - if multi_asic.is_multi_asic(): - swsscommon.SonicDBConfig.initializeGlobalConfig() - self.namespaces = multi_asic.get_front_end_namespaces() - - # Initialize redisDB accessor method - self.access_redis_db() - - #TODO : ensure if this intial cleanup is neededfault_action_policy_info - # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB - self.cleanup_DB_events() - - def stop_fault_publisher_algorithm(self): - ''' - Stop fault publisher algorithm - ''' - global redisDB_event_count - self._algorithm_running = False - - # Delete the EVENT_TABLE contents from chassis_redis_db - # cleanup all 'events' i.e. 
EVENT_TABLE entries in chassis redisDB - helper_logger.log_notice('redisDB_event_count:{} at almost done stage'.format(redisDB_event_count)) - self.cleanup_DB_events() - - def stop(self): - ''' - Stop fault publisher instance - ''' - self.stop_fault_publisher_algorithm() - self._running = False - - def deinitialize(self): - ''' - Destroy fault publisher instance - ''' - self.stop() - self._thermal_zone = None - self._redis_chassis_db = None - self._card_type = None - self._pi_slot_id = None - - self._state_db = {} - self._temperature_info_tbl, self._system_health_info_tbl = {}, {} - - def map_dict_fvm(self, s, d): - for k, v in s.items(): - d[k] = v - - def populate_events_to_redisDB(self, cnt): - # Populate redis (chassis) DB with fault event entries - # redis only stores string data, NoneType must be converted to 'None' string - global redisDB_event_count - event_id = 50 - while cnt > 0: - helper_logger.log_notice("Received event_id{}".format(event_id)) - data = {'id': str(event_id), 'resource': 'fault-manager', 'text': 'device, component FM test', 'time-created': 'Dec52023', 'type-id': 'CUSTOM_EVPROFILE_CHANGE', 'severity': 'CRITICAL', 'action': 'reload'} - key = self.EVENT_ENTRY_KEY.format('{}'.format(event_id)) - self._redis_chassis_db.hset(key, mapping=data) - redisDB_event_count += 1 - event_id += 1 - cnt = cnt - 1 - - def parse_rcvd_events(self, event_obj, cnt): - global rc_of_events_received - global redisDB_event_count - - # subscribe to the events (from the event-pubisher framework) - # EVENTS_PUBLISHER_SOURCE: sonic-events-host, sonic-events-swss, sonic-event etc. - # are 'module' under their respective sonic-events-*yang or sonic-event.yang file. - # EVENTS_PUBLISHER_TAG: event-disk, if-state, EVENT etc. are 'container' under their - # respective sonic-events-*yang or sonic-event.yang file. - sh = events_init_subscriber(False, RECEIVE_TIMEOUT, None) - helper_logger.log_notice("events_init_subscriber HANDLE: {}".format(sh)) - - # Sleep ASYNC_CONN_WAIT to ensure async connectivity is complete. - #time.sleep(ASYNC_CONN_WAIT) - - #exp_params = dict(exp_event_params) - - # Signal main thread that subscriber is ready to receive - event_obj.set() - - evt_rcvd_cntr = 0 - evt_rcvd_key_matched = 0 - - helper_logger.log_notice("event_receive counter init'ed to {}".format(evt_rcvd_cntr)) - - while True: - p = event_receive_op_t() - rc = event_receive(sh, p) - - helper_logger.log_notice("event_receive()'s iteration:{} rc:{}".format(evt_rcvd_cntr, rc)) - evt_rcvd_cntr = evt_rcvd_cntr + 1 - if rc > 0: - helper_logger.log_notice("rc:{} denotes no event reveived within timeout! continue to look for next event".format(rc)) - continue - elif rc != 0: - helper_logger.log_notice("rc:{} denotes failure! abort event_receive()".format(rc)) - break - - helper_logger.log_notice("Events publisher id - exp:{} vs rcvd:{}".format(EVENTS_PUBLISHER_ID, p.key)) - if p.key != EVENTS_PUBLISHER_ID: - # received a different EVENTS_PUBLISHER_ID than expected - helper_logger.log_notice("Events publisher id mismatch! continue to look for next event") - continue - else: - # Events publisher identier (source and tag) matched - helper_logger.log_notice("Events publisher id MATCHED. 
proceed...") - - # In order to process the received event, ensure event payload has mandatory fields in it - if "id" not in p.params or "type-id" not in p.params or "severity" not in p.params: - helper_logger.log_notice("key or mandatory fields missing in received event!") - continue - - rcvd_event_params = {} - self.map_dict_fvm(p.params, rcvd_event_params) - - event_dict = {} - for k, v in exp_event_params.items(): - if k in rcvd_event_params: - # add the key,value to event_dict dictionary which would then be populated as - # EVENT_TABLE entry into redisDB via hset - event_dict[k] = rcvd_event_params[k] - if (rcvd_event_params[k] != v): - helper_logger.log_notice("key:{} value rcvd:{} != exp:{}".format(k, rcvd_event_params[k], v)) - rc = -1 - else: - helper_logger.log_notice("key:{} value rcvd:{} MATCHES exp:{}".format(k, rcvd_event_params[k], v)) - if k == 'id': - # add 'id' field as key to the EVENT_TABLE entry - key = self.EVENT_ENTRY_KEY.format('{}'.format(int(rcvd_event_params[k]))) - helper_logger.log_notice(":event_dict KEY is:{} k:{} v:{}".format(key, k, rcvd_event_params[k])) - helper_logger.log_notice("dict updated:{}".format(event_dict)) - else: - helper_logger.log_notice("key:{} is missing in rcvd_event_params".format(k)) - rc = -1 - - if (rc != 0): - helper_logger.log_notice("rc:{} denotes missing key or params mismatch for the rcvd event! ignore it".format(rc)) - - if p.missed_cnt != 0: - helper_logger.log_notice("Expect missed_cnt {} == 0 {}/{}".format(p.missed_cnt, evt_rcvd_cntr, cnt)) - break - - if p.publish_epoch_ms == 0: - helper_logger.log_notice("Expect publish_epoch_ms != 0 {}/{}".format(evt_rcvd_cntr, cnt)) - break - - # populate event_dict entry as EVENT_TABLE entry into redisDB - self._redis_chassis_db.hset(key, mapping=event_dict) - redisDB_event_count += 1 - helper_logger.log_notice("In total, parsed {} event entries and populated them all to redisDB".format(redisDB_event_count)) - - if (evt_rcvd_cntr == cnt): - rc_of_events_received = 0 - else: - helper_logger.log_notice("received events further parsing aborted {}/{}".format(evt_rcvd_cntr, cnt)) - - # Signal main thread that subscriber thread is done - event_obj.set() - events_deinit_subscriber(sh) - - helper_logger.log_notice("Received {}/{}".format(evt_rcvd_cntr, cnt)) - - def receive_events(self, cnt): - # Initialising event subscriber object - event_sub = threading.Event() - - # Start events' subscriber thread - thread_sub = threading.Thread(target=self.parse_rcvd_events, args=(event_sub, cnt)) - thread_sub.start() - helper_logger.log_notice("parse_rcvd_events THREAD started") - - # Wait until subscriber thread completes the async subscription - # Any event published prior to that could get lost! - # Subscriber would wait for ASYNC_CONN_WAIT. Wait additional 200ms - # for signal from test_receiver as ready. 
- event_sub.wait(ASYNC_CONN_WAIT + 0.2) - helper_logger.log_notice("WAITED {} time prior to event_sub clear".format(ASYNC_CONN_WAIT+0.2)) - event_sub.clear() - helper_logger.log_notice("event_sub clear through") - - def start_fault_publisher_algorithm(self, cnt): - ''' - Start fault publisher algorithm - ''' - self._algorithm_running = True - helper_logger.log_notice("Entered start_fault_publisher_algorithm...") - if self._algorithm_running and self._thermal_zone: - try: - helper_logger.log_notice("Initiating FP algorithm sub-tasks") - helper_logger.log_notice("Task1: Adding {} FP test events to chassis_server_db...".format(cnt)) - self.populate_events_to_redisDB(cnt) - helper_logger.log_notice("Task2: Spawning parse_rcvd_events THREAD") - self.receive_events(cnt) - helper_logger.log_notice("Task3: main THREAD returned to its NORMAL course") - except: - # TODO - self.stop_fault_publisher_algorithm() - raise - - # primary logic to run FDR (Fault Detection & Reporting) as fault publishing service - def run(self, cnt): - """ - Run main logic of this fault publisher service - :return: - """ - try: - if self: - helper_logger.log_notice("FP start_fault_publisher_algorithm starting up...") - self.start_fault_publisher_algorithm(cnt) - return True - except Exception as e: - helper_logger.log_error('Caught exception while executing FM run_policy() - {}'.format(repr(e))) - return False - -def main(): - - helper_logger.log_notice("FDR (fault publisher) service starting up...") - - parser=argparse.ArgumentParser( - description="Check events published, receive and parse them") - parser.add_argument('-n', "--cnt", default=0, type=int, - help="count of events to receive") - args = parser.parse_args() - - # Instantiate an object of class faultPublisher - fault_pub = faultPublisher(SYSLOG_IDENTIFIER) - - if not fault_pub.run(args.cnt): - helper_logger.log_notice("Shutting down FDR service with exit code ...") - fault_pub.deinitialize() - helper_logger.log_notice("FDR service exiting") - sys.exit(1) - -if __name__ == '__main__': - main() diff --git a/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service b/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service deleted file mode 100644 index f09b56e42..000000000 --- a/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.service +++ /dev/null @@ -1,16 +0,0 @@ -[Unit] -Description=Fault Publisher daemon -Requires=updategraph.service -After=updategraph.service -BindsTo=sonic.target -After=sonic.target -After=database-chassis.service - -[Service] -Type=simple -ExecStart=/usr/local/bin/faultpubd -Restart=always -RestartSec=30 - -[Install] -WantedBy=sonic.target diff --git a/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer b/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer deleted file mode 100644 index 5aad8bd3c..000000000 --- a/sonic-faultmgrd/scripts/sonic-host-services-data.faultpubd.timer +++ /dev/null @@ -1,11 +0,0 @@ -[Unit] -Description=Delays faultpubd daemon until SONiC, other services have started -PartOf=faultpubd.service - -[Timer] -OnUnitActiveSec=0 sec -OnBootSec=1min 30 sec -Unit=hostcfgd.service - -[Install] -WantedBy=timers.target sonic.target From b2ab17fe99fe4a2b50b3e993095963d7757b5ee1 Mon Sep 17 00:00:00 2001 From: Shyam Kumar Date: Thu, 8 Feb 2024 11:25:01 -0800 Subject: [PATCH 4/5] Updated FM infra sys reboot call with reboot cause - Determined reboot casue from the fault entry - Passed the reboot cause as an argument to system 'reboot' invocation - Updated the mechanism to fetch 
chassis type (fixed or modular) - Removed faultpubd micro-service and moved it out, as sonic FM HLD focuses on the Fault Management via faultmgrd Signed-off-by: Shyam Kumar --- sonic-faultmgrd/scripts/faultmgrd | 84 ++++++++----------------------- 1 file changed, 22 insertions(+), 62 deletions(-) diff --git a/sonic-faultmgrd/scripts/faultmgrd b/sonic-faultmgrd/scripts/faultmgrd index 2fcc5097f..ae2f83872 100755 --- a/sonic-faultmgrd/scripts/faultmgrd +++ b/sonic-faultmgrd/scripts/faultmgrd @@ -7,8 +7,7 @@ import argparse import json import redis -from cisco.pacific.thermal.thermal_zone import thermal_zone -from sonic_py_common import daemon_base, multi_asic, logger +from sonic_py_common import daemon_base, logger from swsscommon import swsscommon import sys @@ -58,18 +57,9 @@ class faultManager(): FM_POLICY_FILE = '/usr/share/sonic/platform/fault_policy.json' # microservice identifier - _thermal_zone = None _interval = 15 - _card_type = None - _pi_slot_id = None - _redis_chassis_db = None - _state_db = {} - _temperature_info_tbl = {} - _system_health_info_tbl = {} - TEMPERATURE_INFO_TABLE = "TEMPERATURE_INFO" - SYSTEM_HEALTH_INFO_TABLE = "SYSTEM_HEALTH_INFO" EVENT_ENTRY_KEY = 'EVENT_TABLE|{}' @@ -77,17 +67,6 @@ class faultManager(): 'local': {'host': 'localhost', 'port': 6379}, 'chassis': {'host': 'redis_chassis.server', 'port': 6380}} - def access_redis_db(self): - ''' - Connect to all required tables for all corresponding namespace databases - ''' - for namespace in self.namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - # Connet to redis STATE_DB to fetch data from its tables - self._state_db[asic_id] = daemon_base.db_connect('STATE_DB', namespace) - self._temperature_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.TEMPERATURE_INFO_TABLE) - self._system_health_info_tbl[asic_id] = swsscommon.Table(self._state_db[asic_id], self.SYSTEM_HEALTH_INFO_TABLE) - def determine_fault_action_policies(self): global json_data try: @@ -127,53 +106,29 @@ class faultManager(): self.stop_event = threading.Event() self.wait_time = self.FM_INTERVAL - - self._thermal_zone = thermal_zone(cfg_filename=os.path.join(os.path.sep, 'opt', 'cisco', 'etc', 'thermal_zone.yaml')) - helper_logger.log_notice("thermal_zone: {}".format(self._thermal_zone)) - self._interval = self._thermal_zone.interval - helper_logger.log_notice("interval: {}".format(self._interval)) - - # Select redis chassis server details, card type and slot id from chassis bootstrap details - bootstrap = self._thermal_zone.platform.bootstrap - helper_logger.log_notice("bootstrap: {}".format(bootstrap)) + chassis_type = 'fixed' try: - self._card_type = bootstrap['card_type'].split(':')[0] - slot_id = bootstrap['slot_id'] - # Distributed platforms host a specific chassis DB server on the RP + # Modular chassis (i.e. RP (SUPervisor) with LC, FC modules) host a specific + # chassis DB server on the RP. # Fixed platforms do not need this, and just use the default local host - if bootstrap['platform_type'] == 'distributed': - # Convert slot_id from pd to pi - self._pi_slot_id = bootstrap['chassis']['{}_pd_to_pi'.format(self._card_type.lower())][slot_id] - server = self.REDIS_DB_SERVERS['chassis'] - else: - self._pi_slot_id = slot_id + if chassis_type == 'fixed': server = self.REDIS_DB_SERVERS['local'] + else: + server = self.REDIS_DB_SERVERS['chassis'] except KeyError as exc: - raise Exception('Insufficient platform bootstrap data for DB key: {}'.format(exc)) + raise Exception('chassis (platform) type undefined! 
key: {}'.format(exc)) + self._redis_chassis_db = redis.Redis( host=server['host'], port=server['port'], decode_responses=True, db=swsscommon.CHASSIS_STATE_DB) - # Fetch namespaces based on number of ASICs on the chassis or the board - # For instance, on a multi-asic chassis or board (RP,LC): - # - Detected namespaces: ['asic0', 'asic1', 'asic2']; num_asics 3; asic_id_list [0, 1, 2] - # For single-asic chassis or board, - # - Detected namespaces: ['']; num_asics 1; asic_id_list [] - # Load the namespace details first from the database_global.json file - if multi_asic.is_multi_asic(): - swsscommon.SonicDBConfig.initializeGlobalConfig() - self.namespaces = multi_asic.get_front_end_namespaces() - - # Initialize redisDB accessor method - self.access_redis_db() - # Fault-Action policies self.determine_fault_action_policies() - #TODO : ensure if this intial cleanup is neededfault_action_policy_info + #TODO : ensure if this intial cleanup is needed # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB self.cleanup_DB_events() @@ -201,13 +156,8 @@ class faultManager(): Destroy fault manager instance ''' self.stop() - self._thermal_zone = None self._redis_chassis_db = None - self._card_type = None - self._pi_slot_id = None - self._state_db = {} - self._temperature_info_tbl, self._system_health_info_tbl = {}, {} def map_dict_fvm(self, s, d): for k, v in s.items(): @@ -242,7 +192,17 @@ class faultManager(): for fault_entry in entry['faults']: if ((fault_entry['type'] == evdata['type-id']) and (fault_entry['severity'] == evdata['severity'])): helper_logger.log_notice("KEYS MATCHED at fault_seq_id: {}".format(fault_entry)) - helper_logger.log_notice("Action to be taken: {}".format(fault_entry['action'])) + helper_logger.log_notice("Fault type:{}, cause:{}, severity:{}, occured@:{}".format(evdata['type-id'], evdata['text'], evdata['severity'], evdata['time-created'])) + helper_logger.log_notice("Action(s) to be taken: {}".format(fault_entry['action'])) + for action in fault_entry['action']: + if (action == 'reboot'): + helper_logger.log_notice("Initiate system reboot with cause: {}".format(evdata['text'])) + os.system('reboot -r "{}"'.format(evdata['text'])) + if (action == 'obfl'): + helper_logger.log_notice("log to OBFL: Fault type:{}, cause:{}, severity:{}, occured@:{}".format(evdata['type-id'], evdata['text'], evdata['severity'], evdata['time-created'])) + # TBA (To Be Added) once obfl filesystem support is in place + #if (action == 'syslog'): + # Already log above so no further action break except redis.exceptions.ConnectionError as exc: helper_logger.log_notice('state DB currently unavailable: {}'.format(exc)) @@ -283,7 +243,7 @@ class faultManager(): ''' self._algorithm_running = True helper_logger.log_notice("Entered start_fault_manager_algorithm...") - if self._algorithm_running and self._thermal_zone: + if self._algorithm_running: try: helper_logger.log_notice("Initiating FM algorithm sub-tasks") helper_logger.log_notice("Task1: spawning map_event_to_action THREAD") From 49e9d66d783353b7c25b5dac2261a7db3d549d1c Mon Sep 17 00:00:00 2001 From: Shyam Kumar Date: Tue, 13 Feb 2024 13:46:41 -0800 Subject: [PATCH 5/5] Enhanced FM flow w.r.t redisDB interface - Changed redisDB interface to state DB at global (host) - Updated FM communication with redis DB to subscriber model and listening to DB's EVENT_TABLE SET and DEL operations - Misc cleanup Signed-off-by: Shyam Kumar --- sonic-faultmgrd/scripts/faultmgrd | 280 +++++++++++++++--------------- 1 file changed, 141 insertions(+), 139 
diff --git a/sonic-faultmgrd/scripts/faultmgrd b/sonic-faultmgrd/scripts/faultmgrd
index ae2f83872..51200333a 100755
--- a/sonic-faultmgrd/scripts/faultmgrd
+++ b/sonic-faultmgrd/scripts/faultmgrd
@@ -21,83 +21,73 @@ import uuid
 
 #####################################
 ## Host microservice fault_manager ##
 #####################################
-
 SYSLOG_IDENTIFIER = 'faultMgrd'
 helper_logger = logger.Logger(SYSLOG_IDENTIFIER)
-
-ASYNC_CONN_WAIT = 0.3
-# In case of no event from publisher(s), event subscriber's
-# max time (in ms) to wait before checking for the next event
-RECEIVE_TIMEOUT = 1000
-rc_of_events_received = -1
-redisDB_event_count = 0
-publish_cnt = 0
+SELECT_TIMEOUT = 1000
 json_data = {}
 
-#########################################################################
-# 1. Presently, events (faults) are published by sonic-events*.yang files
+##############################################################################
+# Purpose:
+#########
+# Provide a generic Fault Management (FM) infrastructure which can fetch all
+# live system events (e.g. faults), process them against the fault-action
+# policy file to determine the right set of system-level action(s) needed,
+# and then perform those action(s).
+#
+# SONiC system's present state:
+##############################
+# 1. An FDR (Fault Detector and Reporter) can publish events (faults) via the
+#    event framework's event_publish(), based on a specific sonic-events*.yang
+#    file.
 #    source location: /usr/local/yang-models/
-# 2. In near future, there would be a single events SCHEMA yang file
-#    named: sonic-event.yang
-# 3. Subscribe to event-framework to recieve events
-# 4. Look for the events periodically; as and when they are received,
-#    parse and process them; formulate an event (fault) entry locally
-# 5. Add the event data as an entry in the new EVENT_TABLE of chassis
-#    redis-DB
-# 6. Then fetch the event entry from this EVENT_TABLE, perform a lookup
-#    on event's id and severity in fault__policy info file (table)
-#    to find a match.
-# 7. Once a match is found, take the action(s) as specified by the match
-#########################################################################
+# 2. In the absence of an eventDB in the event framework, event consumers
+#    listen to the live stream of events over ZMQ and parse them in real time.
+# In near future:
+################
+# a. #1 above would gain a single, generic event SCHEMA yang file
+#    (sonic-event.yang) for FDRs to publish their events via the event
+#    framework
+# b. #2 would gain an eventDB capturing live events. This FM service can then
+#    subscribe to the eventDB to get all those live events and parse them
+#    to take the needed action(s)
+# Plan (this workflow):
+######################
+# - Owing to #2, the fault_publisher (faultpubd) service populates events as
+#   EVENT_TABLE entries in the host's STATE redisDB
+# - FM subscribes to the EVENT_TABLE of the host's STATE DB to get the events.
+# - Process them on SET and DEL EVENT_TABLE operations
+#   a) Parse them against the sonic-event.yang file and formulate a local
+#      unique event (fault) entry for each notified EVENT_TABLE entry
+#   b) This entry contains the entire fault payload (id, type, severity,
+#      cause, operation etc.)
+#   c) Parse the fault entry against the fault-action policy file to assess
+#      the action(s) needed, i.e. perform a lookup for the fault type and
+#      severity in the policy file.
+#   d) Once a match is found, take the action(s) as specified by the match
+#############################################################################
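+#
+# For illustration, the policy lookup that map_db_event_to_action() performs
+# below boils down to this hypothetical helper (a sketch only, not part of
+# the daemon) over the loaded fault_policy.json data:
+#
+#     def lookup_actions(policy, fault_type, severity):
+#         for chassis_entry in policy.get('chassis', []):
+#             for fault in chassis_entry.get('faults', []):
+#                 if fault['type'] == fault_type and fault['severity'] == severity:
+#                     return fault['action']
+#         return []
+#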
 
 class faultManager():
     # Interval to run this microservice
     FM_INTERVAL = 15
     # Policy file defining fault policies and their respective actions
     FM_POLICY_FILE = '/usr/share/sonic/platform/fault_policy.json'
-    # microservice identifier
-
-    _interval = 15
-    _redis_chassis_db = None
-
-    EVENT_ENTRY_KEY = 'EVENT_TABLE|{}'
-
-    REDIS_DB_SERVERS = {
-        'local': {'host': 'localhost', 'port': 6379},
-        'chassis': {'host': 'redis_chassis.server', 'port': 6380}}
+    state_db = None
 
     def determine_fault_action_policies(self):
         global json_data
         try:
             # Load, parse fault_policy json file
-            helper_logger.log_notice("INIT data from JSON file: {}".format(json_data))
+            helper_logger.log_notice("Populating data from JSON file: {}".format(json_data))
             with open('./fault_policy.json', 'r') as f:
                 json_data = json.load(f)
-            helper_logger.log_notice("data from JSON file: {}".format(json_data))
-            helper_logger.log_notice("chassis name: {}".format(json_data['chassis']))
+            helper_logger.log_notice("Populated data from JSON file: {}".format(json_data))
             for entry in json_data['chassis']:
-                helper_logger.log_notice("chassis entry: {}".format(entry))
-                helper_logger.log_notice("each entry name: {}".format(entry['name']))
-                helper_logger.log_notice("each entry faults list: {}".format(entry['faults']))
+                helper_logger.log_notice("chassis entry fetched: {}".format(entry))
                 for fault_entry in entry['faults']:
-                    helper_logger.log_notice("TRAVERSING through fault entry: {}".format(fault_entry))
-                    helper_logger.log_notice("TRAVERSING inside each fault entry TYPE : {}".format(fault_entry['type']))
-                    helper_logger.log_notice("TRAVERSING inside each fault entry SEVERITY: {}".format(fault_entry['severity']))
-                    helper_logger.log_notice("TRAVERSING inside each fault entry ACTION: {}".format(fault_entry['action']))
-
+                    helper_logger.log_notice("fault entry fetched: {}".format(fault_entry))
         #TODO: correct this error name
         except redis.exceptions.ConnectionError as exc:
             helper_logger.log_notice('issue with opening JSON policy file or parsing its contents: {}'.format(exc))
 
-    def cleanup_DB_events(self):
-        redisDB_event_count = 75
-        while redisDB_event_count >= 0:
-            #self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format('*')))
-            self._redis_chassis_db.delete(self.EVENT_ENTRY_KEY.format('{}'.format(redisDB_event_count)))
-            helper_logger.log_notice('done deleting redisDB_event_count:{} entry from chassis redisDB'.format(redisDB_event_count))
-            redisDB_event_count = redisDB_event_count - 1
-
     def __init__(self, id):
         """
         Initializer of faultManager microservice(process)
@@ -105,45 +95,16 @@ class faultManager():
         super(faultManager, self).__init__()
 
         self.stop_event = threading.Event()
-        self.wait_time = self.FM_INTERVAL
-        chassis_type = 'fixed'
-
-        try:
-            # A modular chassis (i.e. an RP (Supervisor) with LC/FC modules) hosts
-            # a specific chassis DB server on the RP.
-            # Fixed platforms do not need this, and just use the default local host
-            if chassis_type == 'fixed':
-                server = self.REDIS_DB_SERVERS['local']
-            else:
-                server = self.REDIS_DB_SERVERS['chassis']
-        except KeyError as exc:
-            raise Exception('chassis (platform) type undefined! key: {}'.format(exc))
-
-        self._redis_chassis_db = redis.Redis(
-            host=server['host'],
-            port=server['port'],
-            decode_responses=True,
-            db=swsscommon.CHASSIS_STATE_DB)
 
         # Fault-Action policies
         self.determine_fault_action_policies()
 
-        #TODO : ensure if this initial cleanup is needed
-        # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB
-        self.cleanup_DB_events()
-
     def stop_fault_manager_algorithm(self):
         '''
         Stop fault manager algorithm
         '''
-        global redisDB_event_count
         self._algorithm_running = False
 
-        # Delete the EVENT_TABLE contents from chassis_redis_db
-        # cleanup all 'events' i.e. EVENT_TABLE entries in chassis redisDB
-        helper_logger.log_notice('redisDB_event_count:{} at almost done stage'.format(redisDB_event_count))
-        self.cleanup_DB_events()
-
     def stop(self):
         '''
         Stop fault manager instance
@@ -156,74 +117,115 @@ class faultManager():
         Destroy fault manager instance
         '''
        self.stop()
-        self._redis_chassis_db = None
+        self.state_db = None
 
     def map_dict_fvm(self, s, d):
         for k, v in s.items():
             d[k] = v
 
-    def map_db_event_to_action(self, json_data):
-        # processing events from redisDB table
+    def analyze_db_events(self, event_obj, cnt):
+
+        # Get events from EVENT_TABLE of stateDB and analyze them.
+        # Map each event against each of the fault policies.
+        # Once a match is found, take the needed action(s).
+
+        global json_data
+
+        # Connect to redis state DB
+        self.state_db = daemon_base.db_connect("STATE_DB")
+        # Subscribe to EVENT_TABLE notifications in state DB
+        sel = swsscommon.Select()
+        sst = swsscommon.SubscriberStateTable(self.state_db, 'EVENT_TABLE')
+        sst.db_name = 'STATE_DB'
+        sst.table_name = 'EVENT_TABLE'
+        sel.addSelectable(sst)
+
+        # Listen indefinitely for changes to the EVENT_TABLE in state DB
+        event_entry_cache = {}
+        while True:
+            # Use timeout to prevent ignoring the signals to be handled here
+            # in signal handler() (e.g. SIGTERM for graceful shutdown)
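+            # (Hypothetical sketch, not part of this patch: the signal handler
+            #  referenced above would be registered at daemon startup, e.g.
+            #      signal.signal(signal.SIGTERM, lambda signum, frame: self.stop())
+            #  with 'import signal' added to the imports at the top of the file.)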
+            (state, c) = sel.select(SELECT_TIMEOUT)
+
+            if state == swsscommon.Select.TIMEOUT:
+                # Do not flood log when select times out
+                continue
+            if state != swsscommon.Select.OBJECT:
+                helper_logger.log_notice("sel.select() did not return swsscommon.Select.OBJECT")
+                continue
+
+            # pop the data updates
+            (key, op, fvp) = sst.pop()
+            if not key:
+                break
+            fvp = dict(fvp) if fvp is not None else {}
+            helper_logger.log_notice("analyze_db_events(): key={} op={} DB:{} Table:{} fvp={}".format(
+                key, op, sst.db_name, sst.table_name, fvp))
+
+            if 'id' not in fvp:
+                helper_logger.log_notice("alert - 'id' field not found in received event_table entry; setting it to the key of EVENT_TABLE|{}".format(key))
+                fvp['id'] = key
+            helper_logger.log_notice("event id: {}".format(fvp['id']))
+            #fvp['key'] = key
+            fvp['op'] = op
+            if op == swsscommon.SET_COMMAND:
+                # save the received event_table entry, along with DB operation
+                event_entry_cache[key] = fvp
+                helper_logger.log_notice("event_entry_cache entry: {}".format(event_entry_cache[key]))
+                helper_logger.log_notice("event_entry_cache after adding the new event_table entry with key:{} {}".format(key, event_entry_cache))
+                helper_logger.log_notice('Map state DB EVENT_TABLE|{} fault entry to FM action...'.format(key))
+                self.map_db_event_to_action(key, event_entry_cache, json_data)
+            elif op == swsscommon.DEL_COMMAND:
+                # remove the received event_table entry from the local cache
+                del event_entry_cache[key]
+                helper_logger.log_notice("event_entry_cache after del of tuple with key:{} {}".format(key, event_entry_cache))
+                # Alternate way:
+                # deltuple = event_entry_cache.pop(key, "No key found")
+                # helper_logger.log_notice("tuple removed: {}".format(deltuple))
+
+    def map_db_event_to_action(self, key, event_entry_cache, json_data):
+        # processing fault event received in event_entry
         '''
-        Retrieve events (faults) data from the redisDB and parse them against fault_policy JSON file
+        Retrieve fault data from the event_entry and parse it against the fault_policy JSON file
         '''
         try:
-            for key in self._redis_chassis_db.keys(pattern=self.EVENT_ENTRY_KEY.format('*')):
-                helper_logger.log_notice("KEY fetched: {} from EVENT_TABLE of chassis redisDB".format(key))
-                event_data = {}
-                evdata = {}
-                data = self._redis_chassis_db.hgetall(key)
-                evdata['id'] = int(data['id'])
-                evdata['resource'] = data['resource']
-                evdata['text'] = data['text']
-                evdata['time-created'] = data['time-created']
-                evdata['type-id'] = data['type-id']
-                evdata['severity'] = data['severity']
-                helper_logger.log_notice("Updating evdata[id]:{}".format(evdata['id']))
-                #helper_logger.log_notice("KEY evdata[id]:{}".format(key[evdata['id']]))
-                event_data[evdata['id']] = evdata
-                helper_logger.log_notice("Received event: {} from chassis redisDB EVENT_TABLE".format(event_data[evdata['id']]))
-                helper_logger.log_notice(" event_data DICT from chassis redisDB EVENT_TABLE: {}".format(event_data))
-
-                # Iterate through the fault_action_policy_info dictionary to find a match for the received event (fault).
-                # This dictionary is derived from fault_action_policy_info json file.
-                for entry in json_data['chassis']:
-                    for fault_entry in entry['faults']:
-                        if ((fault_entry['type'] == evdata['type-id']) and (fault_entry['severity'] == evdata['severity'])):
-                            helper_logger.log_notice("KEYS MATCHED at fault_seq_id: {}".format(fault_entry))
-                            helper_logger.log_notice("Fault type:{}, cause:{}, severity:{}, occurred@:{}".format(evdata['type-id'], evdata['text'], evdata['severity'], evdata['time-created']))
-                            helper_logger.log_notice("Action(s) to be taken: {}".format(fault_entry['action']))
-                            for action in fault_entry['action']:
-                                if (action == 'reboot'):
-                                    helper_logger.log_notice("Initiate system reboot with cause: {}".format(evdata['text']))
-                                    os.system('reboot -r "{}"'.format(evdata['text']))
-                                if (action == 'obfl'):
-                                    helper_logger.log_notice("log to OBFL: Fault type:{}, cause:{}, severity:{}, occurred@:{}".format(evdata['type-id'], evdata['text'], evdata['severity'], evdata['time-created']))
-                                    # TBA (To Be Added) once obfl filesystem support is in place
-                                #if (action == 'syslog'):
-                                    # Already logged above so no further action
-                            break
+            helper_logger.log_notice("Parse event_entry with key:{} {}".format(key, event_entry_cache[key]))
+            event_entry = event_entry_cache[key]
+            helper_logger.log_notice("Populated event_entry with key:{} {}".format(key, event_entry))
+
+            # Perform a lookup for the received event (fault) in the
+            # fault_action_policy_info dictionary, which is derived from the
+            # fault_policy json file.
+            match = False
+            for entry in json_data['chassis']:
+                for fault_entry in entry['faults']:
+                    if ((fault_entry['type'] == event_entry['type-id']) and (fault_entry['severity'] == event_entry['severity'])):
+                        match = True
+                        helper_logger.log_notice("Keys matched at fault_seq_id: {}".format(fault_entry))
+                        helper_logger.log_notice("Fault type:{}, cause:{}, severity:{}, occurred@:{}"
+                                                 .format(event_entry['type-id'], event_entry['text'], event_entry['severity'], event_entry['time-created']))
+                        helper_logger.log_notice("Action(s) to be taken: {}".format(fault_entry['action']))
+                        for action in fault_entry['action']:
+                            #if (action == 'syslog'):
+                                # Already logged above so no further action
+                            if (action == 'obfl'):
+                                helper_logger.log_notice("log to OBFL: Fault type:{}, cause:{}, severity:{}, occurred@:{}"
+                                                         .format(event_entry['type-id'], event_entry['text'], event_entry['severity'], event_entry['time-created']))
+                                # TBA (To Be Added) once obfl filesystem support is in place
+                            if (action == 'reboot'):
+                                helper_logger.log_notice("Initiate system reboot with cause: {}".format(event_entry['text']))
+                                os.system('reboot -r "{}"'.format(event_entry['text']))
+                                helper_logger.log_notice("Due to system reboot action request, shutting down FM service")
+                                self.deinitialize()
+                                sys.exit(1)
+                        break
+            if not match:
+                helper_logger.log_notice("No match found in fault-policy JSON file for the received fault!")
         except redis.exceptions.ConnectionError as exc:
             helper_logger.log_notice('state DB currently unavailable: {}'.format(exc))
 
-    def analyze_db_events(self, event_obj, cnt):
-        # get events from event redisDB, analyze them
-        # Map each event against each of the fault policy
-        # once a match is found, take the needed action(s)
-
-        global json_data
-        iter = 0
-
-        while True:
-            # perform lookup for the received event(fault) match in fault_action_policy_info dictionary
-            # (which is derived from fault_policy.json file)
-            # key is EVENT_TABLE| from the 'key' above
-            iter += 1
-            helper_logger.log_notice('Map redisDB fault events to actions: Iter: {}'.format(iter))
-            self.map_db_event_to_action(json_data)
-            # wait sometime before scanning for the events again in eventDB
-            time.sleep(25)
 
     def map_event_to_action(self, cnt):
         # Initialising event consumer object
@@ -232,10 +234,10 @@ class faultManager():
         # Start events' consumer thread to consume events from eventDB
         thread_consume = threading.Thread(target=self.analyze_db_events, args=(event_consume, cnt))
         thread_consume.start()
-        helper_logger.log_notice("analyze_db_events THREAD started")
+        helper_logger.log_notice("analyze_db_events thread started")
         event_consume.wait(1)
         event_consume.clear()
-        helper_logger.log_notice("event_consume clear through")
+        helper_logger.log_notice("event_consume clear() call completed")
 
     def start_fault_manager_algorithm(self, cnt):
         '''