Skip to content

Commit

Permalink
fix dual-tor test case failure based on latest code behavior (#9842)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcaiMR authored and mssonicbld committed Sep 12, 2023
1 parent d441c96 commit ba8953c
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 93 deletions.
12 changes: 10 additions & 2 deletions ansible/roles/test/files/ptftests/py3/dhcpv6_counter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def setUp(self):
self.dut_mac = self.test_params['dut_mac']
self.vlan_ip = self.test_params['vlan_ip']
self.client_mac = self.dataplane.get_mac(0, self.client_port_index)
self.loopback_ipv6 = self.test_params['loopback_ipv6']
self.is_dualtor = True if self.test_params['is_dualtor'] == 'True' else False
self.reference = 0

def generate_client_interace_ipv6_link_local_address(self, client_port_index):
Expand Down Expand Up @@ -131,7 +133,10 @@ def create_malformed_client_packet(self, message):

def create_server_packet(self, message):
packet = ptf.packet.Ether(dst=self.dut_mac)
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
if self.is_dualtor:
packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
packet /= ptf.packet.UDP(sport=self.DHCP_SERVER_PORT,
dport=self.DHCP_SERVER_PORT)
packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
Expand All @@ -147,7 +152,10 @@ def create_server_packet(self, message):

def create_unknown_server_packet(self):
packet = ptf.packet.Ether(dst=self.dut_mac)
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
if self.is_dualtor:
packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
packet /= ptf.packet.UDP(sport=self.DHCP_SERVER_PORT,
dport=self.DHCP_SERVER_PORT)
packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
Expand Down
114 changes: 43 additions & 71 deletions ansible/roles/test/files/ptftests/py3/dhcpv6_relay_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import ast
import socket
import subprocess
import scapy
# Packet Test Framework imports
Expand Down Expand Up @@ -27,7 +28,7 @@
DHCP6OptOptReq = scapy.layers.dhcp6.DHCP6OptOptReq
DHCP6OptElapsedTime = scapy.layers.dhcp6.DHCP6OptElapsedTime
DHCP6OptIA_NA = scapy.layers.dhcp6.DHCP6OptIA_NA
DUID_LLT = scapy.layers.dhcp6.DUID_LLT
DUID_LL = scapy.layers.dhcp6.DUID_LL
DHCP6OptIfaceId = scapy.layers.dhcp6.DHCP6OptIfaceId
DHCP6OptServerId = scapy.layers.dhcp6.DHCP6OptServerId

Expand Down Expand Up @@ -123,11 +124,11 @@ def setUp(self):
self.relay_iface_mac = self.test_params['relay_iface_mac']
self.relay_link_local = self.test_params['relay_link_local']
self.relay_linkaddr = '::'

self.vlan_ip = self.test_params['vlan_ip']

self.client_mac = self.dataplane.get_mac(0, self.client_port_index)
self.uplink_mac = self.test_params['uplink_mac']
self.loopback_ipv6 = self.test_params['loopback_ipv6']
self.is_dualtor = True if self.test_params['is_dualtor'] == 'True' else False

def generate_client_interace_ipv6_link_local_address(self, client_port_index):
# Shutdown and startup the client interface to generate a proper IPv6 link-local address
Expand Down Expand Up @@ -162,31 +163,27 @@ def create_dhcp_solicit_packet(self):
dport=self.DHCP_SERVER_PORT)
solicit_packet /= DHCP6_Solicit(trid=12345)
solicit_packet /= DHCP6OptClientId(
duid=DUID_LLT(lladdr=self.client_mac))
duid=DUID_LL(lladdr=self.client_mac))
solicit_packet /= DHCP6OptIA_NA()
solicit_packet /= DHCP6OptOptReq(reqopts=[23, 24, 29])
solicit_packet /= DHCP6OptElapsedTime(elapsedtime=0)

return solicit_packet

# order: relay forward option list sequence
# 0 - increase order, 1 - decrease order
def create_dhcp_solicit_relay_forward_packet(self, order):
def create_dhcp_solicit_relay_forward_packet(self):
solicit_relay_forward_packet = packet.Ether(src=self.uplink_mac)
solicit_relay_forward_packet /= IPv6()
solicit_relay_forward_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
solicit_relay_forward_packet /= DHCP6_RelayForward(msgtype=12, linkaddr=self.vlan_ip,
peeraddr=self.client_link_local)
if order == 1:
solicit_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

