Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[action] [PR:9842] Fix dual-tor dhcpv6 relay test cases failure in latest image #9941

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions ansible/roles/test/files/ptftests/py3/dhcpv6_counter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def setUp(self):
self.dut_mac = self.test_params['dut_mac']
self.vlan_ip = self.test_params['vlan_ip']
self.client_mac = self.dataplane.get_mac(0, self.client_port_index)
self.loopback_ipv6 = self.test_params['loopback_ipv6']
self.is_dualtor = True if self.test_params['is_dualtor'] == 'True' else False
self.reference = 0

def generate_client_interace_ipv6_link_local_address(self, client_port_index):
Expand Down Expand Up @@ -131,7 +133,10 @@ def create_malformed_client_packet(self, message):

def create_server_packet(self, message):
packet = ptf.packet.Ether(dst=self.dut_mac)
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
if self.is_dualtor:
packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
packet /= ptf.packet.UDP(sport=self.DHCP_SERVER_PORT,
dport=self.DHCP_SERVER_PORT)
packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
Expand All @@ -147,7 +152,10 @@ def create_server_packet(self, message):

def create_unknown_server_packet(self):
packet = ptf.packet.Ether(dst=self.dut_mac)
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
if self.is_dualtor:
packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
packet /= ptf.packet.UDP(sport=self.DHCP_SERVER_PORT,
dport=self.DHCP_SERVER_PORT)
packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
Expand Down
114 changes: 43 additions & 71 deletions ansible/roles/test/files/ptftests/py3/dhcpv6_relay_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import ast
import socket
import subprocess
import scapy
# Packet Test Framework imports
Expand Down Expand Up @@ -27,7 +28,7 @@
DHCP6OptOptReq = scapy.layers.dhcp6.DHCP6OptOptReq
DHCP6OptElapsedTime = scapy.layers.dhcp6.DHCP6OptElapsedTime
DHCP6OptIA_NA = scapy.layers.dhcp6.DHCP6OptIA_NA
DUID_LLT = scapy.layers.dhcp6.DUID_LLT
DUID_LL = scapy.layers.dhcp6.DUID_LL
DHCP6OptIfaceId = scapy.layers.dhcp6.DHCP6OptIfaceId
DHCP6OptServerId = scapy.layers.dhcp6.DHCP6OptServerId

Expand Down Expand Up @@ -123,11 +124,11 @@ def setUp(self):
self.relay_iface_mac = self.test_params['relay_iface_mac']
self.relay_link_local = self.test_params['relay_link_local']
self.relay_linkaddr = '::'

self.vlan_ip = self.test_params['vlan_ip']

self.client_mac = self.dataplane.get_mac(0, self.client_port_index)
self.uplink_mac = self.test_params['uplink_mac']
self.loopback_ipv6 = self.test_params['loopback_ipv6']
self.is_dualtor = True if self.test_params['is_dualtor'] == 'True' else False

def generate_client_interace_ipv6_link_local_address(self, client_port_index):
# Shutdown and startup the client interface to generate a proper IPv6 link-local address
Expand Down Expand Up @@ -162,31 +163,27 @@ def create_dhcp_solicit_packet(self):
dport=self.DHCP_SERVER_PORT)
solicit_packet /= DHCP6_Solicit(trid=12345)
solicit_packet /= DHCP6OptClientId(
duid=DUID_LLT(lladdr=self.client_mac))
duid=DUID_LL(lladdr=self.client_mac))
solicit_packet /= DHCP6OptIA_NA()
solicit_packet /= DHCP6OptOptReq(reqopts=[23, 24, 29])
solicit_packet /= DHCP6OptElapsedTime(elapsedtime=0)

return solicit_packet

# order: relay forward option list sequence
# 0 - increase order, 1 - decrease order
def create_dhcp_solicit_relay_forward_packet(self, order):
def create_dhcp_solicit_relay_forward_packet(self):
solicit_relay_forward_packet = packet.Ether(src=self.uplink_mac)
solicit_relay_forward_packet /= IPv6()
solicit_relay_forward_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
solicit_relay_forward_packet /= DHCP6_RelayForward(msgtype=12, linkaddr=self.vlan_ip,
peeraddr=self.client_link_local)
if order == 1:
solicit_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

solicit_relay_forward_packet /= DHCP6OptRelayMsg(message=[DHCP6_Solicit(trid=12345) /
DHCP6OptClientId(duid=DUID_LLT(lladdr=self.client_mac)) /
DHCP6OptClientId(duid=DUID_LL(lladdr=self.client_mac)) /
DHCP6OptIA_NA()/DHCP6OptOptReq(reqopts=[23, 24, 29]) /
DHCP6OptElapsedTime(elapsedtime=0)])
if order == 0:
solicit_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()
if self.is_dualtor:
solicit_relay_forward_packet /= DHCP6OptIfaceId(ifaceid=socket.inet_pton(socket.AF_INET6, self.vlan_ip))
solicit_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

