Skip to content

Commit

Permalink
Add test case for gnmi subscription of bmp table
Browse files Browse the repository at this point in the history
  • Loading branch information
FengPan-Frank committed Feb 20, 2025
1 parent a172d78 commit ecb1a03
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions tests/bmp/test_bmp_statedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re
import json
from bmp.helper import enable_bmp_neighbor_table, enable_bmp_rib_in_table, enable_bmp_rib_out_table
from netaddr import valid_ipv4

logger = logging.getLogger(__name__)

Expand All @@ -12,6 +13,17 @@
pytest.mark.disable_loganalyzer
]

def get_mgmt_ip(duthost):
config_facts = duthost.get_running_config_facts()
mgmt_interfaces = config_facts.get("MGMT_INTERFACE", {})
mgmt_ip = None

for mgmt_interface, ip_configs in mgmt_interfaces.items():
for ip_addr_with_prefix in ip_configs.keys():
ip_addr = ip_addr_with_prefix.split("/")[0]
if valid_ipv4(ip_addr):
mgmt_ip = ip_addr
return mgmt_ip

def check_dut_bmp_neighbor_status(duthost, neighbor_addr, expected_state, max_attempts=12, retry_interval=3):
for i in range(max_attempts + 1):
Expand Down Expand Up @@ -126,3 +138,25 @@ def test_bmp_population(duthosts, rand_one_dut_hostname, localhost):
enable_bmp_rib_out_table(duthost)
for idx, neighbor_v6addr in enumerate(neighbor_v6addrs):
check_dut_bmp_rib_out_status(duthost, neighbor_v6addr)


def test_bmp_gnmi_subscription(duthosts, rand_one_dut_hostname, localhost):
duthost = duthosts[rand_one_dut_hostname]

# neighbor table - ipv4 neighbor
# only pick-up sent_cap attributes for typical check first.
enable_bmp_neighbor_table(duthost)
neighbor_addrs = get_neighbors(duthost)
for idx, neighbor_addr in enumerate(neighbor_addrs):
check_dut_bmp_neighbor_status(duthost, neighbor_addr, "sent_cap")

# neighbor table subscription
dut_ip = get_mgmt_ip(duthost)
cmd = "~/gnmi_get -xpath_target BMP_STATE_DB -logtostderr -insecure -v 2 -qt s -q \
\"BGP_NEIGHBOR_TABLE\" -streaming_type ON_CHANGE"

gnmi_out = duthost.shell(cmd)['stdout']
result = str(gnmi_out)
inerrors_match = re.search("BGP_NEIGHBOR_TABLE", result)
assert(inerrors_match is not None,
"BGP_NEIGHBOR_TABLE not found in gnmi output")

0 comments on commit ecb1a03

Please sign in to comment.