solicit_relay_forward_packet /= DHCP6OptRelayMsg(message=[DHCP6_Solicit(trid=12345) /
DHCP6OptClientId(duid=DUID_LLT(lladdr=self.client_mac)) /
DHCP6OptClientId(duid=DUID_LL(lladdr=self.client_mac)) /
DHCP6OptIA_NA()/DHCP6OptOptReq(reqopts=[23, 24, 29]) /
DHCP6OptElapsedTime(elapsedtime=0)])
if order == 0:
solicit_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()
if self.is_dualtor:
solicit_relay_forward_packet /= DHCP6OptIfaceId(ifaceid=socket.inet_pton(socket.AF_INET6, self.vlan_ip))
solicit_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

return solicit_relay_forward_packet

Expand All @@ -203,8 +200,10 @@ def create_dhcp_advertise_packet(self):

def create_dhcp_advertise_relay_reply_packet(self):
advertise_relay_reply_packet = packet.Ether(dst=self.uplink_mac)
advertise_relay_reply_packet /= IPv6(
src=self.server_ip, dst=self.relay_iface_ip)
if self.is_dualtor:
advertise_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
advertise_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
advertise_relay_reply_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
advertise_relay_reply_packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
Expand All @@ -224,22 +223,18 @@ def create_dhcp_request_packet(self):

return request_packet

# order: relay forward option list sequence
# 0 - increase order, 1 - decrease order
def create_dhcp_request_relay_forward_packet(self, order):
def create_dhcp_request_relay_forward_packet(self):
request_relay_forward_packet = packet.Ether(src=self.uplink_mac)
request_relay_forward_packet /= IPv6()
request_relay_forward_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
request_relay_forward_packet /= DHCP6_RelayForward(msgtype=12, linkaddr=self.vlan_ip,
peeraddr=self.client_link_local)
if order == 1:
request_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

request_relay_forward_packet /= DHCP6OptRelayMsg(
message=[DHCP6_Request(trid=12345)])
if order == 0:
request_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()
if self.is_dualtor:
request_relay_forward_packet /= DHCP6OptIfaceId(ifaceid=socket.inet_pton(socket.AF_INET6, self.vlan_ip))
request_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

return request_relay_forward_packet

Expand All @@ -256,15 +251,16 @@ def create_dhcp_reply_packet(self):

def create_dhcp_reply_relay_reply_packet(self):
reply_relay_reply_packet = packet.Ether(dst=self.uplink_mac)
reply_relay_reply_packet /= IPv6(src=self.server_ip,
dst=self.relay_iface_ip)
if self.is_dualtor:
reply_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
reply_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
reply_relay_reply_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
reply_relay_reply_packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
peeraddr=self.client_link_local)
reply_relay_reply_packet /= DHCP6OptRelayMsg(
message=[DHCP6_Reply(trid=12345)])
reply_relay_reply_packet /= DHCP6OptIfaceId(ifaceid=self.vlan_ip)

return reply_relay_reply_packet

Expand Down Expand Up @@ -295,6 +291,8 @@ def create_dhcp_relayed_relay_packet(self):
relayed_relay_packet /= DHCP6_RelayForward(msgtype=12, hopcount=1, linkaddr=self.relay_linkaddr,
peeraddr=self.client_link_local)
relayed_relay_packet /= DHCP6OptRelayMsg(message=[packet_inside])
if self.is_dualtor:
relayed_relay_packet /= DHCP6OptIfaceId(ifaceid=socket.inet_pton(socket.AF_INET6, self.vlan_ip))

return relayed_relay_packet

