diff --git a/tests/test_acl.py b/tests/test_acl.py index 4ab939fe49..cf7a0ce114 100644 --- a/tests/test_acl.py +++ b/tests/test_acl.py @@ -779,3 +779,116 @@ def test_V6AclTableDeletion(self, dvs): keys = atbl.getKeys() # only the default table was left assert len(keys) == 1 + + #helper function to verify if rule exists + def check_rule_existence(self, entry, rules, verifs): + for rule in rules: + ruleD = dict(rule) + #find the rule to match with based on priority + if ruleD["PRIORITY"] == entry['SAI_ACL_ENTRY_ATTR_PRIORITY']: + ruleIndex = rules.index(rule) + #use verification dictionary to match entry to rule + for key in verifs[ruleIndex]: + assert verifs[ruleIndex][key] == entry[key] + #found the rule + return True + #did not find the rule + return False + + def test_InsertAclRuleBetweenPriorities(self, dvs): + db = swsscommon.DBConnector(4, dvs.redis_sock, 0) + adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + + bind_ports = ["Ethernet0", "Ethernet4"] + # create ACL_TABLE in config db + tbl = swsscommon.Table(db, "ACL_TABLE") + fvs = swsscommon.FieldValuePairs([("policy_desc", "test"), ("type", "L3"), ("ports", ",".join(bind_ports))]) + tbl.set("test_insert", fvs) + + time.sleep(2) + + num_rules = 0 + #create ACL rules + tbl = swsscommon.Table(db, "ACL_RULE") + rules = [ [("PRIORITY", "10"), ("PACKET_ACTION", "DROP"), ("SRC_IP", "10.0.0.0/32")], + [("PRIORITY", "20"), ("PACKET_ACTION", "DROP"), ("DST_IP", "104.44.94.0/23")], + [("PRIORITY", "30"), ("PACKET_ACTION", "DROP"), ("DST_IP", "192.168.0.16/32")], + [("PRIORITY", "40"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "100.64.0.0/10")] ] + #used to verify how ACL rules are programmed in ASICDB + verifs = [ {'SAI_ACL_ENTRY_ATTR_PRIORITY': '10', + 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '10.0.0.0&mask:255.255.255.255', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '20', + 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '104.44.94.0&mask:255.255.254.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '30', + 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '192.168.0.16&mask:255.255.255.255', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '40', + 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '100.64.0.0&mask:255.192.0.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'} ] + #insert rules + for rule in rules: + fvs = swsscommon.FieldValuePairs(rule) + num_rules += 1 + tbl.set( "test_insert|acl_test_rule%s" % num_rules, fvs) + + time.sleep(1) + + atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") + keys = atbl.getKeys() + + #assert that first set of rules are programmed + acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] + assert len(acl_entry) == num_rules + + #insert new rule with odd priority + tbl = swsscommon.Table(db, "ACL_RULE") + insertrule = [("PRIORITY", "21"), ("PACKET_ACTION", "DROP"), ("ETHER_TYPE", "4660")] + #create verification for that rule + verifs.append({'SAI_ACL_ENTRY_ATTR_PRIORITY': '21', + 'SAI_ACL_ENTRY_ATTR_FIELD_ETHER_TYPE': '4660&mask:0xffff', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_DROP'}) + rules.append(insertrule) + fvs = swsscommon.FieldValuePairs(insertrule) + num_rules += 1 + tbl.set("test_insert|acl_test_rule%s" % num_rules, fvs) + + time.sleep(1) + + #assert all rules are programmed + keys = atbl.getKeys() + acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] + assert len(acl_entry) == num_rules + + #match each entry to its corresponding verification + matched_rules = 0 + for entry in acl_entry: + (status, fvs) = atbl.get(entry) + assert status == True + assert len(fvs) == 6 + #helper function + if self.check_rule_existence(dict(fvs), rules, verifs): + matched_rules += 1 + + assert num_rules == matched_rules + + #cleanup + while num_rules > 0: + tbl._del("test_insert|acl_test_rule%s" % num_rules) + num_rules -= 1 + + time.sleep(1) + + (status, fvs) = atbl.get(acl_entry[0]) + assert status == False + + tbl = swsscommon.Table(db, "ACL_TABLE") + tbl._del("test_insert") + + time.sleep(1) + + atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") + keys = atbl.getKeys() + # only the default table was left + assert len(keys) == 1