diff --git a/debian/rules b/debian/rules index 4d2724d..a0cfa7f 100755 --- a/debian/rules +++ b/debian/rules @@ -2,7 +2,7 @@ PACKAGE_NAME := sonic-platform-pde TOPDIR := $(shell pwd) -PYTHON ?= python2 +PYTHON ?= python3 %: dh $@ diff --git a/sonic-pde-tests/sonic_pde_saiut/Makefile b/sonic-pde-tests/sonic_pde_saiut/Makefile index 06c6c9a..14580fb 100644 --- a/sonic-pde-tests/sonic_pde_saiut/Makefile +++ b/sonic-pde-tests/sonic_pde_saiut/Makefile @@ -2,7 +2,8 @@ SAI_INC := /usr/include/sai SAI_LIB := /usr/lib CC := gcc -CFLAGS := -fPIC -I/usr/include/python2.7 -I$(SAI_INC) +CFLAGS := -fPIC -I/usr/include/python3.9 -I$(SAI_INC) -I$(SAI_INC)/experimental + LDFLAGS := -L$(SAI_LIB) -lsai all: _saiut.so diff --git a/sonic-pde-tests/sonic_pde_saiut/saiut.h b/sonic-pde-tests/sonic_pde_saiut/saiut.h index 3a074ba..026b790 100644 --- a/sonic-pde-tests/sonic_pde_saiut/saiut.h +++ b/sonic-pde-tests/sonic_pde_saiut/saiut.h @@ -12,6 +12,8 @@ uint32_t portGetSpeed(sai_object_id_t port); sai_status_t portSetSpeed(sai_object_id_t port, uint32_t speed); uint64_t portGetCounter(sai_object_id_t port, sai_stat_id_t id); sai_status_t portClearCounter(sai_object_id_t port, sai_stat_id_t id); +sai_status_t portSetAttribute(sai_object_id_t port, sai_attribute_t *attr); +sai_status_t portGetAttribute(sai_object_id_t port, sai_attribute_t *attr); uint32_t switchGetPortNumber(void); sai_status_t switchShell(bool enable); diff --git a/sonic-pde-tests/sonic_pde_tests/conftest.py b/sonic-pde-tests/sonic_pde_tests/conftest.py index d552a82..cff58a4 100644 --- a/sonic-pde-tests/sonic_pde_tests/conftest.py +++ b/sonic-pde-tests/sonic_pde_tests/conftest.py @@ -26,10 +26,10 @@ def get_onie_pname(): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - id = pin.communicate()[0] + id = pin.communicate()[0].decode('ascii') id = id.strip() return id - + # Copy template platform config JSON to targeted platform JSON def create_platform_json(Pfname): INPUT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -40,7 +40,7 @@ def create_platform_json(Pfname): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - + # Copy template platform test JSON to targeted platform JSON def create_test_json(Tfname): INPUT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -51,7 +51,7 @@ def create_test_json(Tfname): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - + def pytest_runtest_setup(item): INPUT_DIR = os.path.dirname(os.path.abspath(__file__)) Pfname = os.path.join(INPUT_DIR, 'data/platform', "platform-" + get_onie_pname() + "-config.json") @@ -64,7 +64,7 @@ def pytest_runtest_setup(item): return else: create_test_json(Tfname) - + @pytest.fixture(scope='function',autouse='True') def json_config_data(): """ Loads json file """ @@ -92,4 +92,3 @@ def json_test_data(): contents=json.load(file_object) return contents - diff --git a/sonic-pde-tests/sonic_pde_tests/sai-shell b/sonic-pde-tests/sonic_pde_tests/sai-shell old mode 100755 new mode 100644 index 2a3e80d..401e753 --- a/sonic-pde-tests/sonic_pde_tests/sai-shell +++ b/sonic-pde-tests/sonic_pde_tests/sai-shell @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 import os.path import sys diff --git a/sonic-pde-tests/sonic_pde_tests/test_config.py b/sonic-pde-tests/sonic_pde_tests/test_config.py index ef98d44..2763630 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_config.py +++ b/sonic-pde-tests/sonic_pde_tests/test_config.py @@ -29,19 +29,27 @@ def test_for_required_bcm_config_file(json_test_data): assert config_file != {}, "Chipset configuration file not found" def test_for_required_bcm_config_settings(json_test_data): - """Test Purpose: Verify that the platform config.bcm file contains mandatory Silicon supported features. + """Test Purpose: Verify that the platform config.bcm file contains mandatory Silicon supported features + if adding in required list and do not contains any "not permitted" config setting Args: arg1 (json): test--config.json Example: - Parity should always be enabled for Broadcom switching silicon. + Parity_enable and parity_correction not permitted to be disabled for Broadcom switching silicon. "PLATFORM": { "CONFIG": { "required": { "config.bcm": [ - "parity_enable=1" + "#####=##", + ] + } + "CONFIG": { + "notpermitted": { + "config.bcm": [ + "parity_enable=0", + "parity_correction=0" ] } } @@ -64,10 +72,18 @@ def test_for_required_bcm_config_settings(json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] - print cnt + cnt = pin.communicate()[0].decode('ascii') assert int(cnt)==1, "Required configuration property [" + pat[idx] + "] not detected" + pat = json_test_data["PLATFORM"]["CONFIG"]["notpermitted"]["config.bcm"] + for idx in range(len(pat)): + cmd = "cat " + config_file + " | grep -c -i " + pat[idx] + pin = subprocess.Popen(cmd, + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + cnt = pin.communicate()[0].decode('ascii') - + assert int(cnt)==0, "configuration property [" + pat[idx] + "] is not allowed to be configured" diff --git a/sonic-pde-tests/sonic_pde_tests/test_cpld.py b/sonic-pde-tests/sonic_pde_tests/test_cpld.py index 9cf3fc3..227a538 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_cpld.py +++ b/sonic-pde-tests/sonic_pde_tests/test_cpld.py @@ -27,7 +27,7 @@ def load_platform_cpldutil(json_config_data): platform_cpldutil_class = getattr(platform_cpldutil_module,class_name) platform_cpldutil = platform_cpldutil_class() - except AttributeError, e: + except AttributeError as e: print("Failed to instantiate '%s' class: %s" % (class_name, str(e)), True) return diff --git a/sonic-pde-tests/sonic_pde_tests/test_eeprom.py b/sonic-pde-tests/sonic_pde_tests/test_eeprom.py index ee9b4fc..611a908 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_eeprom.py +++ b/sonic-pde-tests/sonic_pde_tests/test_eeprom.py @@ -1,130 +1,218 @@ -import pytest -import sys -import imp -import subprocess -import time - -PLATFORM_SPECIFIC_CLASS_NAME = "board" - -def test_for_eeprom_read(): - """Test Purpose: Verify that the EEPROM read is ok and chcksum is valid - """ - - module = imp.load_source("eeprom","/usr/share/sonic/platform/plugins/eeprom.py") - board = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - eeprom = board('',0,'',True) - assert eeprom.is_checksum_valid(eeprom.read_eeprom()),\ - "verify checksum is invalid".format(eeprom.read_eeprom()) - - -def test_for_eeprom_mac(json_config_data,json_test_data): - """Test Purpose: Verify that the MAC read from EEPROM is matching with the - test config JSON setting - - Args: - arg1 (json): platform--config.json - arg2 (json): test--config.json - - Example: - For a system that physically supports 1 syseeprom, the MAC address is - configured in the test--config.json - - test--config.json - "EEPROM": { - "mac": "00:11:22:33:44:55", - "ser": "AABBCCDDEEFF", - "model": "AAAA" - }, - """ - - module = imp.load_source("eeprom","/usr/share/sonic/platform/plugins/eeprom.py") - board = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - eeprom = board('',0,'',True) - for key in json_config_data: - assert eeprom.base_mac_addr(eeprom.read_eeprom()) == json_test_data[key]['EEPROM']['mac'],\ - "verify MAC is invalid".format(eeprom.read_eeprom()) - - -def test_for_eeprom_SER(json_config_data,json_test_data): - """Test Purpose: Verify that the SERIAL read from EEPROM is matching with the - test config JSON setting - - Args: - arg1 (json): platform--config.json - arg2 (json): test--config.json - - Example: - For a system that physically supports 1 syseeprom, the SERIAL number is - configured in the test--config.json - - test--config.json - "EEPROM": { - "mac": "00:11:22:33:44:55", - "ser": "AABBCCDDEEFF", - "model": "AAAA" - }, - """ - - module = imp.load_source("eeprom","/usr/share/sonic/platform/plugins/eeprom.py") - board = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - eeprom = board('',0,'',True) - - for key in json_config_data: - assert eeprom.serial_number_str(eeprom.read_eeprom()) == json_test_data[key]['EEPROM']['ser'],\ - "verify SER NUMBER is invalid".format(eeprom.read_eeprom()) - - -def test_for_eeprom_MODEL(json_config_data,json_test_data): - """Test Purpose: Verify that the MODEL read from EEPROM is matching with the - test config JSON setting - - Args: - arg1 (json): platform--config.json - arg2 (json): test--config.json - - Example: - For a system that physically supports 1 syseeprom, the MODEL is - configured in the test--config.json - - test--config.json - "EEPROM": { - "mac": "00:11:22:33:44:55", - "ser": "AABBCCDDEEFF", - "model": "AAAA" - }, - """ - module = imp.load_source("eeprom","/usr/share/sonic/platform/plugins/eeprom.py") - board = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - eeprom = board('',0,'',True) - - for key in json_config_data: - assert eeprom.modelstr(eeprom.read_eeprom()) == json_test_data[key]['EEPROM']['model'],\ - "verify Model Name is invalid".format(eeprom.read_eeprom()) - - -def test_for_platform_id(json_config_data,json_test_data): - module = imp.load_source("eeprom","/usr/share/sonic/platform/plugins/eeprom.py") - board = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - eeprom = board('',0,'',True) - - valid, t = eeprom.get_tlv_field(eeprom.read_eeprom(), \ - eeprom._TLV_CODE_PLATFORM_NAME) - assert valid, "Unable to get platform name from EEPROM" - - cmd = "cat /host/machine.conf | grep onie_platform | cut -d '=' -f 2" - pin = subprocess.Popen(cmd, - shell=True, - close_fds=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - id = pin.communicate()[0] - id = id.strip() - assert id == t[2], "platform id mismatch" - return - - - - -if __name__ == '__main__': - unittest.main() - +import pytest +import sys +import imp +import subprocess +import time + +PLATFORM_PATH = "/usr/share/sonic/platform" + + +# Global platform class instance +platform_eeprom_data = None +platform_eeprom = None +platform_chassis = None + + +# Loads platform module from source +def _wrapper_init(): + global platform_chassis + global platform_eeprom + global platform_eeprom_data + + # Load new platform api class + if platform_eeprom is None: + try: + import sonic_platform.platform + platform_chassis = sonic_platform.platform.Platform().get_chassis() + platform_eeprom = platform_chassis.get_eeprom() + platform_eeprom_data = platform_eeprom.read_eeprom() + except Exception as e: + print("Failed to load chassis due to {}".format(repr(e))) + +def is_valid_string(str): + for c in str: + v = ord(c) + if (v < 32) or (v > 126): + return False + return True + +def test_for_eeprom_read(): + """Test Purpose: Verify that the EEPROM read is ok and checksum is valid + """ + _wrapper_init() + assert platform_eeprom.is_checksum_valid(platform_eeprom_data), \ + "checksum failed" + +def test_for_eeprom_has_mac(): + _wrapper_init() + assert platform_eeprom.is_valid_tlvinfo_header(platform_eeprom_data), "Invalid TlvInfo format" + + has_mac = False + total_len = ((platform_eeprom_data[9]) << 8) | (platform_eeprom_data[10]) + tlv_index = platform_eeprom._TLV_INFO_HDR_LEN + tlv_end = platform_eeprom._TLV_INFO_HDR_LEN + total_len + while (tlv_index + 2) < len(platform_eeprom_data) and tlv_index < tlv_end: + assert platform_eeprom.is_valid_tlv(platform_eeprom_data[tlv_index:]), \ + "Invalid TLV @ {}".format(tlv_index) + tlv = platform_eeprom_data[tlv_index:tlv_index + 2 + (platform_eeprom_data[tlv_index + 1])] + if (tlv[0]) == platform_eeprom._TLV_CODE_CRC_32: + break + + if (tlv[0]) == platform_eeprom._TLV_CODE_MAC_BASE: + if (tlv[1]) == 6: + has_mac = True + break + tlv_index += (platform_eeprom_data[tlv_index+1]) + 2 + + assert has_mac, "MAC not found" + +def test_for_eeprom_mac(json_config_data,json_test_data): + """Test Purpose: Verify that the MAC read from EEPROM is matching with the + test config JSON setting + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + For a system that physically supports 1 syseeprom, the MAC address is + configured in the test--config.json + + test--config.json + "EEPROM": { + "mac": "00:11:22:33:44:55", + "ser": "AABBCCDDEEFF", + "model": "AAAA" + }, + """ + _wrapper_init() + for key in json_config_data: + tst = json_test_data[key]['EEPROM']['mac'] + if platform_chassis is not None: + val = platform_chassis.get_base_mac() + else: + val = platform_eeprom.base_mac_addr(platform_eeprom_data) + assert val == tst + +def test_for_eeprom_has_sn(): + _wrapper_init() + assert platform_eeprom.is_valid_tlvinfo_header(platform_eeprom_data), "Invalid TlvInfo format" + + has_sn = False + total_len = ((platform_eeprom_data[9]) << 8) | (platform_eeprom_data[10]) + tlv_index = platform_eeprom._TLV_INFO_HDR_LEN + tlv_end = platform_eeprom._TLV_INFO_HDR_LEN + total_len + while (tlv_index + 2) < len(platform_eeprom_data) and tlv_index < tlv_end: + assert platform_eeprom.is_valid_tlv(platform_eeprom_data[tlv_index:]), \ + "Invalid TLV @ {}".format(tlv_index) + tlv = platform_eeprom_data[tlv_index:tlv_index + 2 + (platform_eeprom_data[tlv_index + 1])] + if (tlv[0]) == platform_eeprom._TLV_CODE_CRC_32: + break + + if (tlv[0]) == platform_eeprom._TLV_CODE_SERIAL_NUMBER: + if ((tlv[1]) <= 0): + break + + name, value = platform_eeprom.decoder(None, tlv) + value = value.strip() + if len(value) > 0 and is_valid_string(value): + has_sn = True + + break + tlv_index += (platform_eeprom_data[tlv_index+1]) + 2 + + assert has_sn, "serial number not found" + +def test_for_eeprom_sn(json_config_data,json_test_data): + """Test Purpose: Verify that the SERIAL read from EEPROM is matching with the + test config JSON setting + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + For a system that physically supports 1 syseeprom, the SERIAL number is + configured in the test--config.json + + test--config.json + "EEPROM": { + "mac": "00:11:22:33:44:55", + "ser": "AABBCCDDEEFF", + "model": "AAAA" + }, + """ + _wrapper_init() + for key in json_config_data: + tst = json_test_data[key]['EEPROM']['ser'] + + if platform_chassis is not None: + val = platform_chassis.get_serial_number() + else: + val = platform_eeprom.serial_number_str(platform_eeprom_data) + + assert val == tst + +def test_for_eeprom_model(json_config_data,json_test_data): + """Test Purpose: Verify that the MODEL read from EEPROM is matching with the + test config JSON setting + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + For a system that physically supports 1 syseeprom, the MODEL is + configured in the test--config.json + + test--config.json + "EEPROM": { + "mac": "00:11:22:33:44:55", + "ser": "AABBCCDDEEFF", + "model": "AAAA" + }, + """ + _wrapper_init() + for key in json_config_data: + tst = json_test_data[key]['EEPROM']['model'] + if platform_chassis is not None: + val = platform_chassis.get_name() + else: + val = platform_eeprom.modelstr(platform_eeprom_data) + + assert val == tst + +def test_for_platform_id(json_config_data,json_test_data): + _wrapper_init() + v, t = platform_eeprom.get_tlv_field(platform_eeprom_data, \ + platform_eeprom._TLV_CODE_PLATFORM_NAME) + assert v, "Unable to get platform name from EEPROM" + + cmd = "cat /host/machine.conf | grep onie_platform | cut -d '=' -f 2" + pin = subprocess.Popen(cmd, + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + id = pin.communicate()[0].decode('ascii') + id = id.strip() + assert id == t[2].decode('ascii'), "platform id mismatch" + +def test_for_eeprom_junk_characters(): + _wrapper_init() + assert platform_eeprom.is_valid_tlvinfo_header(platform_eeprom_data), "Invalid TlvInfo format" + + total_len = ((platform_eeprom_data[9]) << 8) | (platform_eeprom_data[10]) + tlv_index = platform_eeprom._TLV_INFO_HDR_LEN + tlv_end = platform_eeprom._TLV_INFO_HDR_LEN + total_len + while (tlv_index + 2) < len(platform_eeprom_data) and tlv_index < tlv_end: + assert platform_eeprom.is_valid_tlv(platform_eeprom_data[tlv_index:]), \ + "Invalid TLV @ {}".format(tlv_index) + tlv = platform_eeprom_data[tlv_index:tlv_index + 2 + (platform_eeprom_data[tlv_index + 1])] + if (tlv[0]) == platform_eeprom._TLV_CODE_CRC_32: + break + + name, value = platform_eeprom.decoder(None, tlv) + #print("%-20s 0x%02X %3d %s" % (name, (tlv[0]), (tlv[1]), value)) + assert is_valid_string(name + value), "junk character detected" + tlv_index += (platform_eeprom_data[tlv_index+1]) + 2 diff --git a/sonic-pde-tests/sonic_pde_tests/test_fan.py b/sonic-pde-tests/sonic_pde_tests/test_fan.py index 2854363..b4ae86c 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_fan.py +++ b/sonic-pde-tests/sonic_pde_tests/test_fan.py @@ -7,95 +7,78 @@ DUTY_MIN = 50 DUTY_MAX = 100 - PLATFORM_PATH = "/usr/share/sonic/platform" -PLATFORM_SPECIFIC_MODULE_NAME = "fanutil" -PLATFORM_SPECIFIC_CLASS_NAME = "FanUtil" -# Global platform-specific fanutil class instance -platform_fanutil = None +# Global platform class instance platform_chassis = None -# Loads platform specific module from source +# Loads platform module from source def _wrapper_init(): global platform_chassis - global platform_fanutil # Load new platform api class - if platform_chassis is None: - try: - import sonic_platform.platform - platform_chassis = sonic_platform.platform.Platform().get_chassis() - except Exception as e: - print("Failed to load chassis due to {}".format(repr(e))) - - - # Load platform-specific fanutil class if platform_chassis is None: try: - module_file = "/".join([PLATFORM_PATH, "plugins", PLATFORM_SPECIFIC_MODULE_NAME + ".py"]) - module = imp.load_source(PLATFORM_SPECIFIC_MODULE_NAME, module_file) - platform_fanutil_class = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - platform_fanutil = platform_fanutil_class() + import sonic_platform.platform + platform_chassis = sonic_platform.platform.Platform().get_chassis() except Exception as e: - print("Failed to load fanutil due to {}".format(repr(e))) + print("Failed to load chassis due to {}".format(repr(e))) - assert (platform_chassis is not None) or (platform_fanutil is not None), "Unable to load platform module" - -# wrappers that are compliable with both new platform api and old-style plugin def _wrapper_get_num_fans(): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_num_fans() - except NotImplementedError: - pass - return platform_fanutil.get_num_fans() + try: + return platform_chassis.get_num_fans() + except NotImplementedError: + pass def _wrapper_get_fan_direction(index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_fan(index).get_direction() - except NotImplementedError: - pass - return platform_fanutil.get_direction(index+1) + try: + return platform_chassis.get_fan(index).get_direction() + except NotImplementedError: + pass + +def _wrapper_get_psu_direction(index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_psu(index)._fan_list[0].get_direction() + except NotImplementedError: + pass def _wrapper_get_fan_status(index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_fan(index).get_status() - except NotImplementedError: - pass - return platform_fanutil.get_status(index+1) + try: + return platform_chassis.get_fan(index).get_status() + except NotImplementedError: + pass def _wrapper_get_fan_presence(index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_fan(index).get_presence() - except NotImplementedError: - pass - return platform_fanutil.get_presence(index+1) + try: + return platform_chassis.get_fan(index).get_presence() + except NotImplementedError: + pass def _wrapper_get_fan_duty(index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_fan(index).get_speed() - except NotImplementedError: - pass - return platform_fanutil.get_speed(index+1) + try: + return platform_chassis.get_fan(index).get_speed() + except NotImplementedError: + pass def _wrapper_set_fan_duty(index, duty): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_fan(index).set_speed(duty) - except NotImplementedError: - pass - return platform_fanutil.set_speed(index+1) + try: + return platform_chassis.get_fan(index).set_speed(duty) + except NotImplementedError: + pass # test cases def test_for_num_fans(json_config_data): @@ -146,8 +129,11 @@ def test_for_fans_dir(json_config_data, json_test_data): for key in json_config_data: for x in json_test_data[key]['FAN']['present']: - assert _wrapper_get_fan_direction(x-1) == json_test_data[key]['FAN']['FAN'+str(x)]['direction'],\ + assert _wrapper_get_fan_direction(x-1).lower() == json_test_data[key]['FAN']['FAN'+str(x)]['direction'],\ "FAN{}: DIR mismatched".format(x) + for j in json_test_data[key]['PSU']['present']: + assert _wrapper_get_psu_direction(j-1).lower() == _wrapper_get_fan_direction(x-1).lower(),\ + "FAN{}: PSU{} DIR mismatched".format(x,j) def test_for_fans_status(json_config_data, json_test_data): @@ -232,6 +218,9 @@ def test_for_fans_duty(json_config_data, json_test_data): """ if json_config_data['PLATFORM']['modules']['FAN']['support'] == "false": pytest.skip("Skip the testing due to the python module is not supported in BSP") + + if json_config_data['PLATFORM']['modules']['FAN']['fan_duty_control_support'] == "false": + pytest.skip("Skip the testing due to the BMC platform fan control is not supported in BSP") # Only verify FAN Duty set ;; FAN Read will be verified in Thermal Test # start FAN speed/duty test diff --git a/sonic-pde-tests/sonic_pde_tests/test_leds.py b/sonic-pde-tests/sonic_pde_tests/test_leds.py index 929fe7b..82ee3ee 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_leds.py +++ b/sonic-pde-tests/sonic_pde_tests/test_leds.py @@ -28,7 +28,7 @@ def load_platform_sysledutil(json_config_data): platform_sysledutil_class = getattr(platform_sysledutil_module,class_name) platform_sysledutil = platform_sysledutil_class() - except AttributeError, e: + except AttributeError as e: print("Failed to instantiate '%s' class: %s" % (class_name, str(e)), True) return diff --git a/sonic-pde-tests/sonic_pde_tests/test_os.py b/sonic-pde-tests/sonic_pde_tests/test_os.py index bc52b9e..af7efb3 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_os.py +++ b/sonic-pde-tests/sonic_pde_tests/test_os.py @@ -110,7 +110,7 @@ def test_for_flooding_dmesg(json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') cp1.append(int(cnt)) # sampling period @@ -124,7 +124,7 @@ def test_for_flooding_dmesg(json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') cp2.append(int(cnt)) for idx in range(len(pat)): @@ -169,7 +169,7 @@ def test_for_flooding_syslog(json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') cp1.append(int(cnt)) # sampling period @@ -183,10 +183,71 @@ def test_for_flooding_syslog(json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') cp2.append(int(cnt)) for idx in range(len(pat)): assert cp2[idx] == cp1[idx], "flooding syslog [" + pat[idx] + "] detected" return + +def test_for_fstrim(): + """Test Purpose: Verify that if TRIM properlly enabled for SSDs + + Args: + None + + Note: + Periodic TRIM is always enabled on the SONiC platforms, and it launches + '/sbin/fstrim -av' weekly + """ + ssd = False + opts = subprocess.check_output("lsblk -l -o NAME,TYPE | grep disk | head -1", shell=True).decode('ascii') + disk = opts.split()[0] + + chk1 = subprocess.check_output("cat /sys/class/block/{0}/queue/discard_granularity".format(disk), shell=True).decode('ascii') + chk1 = chk1.strip() + chk2 = subprocess.check_output("cat /sys/class/block/{0}/queue/discard_max_bytes".format(disk), shell=True).decode('ascii') + chk2 = chk2.strip() + if (int(chk1, 10) == 0) or (int(chk2, 10) == 0): + pytest.skip("'{0}' does not support TRIM".format(disk)) + + if disk.startswith("nvme"): + ssd = True + else: + rota = subprocess.check_output("cat /sys/class/block/{0}/queue/rotational".format(disk), shell=True).decode('ascii') + rota = rota.strip() + ssd = (int(rota, 10) > 0) + + if not ssd: + pytest.skip("'{0}' is not a SSD".format(disk)) + + mtab = subprocess.check_output("cat /etc/mtab | grep '/dev/{0}' | head -1".format(disk), shell=True).decode('ascii') + opts = mtab.split()[3] + assert "discard" in opts, "{0}: discard/TRIM is not enabled".format(disk) + return + +def test_for_cpuload(): + """Test Purpose: Verify that if CPU load is not higher than expected + + Args: + None + + Note: + This test should only be launched when swss/syncd is disabled or without running traffic + """ + lower_bound = 90.0 + data = subprocess.check_output("iostat -c 1 3", shell=True).decode('ascii') + info = False + load = 0.0 + for row in data.split('\n'): + if row.startswith('avg-cpu'): + info = True + continue + if info: + load += float(row.split()[5]) + info = False + + load /= 3 + load = 100 - load + assert load < lower_bound, "CPU load is too high" diff --git a/sonic-pde-tests/sonic_pde_tests/test_psu.py b/sonic-pde-tests/sonic_pde_tests/test_psu.py index 01b057d..6ece19b 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_psu.py +++ b/sonic-pde-tests/sonic_pde_tests/test_psu.py @@ -5,112 +5,124 @@ import time PLATFORM_PATH = "/usr/share/sonic/platform" -PLATFORM_SPECIFIC_MODULE_NAME = "psuutil" -PLATFORM_SPECIFIC_CLASS_NAME = "PsuUtil" -platform_psuutil = None +# Global platform class instance platform_chassis = None -# Loads platform specific module from source +# Loads platform module from source def _wrapper_init(): - global platform_psuutil global platform_chassis # Load new platform api class if platform_chassis is None: - try: - import sonic_platform.platform - platform_chassis = sonic_platform.platform.Platform().get_chassis() - except Exception as e: - print("Failed to load chassis due to {}".format(repr(e))) + try: + import sonic_platform.platform + platform_chassis = sonic_platform.platform.Platform().get_chassis() + except Exception as e: + print("Failed to load chassis due to {}".format(repr(e))) - # Load platform-specific psuutil class - if platform_chassis is None: - try: - module_file = "/".join([PLATFORM_PATH, "plugins", PLATFORM_SPECIFIC_MODULE_NAME + ".py"]) - module = imp.load_source(PLATFORM_SPECIFIC_MODULE_NAME, module_file) - platform_psuutil_class = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - platform_psuutil = platform_psuutil_class() - except Exception as e: - print("Failed to load psuutil due to {}".format(repr(e))) - - assert (platform_chassis is not None) or (platform_psuutil is not None), "Unable to load platform module" - -# wrappers that are compliable with both new platform api and old-style plugin -def _wrapper_get_num_psus(): +def _wrapper_get_num_psu(): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_num_psus() - except NotImplementedError: - pass - return platform_psuutil.get_num_psus() + try: + return platform_chassis.get_num_psu() + except NotImplementedError: + pass -def _wrapper_get_psus_presence(psu_index): +def _wrapper_get_psu_presence(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_presence() - except NotImplementedError: - pass - return platform_psuutil.get_psu_presence(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_presence() + except NotImplementedError: + pass -def _wrapper_get_psus_status(psu_index): +def _wrapper_get_psu_status(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_powergood_status() - except NotImplementedError: - pass - return platform_psuutil.get_psu_status(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_powergood_status() + except NotImplementedError: + pass -def _wrapper_get_psus_serial(psu_index): +def _wrapper_get_psu_serial(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_serial() - except NotImplementedError: - pass - return platform_psuutil.get_serial(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_serial() + except NotImplementedError: + pass -def _wrapper_get_psus_model(psu_index): +def _wrapper_get_psu_model(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_model() - except NotImplementedError: - pass - return platform_psuutil.get_model(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_model() + except NotImplementedError: + pass -def _wrapper_get_psus_power(psu_index): +def _wrapper_get_psu_power(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_power() - except NotImplementedError: - pass - return platform_psuutil.get_output_power(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_power() + except NotImplementedError: + pass -def _wrapper_get_psus_current(psu_index): +def _wrapper_get_psu_current(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_current() - except NotImplementedError: - pass - return platform_psuutil.get_output_current(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_current() + except NotImplementedError: + pass -def _wrapper_get_psus_voltage(psu_index): +def _wrapper_get_psu_voltage(psu_index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(psu_index).get_voltage() - except NotImplementedError: - pass - return platform_psuutil.get_output_voltage(psu_index+1) + try: + return platform_chassis.get_psu(psu_index).get_voltage() + except NotImplementedError: + pass +def _wrapper_get_psu_input_voltage(psu_index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_psu(psu_index).get_input_voltage() + except NotImplementedError: + pass +def _wrapper_get_psu_input_current(psu_index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_psu(psu_index).get_input_current() + except NotImplementedError: + pass +def _wrapper_get_psu_type(psu_index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_psu(psu_index).get_type() + except NotImplementedError: + pass + +def _wrapper_get_psu_capacity(psu_index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_psu(psu_index).get_capacity() + except NotImplementedError: + pass + +def is_valid_string(str): + for c in str: + v = ord(c) + if (v < 32) or (v > 126): + return False + return True def test_for_num_psus(json_config_data): """Test Purpose: Verify that the numer of PSUs reported as supported by the PSU plugin matches what the platform supports. @@ -128,7 +140,7 @@ def test_for_num_psus(json_config_data): } } """ - assert _wrapper_get_num_psus() == json_config_data['PLATFORM']['num_psus'],"System plugin reports that {} PSUs are supported in platform".format(platform_psuutil.get_num_psus()) + assert _wrapper_get_num_psu() == json_config_data['PLATFORM']['num_psus'],"System plugin reports that {} psu are supported in platform".format(_wrapper_get_num_psu()) def test_for_psu_present(json_config_data, json_test_data): """Test Purpose: Test Purpose: Verify that the PSUs that are present report as present in the PSU plugin. @@ -156,7 +168,7 @@ def test_for_psu_present(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - assert _wrapper_get_psus_presence(x-1) == True, "System plugin reported PSU {} was not present".format(x) + assert _wrapper_get_psu_presence(x-1) == True, "System plugin reported PSU {} was not present".format(x) def test_for_psu_notpresent(json_config_data, json_test_data): """Test Purpose: Verify that the PSUs that are not present report as not present in the PSU plugin. @@ -179,10 +191,10 @@ def test_for_psu_notpresent(json_config_data, json_test_data): } """ - num_psus = _wrapper_get_num_psus() + num_psus = _wrapper_get_num_psu() for key in json_config_data: for x in range (1, num_psus): - if _wrapper_get_psus_presence(x-1) == True: + if _wrapper_get_psu_presence(x-1) == True: Found = False; for y in json_test_data[key]['PSU']['present']: if x == y: @@ -218,7 +230,7 @@ def test_for_psu_status(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - assert _wrapper_get_psus_status(x-1) == json_test_data[key]['PSU']['status'][x-1], "System plugin reported PSU {} state did not match test state {}".format(x, json_test_data[key]['PSU']['status']) + assert _wrapper_get_psu_status(x-1) == json_test_data[key]['PSU']['status'][x-1], "System plugin reported PSU{} state did not match test state {}".format(x, json_test_data[key]['PSU']['status']) @@ -246,9 +258,10 @@ def test_for_psu_serial_num(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - if _wrapper_get_psus_status(x-1) : - assert _wrapper_get_psus_serial(x-1) == json_test_data[key]['PSU']['PSU'+str(x)]['psu_serial_num'], \ - "Verify PSU{} Serail number is invalid".format(x, _wrapper_get_psus_serial(x-1)) + if _wrapper_get_psu_status(x-1) : + assert _wrapper_get_psu_serial(x-1) == json_test_data[key]['PSU']['PSU'+str(x)]['psu_serial_num'], \ + "Verify PSU{} Serial number {} is invalid".format(x, _wrapper_get_psu_serial(x-1)) + assert is_valid_string(_wrapper_get_psu_serial(x-1)), "junk character in psu serial" def test_for_psu_model(json_config_data, json_test_data): @@ -275,9 +288,10 @@ def test_for_psu_model(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - if _wrapper_get_psus_status(x-1): - assert _wrapper_get_psus_model(x-1) == json_test_data[key]['PSU']['PSU'+str(x)]['model'], \ - "Verify PSU{} Model ID is invalid".format(x, _wrapper_get_psus_model(x-1)) + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_model(x-1) == json_test_data[key]['PSU']['PSU'+str(x)]['model'], \ + "Verify PSU{} Model ID {} is invalid".format(x, _wrapper_get_psu_model(x-1)) + assert is_valid_string(_wrapper_get_psu_model(x-1)), "junk character in psu model" def test_for_psu_voltage(json_config_data, json_test_data): """Test Purpose: Verify that the PSUs Output voltage is valid @@ -303,12 +317,12 @@ def test_for_psu_voltage(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - if _wrapper_get_psus_status(x-1): - assert _wrapper_get_psus_voltage(x-1) <= json_test_data[key]['PSU']['PSU'+str(x)]['output_voltage'] * 1.1, \ - "Verify PSU{} Output voltage is invalid too high".format(x, _wrapper_get_psus_voltage(x-1)) + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_voltage(x-1) <= json_test_data[key]['PSU']['PSU'+str(x)]['output_voltage'] * 1.1, \ + "Verify PSU{} Output voltage {} is either invalid or too high".format(x, _wrapper_get_psu_voltage(x-1)) - assert _wrapper_get_psus_voltage(x-1) >= json_test_data[key]['PSU']['PSU'+str(x)]['output_voltage'] * 0.9, \ - "Verify PSU{} Output voltage is invalid too low".format(x, _wrapper_get_psus_voltage(x-1)) + assert _wrapper_get_psu_voltage(x-1) >= json_test_data[key]['PSU']['PSU'+str(x)]['output_voltage'] * 0.9, \ + "Verify PSU{} Output voltage {} is either invalid or too low".format(x, _wrapper_get_psu_voltage(x-1)) def test_for_psu_current(json_config_data, json_test_data): @@ -326,9 +340,13 @@ def test_for_psu_current(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - if _wrapper_get_psus_status(x-1): - assert _wrapper_get_psus_current(x-1), \ - "Verify PSU{} output current is fail to read".format(x, _wrapper_get_psus_current(x-1)) + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_current(x-1) <= json_test_data[key]['PSU']['PSU'+str(x)]['output_current'] * 1.1, \ + "Verify PSU{} Output current {} is either invalid or too high".format(x, _wrapper_get_psu_current(x-1)) + + assert _wrapper_get_psu_current(x-1) >= json_test_data[key]['PSU']['PSU'+str(x)]['output_current'] * 0.9, \ + "Verify PSU{} Output current {} is either invalid or too low".format(x, _wrapper_get_psu_current(x-1)) + def test_for_psu_power(json_config_data, json_test_data): @@ -346,8 +364,124 @@ def test_for_psu_power(json_config_data, json_test_data): for key in json_config_data: psupresentlist = json_test_data[key]['PSU']['present'] for x in psupresentlist: - if _wrapper_get_psus_status(x-1): - assert _wrapper_get_psus_power(x-1), \ - "Verify PSU{} output power is fail to read".format(x, _wrapper_get_psus_power(x-1)) + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_power(x-1) <= json_test_data[key]['PSU']['PSU'+str(x)]['output_power'] * 1.1, \ + "Verify PSU{} Output Power {} is invalid too high".format(x, _wrapper_get_psu_power(x-1)) + + assert _wrapper_get_psu_power(x-1) >= json_test_data[key]['PSU']['PSU'+str(x)]['output_power'] * 0.9, \ + "Verify PSU{} Output Power {} is invalid too low".format(x, _wrapper_get_psu_power(x-1)) +def test_for_psu_input_voltage(json_config_data, json_test_data): + """Test Purpose: Verify that the PSUs Input voltage is valid + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + For a system that only has power supply 1 present + + test--config.json + { + "PLATFORM": { + "PSU1": { + "input_voltage":"110" + } + """ + + if json_config_data['PLATFORM']['modules']['PSU']['support'] == "false": + pytest.skip("Skip the testing due to the openconfig API in python module is not supported in BSP") + + for key in json_config_data: + psupresentlist = json_test_data[key]['PSU']['present'] + for x in psupresentlist: + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_input_voltage(x-1) <= json_test_data[key]['PSU']['PSU'+str(x)]['input_voltage'] * 1.1, \ + "Verify PSU{} Input voltage {} is either invalid or too high".format(x, _wrapper_get_psu_input_voltage(x-1)) + + assert _wrapper_get_psu_input_voltage(x-1) >= json_test_data[key]['PSU']['PSU'+str(x)]['input_voltage'] * 0.9, \ + "Verify PSU{} Input voltage {} is either invalid or too low".format(x, _wrapper_get_psu_input_voltage(x-1)) + + +def test_for_psu_input_current(json_config_data, json_test_data): + """Test Purpose: Verify that the PSUs input current is able to read + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + """ + + if json_config_data['PLATFORM']['modules']['PSU']['support'] == "false": + pytest.skip("Skip the testing due to the openconfig API in python module is not supported in BSP") + + for key in json_config_data: + psupresentlist = json_test_data[key]['PSU']['present'] + for x in psupresentlist: + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_input_current(x-1) <= json_test_data[key]['PSU']['PSU'+str(x)]['input_current'] * 1.1, \ + "Verify PSU{} Input current {} is either invalid or too high".format(x, _wrapper_get_psu_input_current(x-1)) + + assert _wrapper_get_psu_input_current(x-1) >= json_test_data[key]['PSU']['PSU'+str(x)]['input_current'] * 0.9, \ + "Verify PSU{} Input current {} is either invalid or too low".format(x, _wrapper_get_psu_input_current(x-1)) + + + +def test_for_psu_capacity(json_config_data, json_test_data): + """Test Purpose: Verify that the PSUs Capacity is valid + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + For a system that only has power supply 1 present + + test--config.json + { + "PLATFORM": { + "PSU1": { + "capacity":"1000", + } + """ + + if json_config_data['PLATFORM']['modules']['PSU']['support'] == "false": + pytest.skip("Skip the testing due to the openconfig API in python module is not supported in BSP") + + for key in json_config_data: + psupresentlist = json_test_data[key]['PSU']['present'] + for x in psupresentlist: + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_capacity(x-1) == json_test_data[key]['PSU']['PSU'+str(x)]['capacity'], \ + "Verify PSU{} Capacity {} is invalid".format(x, _wrapper_get_psu_capacity(x-1)) + + + +def test_for_psu_type(json_config_data, json_test_data): + """Test Purpose: Verify that the PSUs Type is valid + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + For a system that only has power supply 1 present + + test--config.json + { + "PLATFORM": { + "PSU1": { + "type":"AC", + } + """ + + if json_config_data['PLATFORM']['modules']['PSU']['support'] == "false": + pytest.skip("Skip the testing due to the openconfig API in python module is not supported in BSP") + + for key in json_config_data: + psupresentlist = json_test_data[key]['PSU']['present'] + for x in psupresentlist: + if _wrapper_get_psu_status(x-1): + assert _wrapper_get_psu_type(x-1) == json_test_data[key]['PSU']['PSU'+str(x)]['type'], \ + "Verify PSU{} Type {} is invalid".format(x, _wrapper_get_psu_type(x-1)) \ No newline at end of file diff --git a/sonic-pde-tests/sonic_pde_tests/test_sai.py b/sonic-pde-tests/sonic_pde_tests/test_sai.py index 02edfb2..e25c78a 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_sai.py +++ b/sonic-pde-tests/sonic_pde_tests/test_sai.py @@ -1,12 +1,13 @@ #!/usr/bin/python import sys +import json import time import pytest import subprocess import os.path -import saiut +from . import saiut PLATFORM_PATH = "/usr/share/sonic/platform" HWSKU_PATH = "/usr/share/sonic/hwsku" @@ -17,6 +18,32 @@ # Global port dictionaries port_dict = None +def parse_port_config_file(port_config_file): + ports = {} + port_alias_map = {} + # Default column definition + titles = ['name', 'lanes', 'alias', 'index'] + with open(port_config_file) as data: + for line in data: + if line.startswith('#'): + if "name" in line: + titles = line.strip('#').split() + continue; + tokens = line.split() + if len(tokens) < 2: + continue + name_index = titles.index('name') + name = tokens[name_index] + data = {} + for i, item in enumerate(tokens): + if i == name_index: + continue + data[titles[i]] = item + data.setdefault('alias', name) + ports[name] = data + port_alias_map[data['alias']] = name + return (ports, port_alias_map) + # Load the platform specific port dictionaries def load_platform_portdict(): global port_dict @@ -32,8 +59,8 @@ def load_platform_portdict(): if not line.startswith("#"): lst = line.split() if len(lst) >= 2: - key = lst[0]; - val = int(lst[1].split(',')[0], 10) + key = lst[0].strip() + val = lst[1].split(',') port_dict[key] = val line = file.readline() file.close() @@ -89,10 +116,66 @@ def test_for_switch_port_enumeration(): assert rv == saiut.SAI_STATUS_SUCCESS, "Unable to initialize the switch" for key in port_dict: - port = saiut.portGetId(port_dict[key]) + port = saiut.portGetId(int(port_dict[key][0], 10)) assert port != 0, "{}: port id not found".format(key) return +# Test for switch port loopback +def test_for_switch_port_loopback(): + """ + Test Purpose: Verify the per port packet transmission in MAC loopback mode + + Args: + None + """ + rv = load_platform_portdict() + assert rv is not None, "Unable to load the port dictionary" + + rv = start_switch_core() + assert rv == saiut.SAI_STATUS_SUCCESS, "Unable to initialize the switch" + + attr = saiut.sai_attribute_t() + + # Enable the port in MAC loopback mode, and transmit the packets + for key in port_dict: + port = saiut.portGetId(int(port_dict[key][0], 10)) + assert port != 0, "{}: port id not found".format(key) + + # Enable the port + attr.id = saiut.SAI_PORT_ATTR_ADMIN_STATE + attr.value.booldata = True + saiut.portSetAttribute(port, attr) + + # Enable MAC loopback mode + attr.id = saiut.SAI_PORT_ATTR_INTERNAL_LOOPBACK_MODE + attr.value.u32 = saiut.SAI_PORT_INTERNAL_LOOPBACK_MODE_MAC + saiut.portSetAttribute(port, attr) + + # Perform loopback packet transmission test + saiut.portClearCounter(port, saiut.SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS) + saiut.portClearCounter(port, saiut.SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS) + saiut.bcmSendPackets(port, 99, 512) + + # Allow 2 seconds for counter updates + time.sleep(2) + + # Verify the packet counters, and exit loopback + for key in port_dict: + port = saiut.portGetId(int(port_dict[key][0], 10)) + assert port != 0, "{}: port id not found".format(key) + + xmit = saiut.portGetCounter(port, saiut.SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS) + recv = saiut.portGetCounter(port, saiut.SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS) + + # Exit loopback mode + attr.id = saiut.SAI_PORT_ATTR_INTERNAL_LOOPBACK_MODE + attr.value.u32 = saiut.SAI_PORT_INTERNAL_LOOPBACK_MODE_NONE + saiut.portSetAttribute(port, attr) + + # Validate the test result + assert xmit >= 99, "{}: loopback tx failed".format(key) + assert recv >= 99, "{}: loopback rx failed".format(key) + # Test for switch port traffic def test_for_switch_port_traffic(json_test_data): """ @@ -126,148 +209,63 @@ def test_for_switch_port_traffic(json_test_data): assert rv == saiut.SAI_STATUS_SUCCESS, "Unable to initialize the switch" # Enable all the switch ports + attr = saiut.sai_attribute_t() for key in port_dict: - port = saiut.portGetId(port_dict[key]) + port = saiut.portGetId(int(port_dict[key][0], 10)) assert port != 0, "{}: port id not found".format(key) - saiut.portSetAdminState(port, True) - time.sleep(3) + + attr.id = saiut.SAI_PORT_ATTR_ADMIN_STATE + attr.value.booldata = True + saiut.portSetAttribute(port, attr) + + time.sleep(1) lnks = json_test_data['PLATFORM']['SELF_LOOPS'] for idx in range(0, len(lnks)): p1 = lnks[idx].split(':')[0] p2 = lnks[idx].split(':')[1] - pid1 = saiut.portGetId(port_dict[p1]) - pid2 = saiut.portGetId(port_dict[p2]) + pid1 = saiut.portGetId(int(port_dict[p1][0], 10)) + pid2 = saiut.portGetId(int(port_dict[p2][0], 10)) print("xmit: {} --> {}".format(p1, p2)); saiut.portClearCounter(pid1, saiut.SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS) saiut.portClearCounter(pid2, saiut.SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS) - saiut.bcmSendPackets(pid1, 9, 512) + saiut.bcmSendPackets(pid1, 99, 512) time.sleep(3) c1 = saiut.portGetCounter(pid1, saiut.SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS) c2 = saiut.portGetCounter(pid2, saiut.SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS) - assert c1 >= 9, "{} --> {}: tx failed".format(p1, p2) - assert c2 >= 9, "{} --> {}: rx failed".format(p1, p2) + assert c1 >= 99, "{} --> {}: tx failed".format(p1, p2) + assert c2 >= 99, "{} --> {}: rx failed".format(p1, p2) print("xmit: {} <-- {}".format(p1, p2)); saiut.portClearCounter(pid1, saiut.SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS) saiut.portClearCounter(pid2, saiut.SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS) - saiut.bcmSendPackets(pid2, 9, 512) + saiut.bcmSendPackets(pid2, 99, 512) time.sleep(3) c1 = saiut.portGetCounter(pid1, saiut.SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS) c2 = saiut.portGetCounter(pid2, saiut.SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS) - assert c1 >= 9, "{} <-- {}: rx failed".format(p1, p2) - assert c2 >= 9, "{} <-- {}: tx failed".format(p1, p2) + assert c1 >= 99, "{} <-- {}: rx failed".format(p1, p2) + assert c2 >= 99, "{} <-- {}: tx failed".format(p1, p2) -# Test for switch port breakout modes -def test_for_switch_port_breakout(json_test_data): - """ - Test Purpose: Verify the QSFP port breakout support - Args: - arg1 (json): test--config.json - Example: - For a system with 2 expandable ports - test--config.json - { - "PLATFORM": { - "PORT_BREAKOUT" : { - # port Name port modes ,, - "Ethernet128" : "1x100,4x10,4x25", - "Ethernet136" : "1x100,4x10,4x25" - } - } - } +# Test for medis setting json +def test_for_switch_media_setting_json(): """ - try: - intf = json_test_data['PLATFORM']['PORT_BREAKOUT'] - except KeyError: - pytest.skip("test configuration not found") + Test Purpose: Verify preemphasis media setting json - # load key-value-pairs from sai.profile - kvp = {} + Args: + None + """ try: - file = open(HWSKU_PATH + '/sai.profile', 'r') + with open(PLATFORM_PATH +'/media_settings.json') as data: + media_dict = json.load(data) except: - assert False, "Unable to open sai.profile" - line = file.readline() - while line is not None and len(line) > 0: - line = line.strip() - if not line.startswith("#"): - lst = line.split('=') - if len(lst) != 2: - continue - kvp[lst[0]] = lst[1] - line = file.readline() - file.close() + assert False, "Unable to open media_settings.json" - # make a backup of config.bcm and port_config.ini - file = kvp['SAI_INIT_CONFIG_FILE'] - rc = subprocess.call("cp -f {} {}.orig >/dev/null 2>&1".format(file, file), shell=True) - assert rc == 0, "Unable to make a backup of the config.bcm" - file = HWSKU_PATH + '/port_config.ini' - rc = subprocess.call("cp -f {} {}.orig >/dev/null 2>&1".format(file, file), shell=True) - assert rc == 0, "Unable to make a backup of the port_config.ini" - - # perform port breakout test - for intf in json_test_data['PLATFORM']['PORT_BREAKOUT']: - # mode = ,, - mode = json_test_data['PLATFORM']['PORT_BREAKOUT'][intf] - list = mode.split(',') - for i in range(1, len(list)): - print("port_breakout.py -p {} -o {}".format(intf, list[i])); - # root port mode - rc = subprocess.call("port_breakout.py -p {} -o {} >/dev/null 2>&1".format(intf, list[0]), shell=True) - assert rc == 0, "{}: Unable to do {}".format(intf, list[i]) - # target breakout mode - rc = subprocess.call("port_breakout.py -p {} -o {} >/dev/null 2>&1".format(intf, list[i]), shell=True) - assert rc == 0, "{}: Unable to do {}".format(intf, list[i]) - - # load sonic portmap - intf_dict = {} - file = open(HWSKU_PATH + '/port_config.ini', 'r') - line = file.readline() - while line is not None and len(line) > 0: - line = line.strip() - if line.startswith("#"): - line = file.readline() - continue - tok = line.split() - if len(tok) >= 2: - key = tok[0]; - val = int(tok[1].split(',')[0], 10) - assert key not in intf_dict, "Duplicated interface in port_config.ini" - intf_dict[key] = val - line = file.readline() - file.close() - - # load bcm portmap - brcm_dict = {} - file = open(kvp['SAI_INIT_CONFIG_FILE'], 'r') - line = file.readline() - while line is not None and len(line) > 0: - line = line.strip() - if line.startswith("#") or ("portmap_" not in line): - line = file.readline() - continue - # portmap_=:: - tok = line.split('=') - lport = int(tok[0].split('_')[1], 10) - pport = int(tok[1].split(':')[0], 10) - brcm_dict[pport] = lport - line = file.readline() - file.close() - - for port in intf_dict: - lane = intf_dict[port] - assert lane in brcm_dict, "{}: portmap not found".format(port) - - # restore the config.bcm and port_config.ini - file = kvp['SAI_INIT_CONFIG_FILE'] - rc = subprocess.call("cp -f {}.orig {} >/dev/null 2>&1".format(file, file), shell=True) - assert rc == 0, "Unable to restore the config.bcm" - file = HWSKU_PATH + '/port_config.ini' - rc = subprocess.call("cp -f {}.orig {} >/dev/null 2>&1".format(file, file), shell=True) - assert rc == 0, "Unable to restore the port_config.ini" + assert (media_dict is not None) , "media_dict is None " + + if 'GLOBAL_MEDIA_SETTINGS' not in media_dict and 'SPEED_MEDIA_SETTINGS' not in media_dict \ + and 'PORT_MEDIA_SETTINGS' not in media_dict: + assert False, "Required *_MEDIA_SETTINGS not defined" diff --git a/sonic-pde-tests/sonic_pde_tests/test_sfp.py b/sonic-pde-tests/sonic_pde_tests/test_sfp.py index 66d6483..58a8a2d 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_sfp.py +++ b/sonic-pde-tests/sonic_pde_tests/test_sfp.py @@ -12,38 +12,35 @@ raise ImportError("%s - required module not found" % str(e)) PLATFORM_PATH = "/usr/share/sonic/platform" -PLATFORM_SPECIFIC_MODULE_NAME = "sfputil" -PLATFORM_SPECIFIC_CLASS_NAME = "SfpUtil" -# Global platform-specific sfputil class instance +# Global class instance platform_sfputil = None +platform_chassis = None # Global port dictionaries port_dict = None +first_phy_port =1 -# Loads platform specific sfputil module from source -def load_platform_sfputil(): +# Loads platform module from source +def _wrapper_init(): + global platform_chassis global platform_sfputil - if platform_sfputil is not None: - return - - try: - module_file = "/".join([PLATFORM_PATH, "plugins", PLATFORM_SPECIFIC_MODULE_NAME + ".py"]) - module = imp.load_source(PLATFORM_SPECIFIC_MODULE_NAME, module_file) - except IOError, e: - print("Failed to load platform module '%s': %s" % (PLATFORM_SPECIFIC_MODULE_NAME, str(e)), True) - - assert module is not None - - try: - platform_sfputil_class = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - platform_sfputil = platform_sfputil_class() - except AttributeError, e: - print("Failed to instantiate '%s' class: %s" % (PLATFORM_SPECIFIC_CLASS_NAME, str(e)), True) - - assert platform_sfputil is not None - return + # Load new platform api class + if platform_chassis is None: + try: + import sonic_platform.platform + platform_chassis = sonic_platform.platform.Platform().get_chassis() + except Exception as e: + print("Failed to load chassis due to {}".format(repr(e))) + + #Load SfpUtilHelper class + if platform_sfputil is None: + try: + import sonic_platform_base.sonic_sfp.sfputilhelper + platform_sfputil = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper() + except Exception as e: + print("Failed to load chassis due to {}".format(repr(e))) # Get platform specific HwSKU path def get_hwsku_path(): @@ -78,6 +75,81 @@ def load_platform_portdict(): return port_dict +# Find out the physical port is 0-based or 1-based +def find_port_index_based(): + global first_phy_port + + # Load port info + try: + port_config_file_path = get_hwsku_path() + "/port_config.ini" + platform_sfputil.read_porttab_mappings(port_config_file_path) + except Exception as e: + print("Failed to read port info: %s".format(repr(e))) + + if platform_sfputil.logical_to_physical[platform_sfputil.logical[0]][0] == 0: + first_phy_port = 0 + elif platform_sfputil.logical_to_physical[platform_sfputil.logical[0]][0] == 1: + first_phy_port = 1 + else: + # default is 1-based + first_phy_port = 1 + + +def _wrapper_is_qsfp_port(physical_port): + _wrapper_init() + find_port_index_based() + if "QSFP" in platform_chassis.get_sfp(physical_port).get_transceiver_info().get("type","N/A"): + return True + else: + return False + +def _wrapper_get_transceiver_info_type(physical_port): + find_port_index_based() + try: + return platform_chassis.get_sfp(physical_port).get_transceiver_info().get("type","N/A") + except NotImplementedError: + pass + +def _wrapper_get_num_sfps(): + _wrapper_init() + load_platform_portdict() + try: + return platform_chassis.get_num_sfps() + except NotImplementedError: + pass + +def _wrapper_get_presence(physical_port): + _wrapper_init() + find_port_index_based() + try: + return platform_chassis.get_sfp(physical_port).get_presence() + except NotImplementedError: + pass + +def _wrapper_get_low_power_mode(physical_port): + _wrapper_init() + find_port_index_based() + try: + return platform_chassis.get_sfp(physical_port).get_lpmode() + except NotImplementedError: + pass + +def _wrapper_set_low_power_mode(physical_port, enable): + _wrapper_init() + find_port_index_based() + try: + return platform_chassis.get_sfp(physical_port).set_lpmode(enable) + except NotImplementedError: + pass + +def _wrapper_reset(physical_port): + _wrapper_init() + find_port_index_based() + try: + return platform_chassis.get_sfp(physical_port).reset() + except NotImplementedError: + pass + # Test for SFP port number def test_for_sfp_number(json_config_data): """Test Purpose: Verify that the numer of SFPs reported as supported by the SFP plugin matches what the platform supports. @@ -95,17 +167,8 @@ def test_for_sfp_number(json_config_data): } } """ - load_platform_sfputil() - load_platform_portdict() - num = 0 - for intf in natsorted(port_dict.keys()): - port = int(port_dict[intf]['index']) - if not platform_sfputil._is_valid_port(port): - continue - num += 1 - for plat in json_config_data: - assert num == int(json_config_data[plat]["num_sfps"]) + assert _wrapper_get_num_sfps() == int(json_config_data[plat]["num_sfps"]) return @@ -139,13 +202,10 @@ def test_for_sfp_present(json_config_data, json_test_data): } } """ - load_platform_sfputil() load_platform_portdict() for intf in natsorted(port_dict.keys()): port = int(port_dict[intf]['index']) - if not platform_sfputil._is_valid_port(port): - continue - bool = platform_sfputil.get_presence(port) + bool = _wrapper_get_presence(port) for plat in json_config_data: list = json_test_data[plat]['SFP']['present'] if port in list: @@ -184,35 +244,15 @@ def test_for_sfp_eeprom(json_config_data, json_test_data): } } """ - load_platform_sfputil() load_platform_portdict() for intf in natsorted(port_dict.keys()): port = int(port_dict[intf]['index']) - if not platform_sfputil._is_valid_port(port): - continue - bool = platform_sfputil.get_presence(port) + bool = _wrapper_get_presence(port) for plat in json_config_data: list = json_test_data[plat]['SFP']['present'] if port in list: assert bool == True, "SFP{} is not present unexpectedly".format(port) - code = 0 - data = platform_sfputil.get_eeprom_raw(port) - assert data != None, "SFP{}: unable to read EEPROM".format(port) - if port in platform_sfputil.osfp_ports: - #OSFP/QSFP-DD - for i in range(128, 222): - code += int(data[i], 16) - assert (code & 0xff) == int(data[222], 16), "check code error" - elif port in platform_sfputil.qsfp_ports: - #QSFP - for i in range(128, 191): - code += int(data[i], 16) - assert (code & 0xff) == int(data[191], 16), "check code error" - else: - #SFP/SFP+ - for i in range(0, 63): - code += int(data[i], 16) - assert (code & 0xff) == int(data[63], 16), "check code error" + assert _wrapper_get_transceiver_info_type(port) is not None,"SFP{} EEPROM is fail to get".format(port) else: assert bool == False, "SFP{} is present".format(port) return @@ -224,25 +264,23 @@ def test_for_sfp_lpmode(): Args: None """ - load_platform_sfputil() load_platform_portdict() for intf in natsorted(port_dict.keys()): port = int(port_dict[intf]['index']) - if not platform_sfputil._is_valid_port(port): - continue - if port not in platform_sfputil.qsfp_ports: - continue - try: - bool = platform_sfputil.get_low_power_mode(port) - except NotImplementedError: - assert False, (intf + ': get_low_power_mode() is not implemented') - if bool: - try: - platform_sfputil.set_low_power_mode(port, False) - except NotImplementedError: - assert False, (intf + ': low power detected while ' + - 'set_low_power_mode() is not implemented,' + - 'link errors are expected on high power modules') + if _wrapper_get_presence(port): + if not _wrapper_is_qsfp_port(port): + continue + try: + bool = _wrapper_get_low_power_mode(port) + except NotImplementedError: + assert False, (intf + ': get_low_power_mode() is not implemented') + if bool: + try: + _wrapper_set_low_power_mode(port, False) + except NotImplementedError: + assert False, (intf + ': low power detected while ' + + 'set_low_power_mode() is not implemented,' + + 'link errors are expected on high power modules') return # Test for SFP LPMODE @@ -252,18 +290,16 @@ def test_for_sfp_reset(): Args: None """ - load_platform_sfputil() load_platform_portdict() for intf in natsorted(port_dict.keys()): port = int(port_dict[intf]['index']) - if not platform_sfputil._is_valid_port(port): - continue - if port not in platform_sfputil.qsfp_ports: - continue - try: - bool = platform_sfputil.reset(port) - assert bool, "reset failed" - except NotImplementedError: + if _wrapper_get_presence(port): + if not _wrapper_is_qsfp_port(port): + continue + try: + bool = _wrapper_reset(port) + assert bool, "reset failed" + except NotImplementedError: # By defalt, it does no harm to have this unimplemented # This failure will only be observed when users try to manually # reset the module via CLI diff --git a/sonic-pde-tests/sonic_pde_tests/test_thermal.py b/sonic-pde-tests/sonic_pde_tests/test_thermal.py index 8e7cd02..0b18af5 100644 --- a/sonic-pde-tests/sonic_pde_tests/test_thermal.py +++ b/sonic-pde-tests/sonic_pde_tests/test_thermal.py @@ -5,20 +5,10 @@ import time PLATFORM_PATH = "/usr/share/sonic/platform" -PLATFORM_SPECIFIC_PSU_MODULE_NAME = "psuutil" -PLATFORM_SPECIFIC_PSU_CLASS_NAME = "PsuUtil" -PLATFORM_SPECIFIC_FAN_MODULE_NAME = "fanutil" -PLATFORM_SPECIFIC_FAN_CLASS_NAME = "FanUtil" - -platform_psuutil = None platform_chassis = None -platform_fanutil = None -# wrappers that are compliable with both new platform api and old-style plugin def _wrapper_init(): global platform_chassis - global platform_psuutil - global platform_fanutil # Load new platform api class if platform_chassis is None: @@ -28,29 +18,6 @@ def _wrapper_init(): except Exception as e: print("Failed to load chassis due to {}".format(repr(e))) - # Load platform-specific psuutil class - if platform_chassis is None: - try: - module_file = "/".join([PLATFORM_PATH, "plugins", PLATFORM_SPECIFIC_PSU_MODULE_NAME + ".py"]) - module = imp.load_source(PLATFORM_SPECIFIC_PSU_MODULE_NAME, module_file) - platform_psuutil_class = getattr(module, PLATFORM_SPECIFIC_PSU_CLASS_NAME) - platform_psuutil = platform_psuutil_class() - except Exception as e: - print("Failed to load psuutil due to {}".format(repr(e))) - - # Load platform-specific fanutil class - try: - module_file = "/".join([PLATFORM_PATH, "plugins", PLATFORM_SPECIFIC_FAN_MODULE_NAME + ".py"]) - module = imp.load_source(PLATFORM_SPECIFIC_FAN_MODULE_NAME, module_file) - platform_fanutil_class = getattr(module, PLATFORM_SPECIFIC_FAN_CLASS_NAME) - platform_fanutil = platform_fanutil_class() - except Exception as e: - print("Failed to load fanutil due to {}".format(repr(e))) - - assert (platform_chassis is not None) or (platform_psuutil is not None) or \ - (platform_fanutil is not None) , "Unable to load platform module" - - def _wrapper_get_num_temp(): _wrapper_init() if platform_chassis is not None: @@ -69,43 +36,64 @@ def _wrapper_get_temperature(index): pass return -255 -def _wrapper_get_fan_presence(index): + +def _wrapper_get_high_threshold(index): _wrapper_init() if platform_chassis is not None: try: - return platform_chassis.get_fan(index).get_presence() + return platform_chassis.get_thermal(index).get_high_threshold() except NotImplementedError: pass - return platform_fanutil.get_presence(index+1) + return -255 -def _wrapper_get_fan_duty(index): +def _wrapper_get_high_critical_threshold(index): _wrapper_init() if platform_chassis is not None: try: - return platform_chassis.get_fan(index).get_target_speed() + return platform_chassis.get_thermal(index).get_high_critical_threshold() except NotImplementedError: pass - return platform_fanutil.get_speed(index+1) + return -255 -def _wrapper_get_fan_direction(index): +def _wrapper_get_fan_name(index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_fan(index).get_direction() - except NotImplementedError: - pass - return platform_fanutil.get_direction(index+1) + try: + return platform_chassis.get_fan(index).get_name() + except NotImplementedError: + pass + +def _wrapper_get_fan_presence(index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_fan(index).get_presence() + except NotImplementedError: + pass -def _wrapper_get_psus_presence(index): +def _wrapper_get_fan_duty(index): _wrapper_init() if platform_chassis is not None: - try: - return platform_chassis.get_psu(index).get_presence() - except NotImplementedError: - pass - return platform_psuutil.get_psu_presence(index+1) + try: + return platform_chassis.get_fan(index).get_target_speed() + except NotImplementedError: + pass +def _wrapper_get_fan_direction(index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_fan(index).get_direction() + except NotImplementedError: + pass +def _wrapper_get_psu_presence(index): + _wrapper_init() + if platform_chassis is not None: + try: + return platform_chassis.get_psu(index).get_presence() + except NotImplementedError: + pass # test cases def test_for_num_temp(json_config_data): @@ -157,6 +145,57 @@ def test_for_temp_read(json_config_data): assert _wrapper_get_temperature(x) != -255, "tmp{}: invalid temperature".format(x) +def test_for_temp_high_threshold_read(json_config_data): + """Test Purpose: Verify that high threshold of each Temp sensors defined in the platform config + josn is able to read + + + Args: + arg1 (json): platform--config.json + + Example: + For a system that physically supports 4 TEMPs + + platform--config.json + { + "PLATFORM": { + num_temps": 4 + } + } + """ + if json_config_data['PLATFORM']['modules']['TEMP']['support'] == "false": + pytest.skip("Skip the testing due to the python module is not supported") + + for x in range(json_config_data['PLATFORM']['num_temps']): + print("tmp{}: {}".format(x, _wrapper_get_high_threshold(x))) + assert _wrapper_get_high_threshold(x) != -255, "tmp{}: invalid high threshold".format(x) + + +def test_for_temp_high_critical_threshold_read(json_config_data): + """Test Purpose: Verify that high critical threshold of each Temp sensors defined in the platform config + josn is able to read + + + Args: + arg1 (json): platform--config.json + + Example: + For a system that physically supports 4 TEMPs + + platform--config.json + { + "PLATFORM": { + num_temps": 4 + } + } + """ + if json_config_data['PLATFORM']['modules']['TEMP']['support'] == "false": + pytest.skip("Skip the testing due to the python module is not supported") + + for x in range(json_config_data['PLATFORM']['num_temps']): + print("tmp{}: {}".format(x, _wrapper_get_high_critical_threshold(x))) + assert _wrapper_get_high_critical_threshold(x) != -255, "tmp{}: invalid high critical threshold".format(x) + def test_for_thermal_daemon(json_config_data,json_test_data): if json_config_data['PLATFORM']['thermal_policy_support'] == "false": @@ -172,7 +211,7 @@ def test_for_thermal_daemon(json_config_data,json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') cp1.append(int(cnt)) assert cp1, "flooding syslog [" + app + "] not detected" @@ -198,8 +237,8 @@ def test_for_thermal_policy(json_config_data,json_test_data): high = json_test_data['PLATFORM']['THERMAL_POLICY']['F2B'][str(index)][1] duty = json_test_data['PLATFORM']['THERMAL_POLICY']['F2B'][str(index)][2] - - if temp > high : + #When average temp bigger than high temp will jump to next policy table + if temp_avg > high : continue else : assert duty == _wrapper_get_fan_duty(x), \ @@ -228,11 +267,6 @@ def test_for_thermal_policy_fan_removed(json_config_data,json_test_data): if json_config_data['PLATFORM']['thermal_policy_support'] == "false": pytest.skip("Skip the testing due to thermal policy not supported") - - if _wrapper_init() is None: - pytest.skip("platform chassis not found") - return - duty = json_test_data['PLATFORM']['THERMAL_POLICY']['FAN_REMOVED_DUTY'] # test if any one of FAN removed @@ -275,8 +309,9 @@ def test_for_pmon_daemon(json_test_data): } """ cp1 = [] + cp2 = [] pat = json_test_data["PLATFORM"]["PMON"]["syslog"] - + for idx in range(len(pat)): cmd = "cat /var/log/syslog | grep -c -i " + "'" + pat[idx] + \ " entered RUNNING state" + "'" @@ -285,11 +320,11 @@ def test_for_pmon_daemon(json_test_data): close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') cp1.append(int(cnt)) - for idx1 in range(len(pat)): - assert cp1[idx1] != 0, "pmon syslog [" + pat[idx1] + "] not detected" + for index in range(len(pat)): + assert cp1[index] != 0, "pmon syslog [" + pat[index] + "] not detected" def test_for_pmon_fan_removed_event_log(json_config_data,json_test_data): @@ -309,14 +344,14 @@ def test_for_pmon_fan_removed_event_log(json_config_data,json_test_data): for x in range(json_config_data['PLATFORM']['num_fans']): if _wrapper_get_fan_presence(x) == False: Fan_removed = True - cmd = "cat /var/log/syslog | grep -c -i " + "'" + "FAN " + str(x+1) + \ - " removed" + "'" + fan_name = _wrapper_get_fan_name(x) + cmd = "cat /var/log/syslog | grep -c -i " + "'" + fan_name + " .*removed" + "'" pin = subprocess.Popen(cmd, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') assert int(cnt) != 0,\ "FAN" + str(x+1)+ "removed event log not detected" @@ -338,16 +373,15 @@ def test_for_pmon_psu_removed_event_log(json_config_data,json_test_data): Psu_removed = False for x in range(json_config_data['PLATFORM']['num_psus']): - if _wrapper_get_psus_presence(x) == False: - Psu_removed == True - cmd = "cat /var/log/syslog | grep -c -i " + "'" + "PSU " + str(x+1) + \ - " removed" + "'" + if _wrapper_get_psu_presence(x) == False: + Psu_removed = True + cmd = "cat /var/log/syslog | grep -c -i " + "'" + "PSU " + str(x+1) + " .*is not present." + "'" pin = subprocess.Popen(cmd, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - cnt = pin.communicate()[0] + cnt = pin.communicate()[0].decode('ascii') assert int(cnt) != 0,\ "PSU" + str(x+1)+ "removed event log not detected" diff --git a/sonic-pde-tests/sonic_pde_tests/test_watchdog.py b/sonic-pde-tests/sonic_pde_tests/test_watchdog.py new file mode 100644 index 0000000..c8796c7 --- /dev/null +++ b/sonic-pde-tests/sonic_pde_tests/test_watchdog.py @@ -0,0 +1,136 @@ +import pytest +import sys +import imp +import subprocess +import time + +VERSION = "1.0" +WATCHDOG_LOAD_ERROR = -1 +CHASSIS_LOAD_ERROR = -2 +WDT_COMMON_ERROR = -1 + +# Global platform watchdog class instance +platform_watchdog = None +platform_chassis = None + +# ==================== Methods for initialization ==================== + +# Loads platform specific watchdog module from source +def load_platform_watchdog(): + global platform_chassis + global platform_watchdog + + # Load 2.0 Watchdog class + if platform_chassis is None: + try: + import sonic_platform.platform + platform_chassis = sonic_platform.platform.Platform().get_chassis() + platform_watchdog = platform_chassis.get_watchdog() + + except Exception as e: + print("Failed to load 2.0 watchdog API due to {}".format(repr(e))) + +def _wrapper_get_watchdog_dev(): + load_platform_watchdog() + if platform_watchdog is not None: + try: + return platform_watchdog._get_wdt() + except NotImplementedError: + pass + +def _wrapper_arm_watchdog(sec): + load_platform_watchdog() + if platform_watchdog is not None: + try: + return platform_watchdog.arm(sec) + except NotImplementedError: + pass + +def _wrapper_get_watchdog_remaining_time(): + load_platform_watchdog() + if platform_watchdog is not None: + try: + return platform_watchdog.get_remaining_time() + except NotImplementedError: + pass + + +def _wrapper_watchdog_is_armed(): + load_platform_watchdog() + if platform_watchdog is not None: + try: + return platform_watchdog.is_armed() + except NotImplementedError: + pass + +def _wrapper_watchdog_disarm(): + load_platform_watchdog() + if platform_watchdog is not None: + try: + return platform_watchdog.disarm() + except NotImplementedError: + pass + +def test_for_get_watchdog_dev(): + """ + Test Purpose: Verify the watchdog dev can be get + + Args: + None + """ + assert _wrapper_get_watchdog_dev() != (0, None) , "watch dev fail to get" + +def test_for_arm_watchdog(json_config_data, json_test_data): + """ + Test Purpose: Verify the watchdog can be armed + + Args: + arg1 (json): platform--config.json + arg2 (json): test--config.json + + Example: + Test config JSON setting for Watchdog arm Timeout configured + + "WDT": + { + "arm":{ + "timeout":190 + } + } + """ + sec = json_test_data['PLATFORM']['WDT']['arm']['timeout'] + if _wrapper_arm_watchdog(sec) != WDT_COMMON_ERROR: + assert _wrapper_watchdog_is_armed() == True, "watchdog is not active" + +def test_for_get_watchdog_remaining_time(): + """ + Test Purpose: Verify the watchdog remaining time can be retrieved + + Args: + None + """ + assert _wrapper_get_watchdog_remaining_time() is not WDT_COMMON_ERROR, "get watchdog remaining time fail" + +def test_for_remaining_time_changed(): + """ + Test Purpose: Verify the watchdog remaining time is keep changing ( Couting down ) + + Args: + None + """ + Time1 = _wrapper_get_watchdog_remaining_time(); + time.sleep(5) + Time2 = _wrapper_get_watchdog_remaining_time(); + assert Time1 != Time2, "Watchdog remaining time is the same without changed" + + + +def test_for_disarm_watchdog(): + """ + Test Purpose: Verify the watchdog can be disarmed + + Args: + None + """ + if _wrapper_watchdog_disarm() == True: + assert _wrapper_watchdog_is_armed() == False, "watchdog is not disabled"