Expand All @@ -309,7 +307,7 @@ def create_dhcp_relay_relay_reply_packet(self):
packet_inside = DHCP6_RelayReply(
msgtype=13, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
packet_inside /= DHCP6OptRelayMsg(message=[DHCP6_Reply(trid=12345)])
relay_relay_reply_packet /= DHCP6OptServerId(duid=DUID_LLT(lladdr="00:11:22:33:44:55"))
relay_relay_reply_packet /= DHCP6OptServerId(duid=DUID_LL(lladdr="00:11:22:33:44:55"))
relay_relay_reply_packet /= DHCP6OptRelayMsg(message=[packet_inside])

return relay_relay_reply_packet
Expand Down Expand Up @@ -341,10 +339,9 @@ def client_send_solicit(self):

# Verify that the DHCP relay actually received and relayed the DHCPv6 SOLICIT message to all of
# its known DHCP servers.
def verify_relayed_solicit_relay_forward(self, try_count=0):
def verify_relayed_solicit_relay_forward(self):
# Create a packet resembling a DHCPv6 RELAY-FORWARD encapsulating SOLICIT packet
solicit_relay_forward_packet = self.create_dhcp_solicit_relay_forward_packet(
order=try_count)
solicit_relay_forward_packet = self.create_dhcp_solicit_relay_forward_packet()

# Mask off fields we don't care about matching
masked_packet = Mask(solicit_relay_forward_packet)
Expand All @@ -362,18 +359,8 @@ def verify_relayed_solicit_relay_forward(self, try_count=0):
masked_packet.set_do_not_care_scapy(
DHCP6OptClientLinkLayerAddr, "clladdr")

# Count the number of these packets received on the ports connected to our leaves
solicit_count = testutils.count_matched_packets_all_ports(self, masked_packet,
self.server_port_indices, timeout=4.0)

if try_count == 0:
if solicit_count >= 1:
return True
return False

self.assertTrue(solicit_count >= 1,
"Failed: Solicit count of %d" % solicit_count)
return True
# verify packets received on the ports connected to our leaves
testutils.verify_packet_any_port(self, masked_packet, self.server_port_indices)

# Simulate a DHCP server sending a DHCPv6 RELAY-REPLY encapsulating ADVERTISE packet message to client.
# We do this by injecting a RELAY-REPLY encapsulating ADVERTISE message on the link connected to one
Expand All @@ -394,8 +381,7 @@ def verify_relayed_advertise(self):
# Mask off fields we don't care about matching
masked_packet = Mask(advertise_packet)
masked_packet.set_do_not_care_scapy(IPv6, "fl")
# dual tor uses relay_iface_ip as ip src
masked_packet.set_do_not_care_scapy(IPv6, "src")
# dual tor uses loopback0 ipv6 address as source
masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
masked_packet.set_do_not_care_scapy(packet.UDP, "len")

Expand All @@ -410,10 +396,9 @@ def client_send_request(self):

# Verify that the DHCP relay actually received and relayed the DHCPv6 REQUEST message to all of
# its known DHCP servers.
def verify_relayed_request_relay_forward(self, try_count=0):
def verify_relayed_request_relay_forward(self):
# Create a packet resembling a DHCPv6 RELAY-FORWARD encapsulating REQUEST packet
request_relay_forward_packet = self.create_dhcp_request_relay_forward_packet(
try_count)
request_relay_forward_packet = self.create_dhcp_request_relay_forward_packet()

# Mask off fields we don't care about matching
masked_packet = Mask(request_relay_forward_packet)
Expand All @@ -431,18 +416,8 @@ def verify_relayed_request_relay_forward(self, try_count=0):
masked_packet.set_do_not_care_scapy(
DHCP6OptClientLinkLayerAddr, "clladdr")

# Count the number of these packets received on the ports connected to our leaves
request_count = testutils.count_matched_packets_all_ports(self, masked_packet,
self.server_port_indices, timeout=4.0)

if try_count == 0:
if request_count >= 1:
return True
return False

self.assertTrue(request_count >= 1,
"Failed: Request count of %d" % request_count)
return True
# verify packets received on the ports connected to our leaves
testutils.verify_packet_any_port(self, masked_packet, self.server_port_indices)

# Simulate a DHCP server sending a DHCPv6 RELAY-REPLY encapsulating REPLY packet message to client.
def server_send_reply_relay_reply(self):
Expand All @@ -461,8 +436,7 @@ def verify_relayed_reply(self):
# Mask off fields we don't care about matching
masked_packet = Mask(reply_packet)
masked_packet.set_do_not_care_scapy(IPv6, "fl")
# dual tor uses relay_iface_ip as ip src
masked_packet.set_do_not_care_scapy(IPv6, "src")
# dual tor uses loopback0 ipv6 address as source
masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
masked_packet.set_do_not_care_scapy(packet.UDP, "len")

Expand Down Expand Up @@ -515,30 +489,28 @@ def verify_relay_relay_reply(self):
# Mask off fields we don't care about matching
masked_packet = Mask(relay_reply_packet)
masked_packet.set_do_not_care_scapy(IPv6, "fl")
# dual tor uses relay_iface_ip as ip src
masked_packet.set_do_not_care_scapy(IPv6, "src")
# dual tor uses loopback0 ipv6 address as source
masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
masked_packet.set_do_not_care_scapy(packet.UDP, "len")

# NOTE: verify_packet() will fail for us via an assert, so no need to check a return value here
testutils.verify_packet(self, masked_packet, self.client_port_index)

def runTest(self):
for x in range(2):
self.client_send_solicit()
if self.verify_relayed_solicit_relay_forward(x):
break
self.client_send_solicit()
self.verify_relayed_solicit_relay_forward()

self.server_send_advertise_relay_reply()
self.verify_relayed_advertise()
for x in range(2):
self.client_send_request()
if self.verify_relayed_request_relay_forward(x):
break

self.client_send_request()
self.verify_relayed_request_relay_forward()

self.server_send_reply_relay_reply()
self.verify_relayed_reply()

self.client_send_relayed_relay_forward()
self.verify_relayed_relay_forward()

self.server_send_relay_relay_reply()
self.verify_relay_relay_reply()
Loading

0 comments on commit ba8953c

Please sign in to comment.