return solicit_relay_forward_packet

Expand All @@ -203,8 +200,10 @@ def create_dhcp_advertise_packet(self):

def create_dhcp_advertise_relay_reply_packet(self):
advertise_relay_reply_packet = packet.Ether(dst=self.uplink_mac)
advertise_relay_reply_packet /= IPv6(
src=self.server_ip, dst=self.relay_iface_ip)
if self.is_dualtor:
advertise_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
advertise_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
advertise_relay_reply_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
advertise_relay_reply_packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
Expand All @@ -224,22 +223,18 @@ def create_dhcp_request_packet(self):

return request_packet

# order: relay forward option list sequence
# 0 - increase order, 1 - decrease order
def create_dhcp_request_relay_forward_packet(self, order):
def create_dhcp_request_relay_forward_packet(self):
request_relay_forward_packet = packet.Ether(src=self.uplink_mac)
request_relay_forward_packet /= IPv6()
request_relay_forward_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
request_relay_forward_packet /= DHCP6_RelayForward(msgtype=12, linkaddr=self.vlan_ip,
peeraddr=self.client_link_local)
if order == 1:
request_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

request_relay_forward_packet /= DHCP6OptRelayMsg(
message=[DHCP6_Request(trid=12345)])
if order == 0:
request_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()
if self.is_dualtor:
request_relay_forward_packet /= DHCP6OptIfaceId(ifaceid=socket.inet_pton(socket.AF_INET6, self.vlan_ip))
request_relay_forward_packet /= DHCP6OptClientLinkLayerAddr()

return request_relay_forward_packet

Expand All @@ -256,15 +251,16 @@ def create_dhcp_reply_packet(self):

def create_dhcp_reply_relay_reply_packet(self):
reply_relay_reply_packet = packet.Ether(dst=self.uplink_mac)
reply_relay_reply_packet /= IPv6(src=self.server_ip,
dst=self.relay_iface_ip)
if self.is_dualtor:
reply_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.loopback_ipv6)
else:
reply_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
reply_relay_reply_packet /= packet.UDP(
sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
reply_relay_reply_packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip,
peeraddr=self.client_link_local)
reply_relay_reply_packet /= DHCP6OptRelayMsg(
message=[DHCP6_Reply(trid=12345)])
reply_relay_reply_packet /= DHCP6OptIfaceId(ifaceid=self.vlan_ip)

return reply_relay_reply_packet

Expand Down Expand Up @@ -295,6 +291,8 @@ def create_dhcp_relayed_relay_packet(self):
relayed_relay_packet /= DHCP6_RelayForward(msgtype=12, hopcount=1, linkaddr=self.relay_linkaddr,
peeraddr=self.client_link_local)
relayed_relay_packet /= DHCP6OptRelayMsg(message=[packet_inside])
if self.is_dualtor:
relayed_relay_packet /= DHCP6OptIfaceId(ifaceid=socket.inet_pton(socket.AF_INET6, self.vlan_ip))

return relayed_relay_packet

Expand All @@ -309,7 +307,7 @@ def create_dhcp_relay_relay_reply_packet(self):
packet_inside = DHCP6_RelayReply(
msgtype=13, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
packet_inside /= DHCP6OptRelayMsg(message=[DHCP6_Reply(trid=12345)])
relay_relay_reply_packet /= DHCP6OptServerId(duid=DUID_LLT(lladdr="00:11:22:33:44:55"))
relay_relay_reply_packet /= DHCP6OptServerId(duid=DUID_LL(lladdr="00:11:22:33:44:55"))
relay_relay_reply_packet /= DHCP6OptRelayMsg(message=[packet_inside])

return relay_relay_reply_packet
Expand Down Expand Up @@ -341,10 +339,9 @@ def client_send_solicit(self):

# Verify that the DHCP relay actually received and relayed the DHCPv6 SOLICIT message to all of
# its known DHCP servers.
def verify_relayed_solicit_relay_forward(self, try_count=0):
def verify_relayed_solicit_relay_forward(self):
# Create a packet resembling a DHCPv6 RELAY-FORWARD encapsulating SOLICIT packet
solicit_relay_forward_packet = self.create_dhcp_solicit_relay_forward_packet(
order=try_count)
solicit_relay_forward_packet = self.create_dhcp_solicit_relay_forward_packet()

# Mask off fields we don't care about matching
masked_packet = Mask(solicit_relay_forward_packet)
Expand All @@ -362,18 +359,8 @@ def verify_relayed_solicit_relay_forward(self, try_count=0):
masked_packet.set_do_not_care_scapy(
DHCP6OptClientLinkLayerAddr, "clladdr")

# Count the number of these packets received on the ports connected to our leaves
solicit_count = testutils.count_matched_packets_all_ports(self, masked_packet,
self.server_port_indices, timeout=4.0)

if try_count == 0:
if solicit_count >= 1:
return True
return False

self.assertTrue(solicit_count >= 1,
"Failed: Solicit count of %d" % solicit_count)
return True
# verify packets received on the ports connected to our leaves
testutils.verify_packet_any_port(self, masked_packet, self.server_port_indices)

# Simulate a DHCP server sending a DHCPv6 RELAY-REPLY encapsulating ADVERTISE packet message to client.
# We do this by injecting a RELAY-REPLY encapsulating ADVERTISE message on the link connected to one
Expand All @@ -394,8 +381,7 @@ def verify_relayed_advertise(self):
# Mask off fields we don't care about matching
masked_packet = Mask(advertise_packet)
masked_packet.set_do_not_care_scapy(IPv6, "fl")
# dual tor uses relay_iface_ip as ip src
masked_packet.set_do_not_care_scapy(IPv6, "src")
# dual tor uses loopback0 ipv6 address as source
masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
masked_packet.set_do_not_care_scapy(packet.UDP, "len")

Expand All @@ -410,10 +396,9 @@ def client_send_request(self):

# Verify that the DHCP relay actually received and relayed the DHCPv6 REQUEST message to all of
# its known DHCP servers.
def verify_relayed_request_relay_forward(self, try_count=0):
def verify_relayed_request_relay_forward(self):
# Create a packet resembling a DHCPv6 RELAY-FORWARD encapsulating REQUEST packet
request_relay_forward_packet = self.create_dhcp_request_relay_forward_packet(
try_count)
request_relay_forward_packet = self.create_dhcp_request_relay_forward_packet()

# Mask off fields we don't care about matching
masked_packet = Mask(request_relay_forward_packet)
Expand All @@ -431,18 +416,8 @@ def verify_relayed_request_relay_forward(self, try_count=0):
masked_packet.set_do_not_care_scapy(
DHCP6OptClientLinkLayerAddr, "clladdr")

# Count the number of these packets received on the ports connected to our leaves
request_count = testutils.count_matched_packets_all_ports(self, masked_packet,
self.server_port_indices, timeout=4.0)

if try_count == 0:
if request_count >= 1:
return True
return False

self.assertTrue(request_count >= 1,
"Failed: Request count of %d" % request_count)
return True
# verify packets received on the ports connected to our leaves
testutils.verify_packet_any_port(self, masked_packet, self.server_port_indices)

# Simulate a DHCP server sending a DHCPv6 RELAY-REPLY encapsulating REPLY packet message to client.
def server_send_reply_relay_reply(self):
Expand All @@ -461,8 +436,7 @@ def verify_relayed_reply(self):
# Mask off fields we don't care about matching
masked_packet = Mask(reply_packet)
masked_packet.set_do_not_care_scapy(IPv6, "fl")
# dual tor uses relay_iface_ip as ip src
masked_packet.set_do_not_care_scapy(IPv6, "src")
# dual tor uses loopback0 ipv6 address as source
masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
masked_packet.set_do_not_care_scapy(packet.UDP, "len")

Expand Down Expand Up @@ -515,30 +489,28 @@ def verify_relay_relay_reply(self):
# Mask off fields we don't care about matching
masked_packet = Mask(relay_reply_packet)
masked_packet.set_do_not_care_scapy(IPv6, "fl")
# dual tor uses relay_iface_ip as ip src
masked_packet.set_do_not_care_scapy(IPv6, "src")
# dual tor uses loopback0 ipv6 address as source
masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
masked_packet.set_do_not_care_scapy(packet.UDP, "len")

# NOTE: verify_packet() will fail for us via an assert, so no need to check a return value here
testutils.verify_packet(self, masked_packet, self.client_port_index)

def runTest(self):
for x in range(2):
self.client_send_solicit()
if self.verify_relayed_solicit_relay_forward(x):
break
self.client_send_solicit()
self.verify_relayed_solicit_relay_forward()

self.server_send_advertise_relay_reply()
self.verify_relayed_advertise()
for x in range(2):
self.client_send_request()
if self.verify_relayed_request_relay_forward(x):
break

self.client_send_request()
self.verify_relayed_request_relay_forward()

self.server_send_reply_relay_reply()
self.verify_relayed_reply()

self.client_send_relayed_relay_forward()
self.verify_relayed_relay_forward()

self.server_send_relay_relay_reply()
self.verify_relay_relay_reply()
Loading