Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend IP-in-IP test to check IPv6 in IPv4, IPv4 in IPv6, IPv6 in IP… #682

Merged
merged 2 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 196 additions & 75 deletions ansible/roles/test/files/ptftests/IP_decap_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@
import fib

class DecapPacketTest(BaseTest):
""" IP in IP decapsulation test """

# Default source IP to use for inner packet
DEFAULT_INNER_V4_PKT_SRC_IP = '1.1.1.1'
DEFAULT_INNER_V6_PKT_SRC_IP = '1::1'

# Default source and destination IPs to use
# for triple encapsulated packets
DEFAULT_INNER2_V4_PKT_SRC_IP = '4.4.4.4'
DEFAULT_INNER2_V6_PKT_SRC_IP = '4::4'
DEFAULT_INNER2_V4_PKT_DST_IP = '3.3.3.3'
DEFAULT_INNER2_V6_PKT_DST_IP = '3::3'

def __init__(self):
'''
@summary: constructor
Expand All @@ -68,106 +81,214 @@ def setUp(self):
self.src_ports = [0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 36, 37, 38, 39, 40, 41, 42, 48, 52, 53, 54, 55, 56, 57, 58]
if self.test_params['testbed_type'] == 't0-116':
self.src_ports = range(0, 24) + range(32, 120)

# which type of tunneled trafic to test (IPv4 in IPv4, IPv6 in IPv4, IPv6 in IPv4, IPv6 in IPv6)
self.test_outer_ipv4 = self.test_params.get('outer_ipv4', True)
self.test_outer_ipv6 = self.test_params.get('outer_ipv6', True)
self.test_inner_ipv4 = self.test_params.get('inner_ipv4', True)
self.test_inner_ipv6 = self.test_params.get('inner_ipv6', True)

#-----------------------------------------------------------------

def send_and_verify(self, dst_ip, expected_ports, src_port, triple_encap = False):
'''
@summary: This function builds encap packet, send and verify their arrival.
@dst_ip: the destination ip for the inner IP header
@expected_ports: list of ports that a packet can arrived from
@src_port: the physical port that the packet will be sent from
@triple_encap: True to send triple encapsulated packet
'''
#setting parameters

def create_ipv4_inner_pkt_only(self, src_ip, dst_ip, tos, encap=False):
"""Creates an IP only packet for the test
@param src_ip: source ip
@param dst_ip: destination ip
@param tos: type of service field
@param encap: build encapsulated packet.
If @encap is True the return packet would be:
IP(@src_ip, @dst_ip, @tos) / IP(dst_ip=4.4.4.4, src_ip=3.3.3.3) / TCP()
"""

inner_pkt = simple_ip_only_packet(ip_dst=dst_ip, ip_src=src_ip, ip_ttl=64, ip_tos=tos)
if encap:
inner_pkt2 = self.create_ipv4_inner_pkt_only(self.DEFAULT_INNER2_V4_PKT_SRC_IP,
self.DEFAULT_INNER2_V4_PKT_DST_IP,
0)
inner_pkt = simple_ipv4ip_packet(ip_src=src_ip,
ip_dst=dst_ip,
ip_tos=tos,
ip_ttl=64,
inner_frame=inner_pkt2).getlayer(scapy.IP) # get only the IP layer

return inner_pkt

#-----------------------------------------------------------------

def create_ipv6_inner_pkt_only(self, src_ip, dst_ip, tc, encap=False):
"""Creates an IPv6 only packet for the test
@param src_ip: source ip
@param dst_ip: destination ip
@param tc: traffic class
@param encap: build encapsulated packet.
If @encap is True the return packet would be:
IP(@src_ip, @dst_ip, @tc) / IP(dst_ip=4::4, src_ip=3::3) / TCP()
"""

# no ptf function to build simple ipv6 only packet
# so use simple_tcpv6_packet function which builds the same packet
# with TCP header as simple_ip_only_packet but extract away Ethernet
inner_pkt = simple_tcpv6_packet(ipv6_dst=dst_ip, ipv6_src=src_ip, ipv6_hlim=64, ipv6_tc=tc).getlayer(scapy.IPv6)
if encap:
inner_pkt2 = self.create_ipv6_inner_pkt_only(self.DEFAULT_INNER2_V6_PKT_SRC_IP,
self.DEFAULT_INNER2_V6_PKT_DST_IP,
0)
inner_pkt = simple_ipv6ip_packet(ipv6_src=src_ip,
ipv6_dst=dst_ip,
ipv6_tc=tc,
ipv6_hlim=64,
inner_frame=inner_pkt2).getlayer(scapy.IPv6) # get only the IP layer

return inner_pkt

#-----------------------------------------------------------------

def create_encap_packet(self, dst_ip, outer_pkt='ipv4', triple_encap=False):
"""Creates an IPv4/IPv6 encapsulated packet in @outer_pkt packet
@param dst_ip: Destination IP for inner packet. Depending @dst_ip IPv4 or IPv6 packet will be created
@param outer_pkt: Outer packet type to encapsulate inner packet in (ipv4/ipv6)
@param triple_encap: Whether to build triple encapsulated packet
@return: built packet and expected packet to match after decapsulation"""

src_mac = self.dataplane.get_mac(0, 0)
dst_mac = '00:11:22:33:44:55'
inner_src_ip = '2.2.2.2'
router_mac = self.test_params['router_mac']
dscp_in = random.randint(0, 32)
tos_in = dscp_in << 2
# TC for IPv6, ToS for IPv4
tc_in = tos_in = dscp_in << 2
dscp_out = random.randint(0, 32)
tos_out = dscp_out << 2
tc_out = tos_out = dscp_out << 2
if ("pipe" == self.test_params['dscp_mode']):
exp_tos = tos_in
exp_tc = exp_tos = tc_in
elif("uniform" == self.test_params['dscp_mode']):
exp_tos = tos_out
exp_tc = exp_tos = tc_out
else:
print("ERROR: no dscp is configured")
exit()

default_packet_len = 100
default_packet_add_header_len = 114

#building the packets and the expected packets
if (not triple_encap):
#for the double encap packet we will use IP header with TCP header without mac
inner_pkt = simple_ip_only_packet(ip_dst=dst_ip, ip_src=inner_src_ip, ip_ttl=64, ip_tos=tos_in)
#after the decap process the retuning packet will be normal tcp packet, The TTL is taked from the inner layer and redused by one
exp_pkt = simple_tcp_packet(pktlen=default_packet_add_header_len,
eth_dst=dst_mac,
eth_src=router_mac,
ip_dst=dst_ip,
ip_src=inner_src_ip,
ip_tos=exp_tos,
ip_ttl=63)
if ipaddress.ip_address(unicode(dst_ip)).version == 6:
inner_src_ip = self.DEFAULT_INNER_V6_PKT_SRC_IP
# build inner packet, if triple_encap is True inner_pkt would be double encapsulated
inner_pkt = self.create_ipv6_inner_pkt_only(inner_src_ip, dst_ip, tos_in, triple_encap)

# build expected packet based on inner packet
# set the correct L2 fields
exp_pkt = scapy.Ether(dst=dst_mac, src=router_mac) / inner_pkt

# set expected TC value
exp_pkt['IPv6'].tc = exp_tc
# decrement TTL
exp_pkt['IPv6'].hlim -= 1
else:
inner_src_ip = self.DEFAULT_INNER_V4_PKT_SRC_IP
# build inner packet, if triple_encap is True inner_pkt would be double encapsulated
inner_pkt = self.create_ipv4_inner_pkt_only(inner_src_ip, dst_ip, tos_in, triple_encap)

# build expected packet based on inner packet
# set the correct L2 fields
exp_pkt = scapy.Ether(dst=dst_mac, src=router_mac) / inner_pkt

# set expected ToS value
exp_pkt['IP'].tos = exp_tos
# decrement TTL
exp_pkt['IP'].ttl -= 1


if outer_pkt == 'ipv4':
pkt = simple_ipv4ip_packet(
eth_dst=router_mac,
eth_src=src_mac,
ip_src='1.1.1.1',
ip_dst=self.test_params['lo_ip'],
ip_tos=tos_out,
ip_ttl=random.randint(2, 63),
inner_frame=inner_pkt)
elif outer_pkt == 'ipv6':
pkt = simple_ipv6ip_packet(
eth_dst=router_mac,
eth_src=src_mac,
ipv6_src='1::1',
ipv6_dst=self.test_params['lo_ipv6'],
ipv6_tc=tc_out,
ipv6_hlim=random.randint(2, 63),
inner_frame=inner_pkt)
else:
#Building triple encap packet with SCAPY, because there is no PTF function for it, I use the defualt values for the TCP header
tcp_hdr = scapy.TCP(sport=1234, dport=80, flags="S", chksum=0)
inner_pkt2 = scapy.IP(src='4.4.4.4', dst='3.3.3.3', tos=0, ttl=64, id=1, ihl=None) / tcp_hdr
inner_pkt = scapy.IP(src=inner_src_ip, dst=dst_ip, tos=tos_in, ttl=64, id=1, ihl=None,proto=4) / inner_pkt2
inner_pkt = inner_pkt/("".join([chr(x) for x in xrange(default_packet_len - len(inner_pkt))]))
#The expected packet is also built by scapy, and the TTL is taked from the inner layer and redused by one
exp_pkt = scapy.Ether(dst=dst_mac, src=router_mac)/inner_pkt
exp_pkt['IP'].tos = exp_tos #this parameter is taken by the decap rule configuration
exp_pkt['IP'].ttl = 63

pkt = simple_ipv4ip_packet(
eth_dst=router_mac,
eth_src=src_mac,
ip_src='1.1.1.1',
ip_dst=self.test_params['lo_ip'],
ip_tos=tos_out,
ip_ttl=random.randint(2, 63),
inner_frame=inner_pkt)
raise Exception("ERROR: invalid outer packet type ", outer_pkt)


return pkt, exp_pkt

#-----------------------------------------------------------------

def send_and_verify(self, dst_ip, expected_ports, src_port, outer_pkt='ipv4', triple_encap=False):
'''
@summary: This function builds encap packet, send and verify their arrival.
@dst_ip: the destination ip for the inner IP header
@expected_ports: list of ports that a packet can arrived from
@src_port: the physical port that the packet will be sent from
@triple_encap: True to send triple encapsulated packet
'''

pkt, exp_pkt = self.create_encap_packet(dst_ip, outer_pkt, triple_encap)

#send and verify the return packets
masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.Ether, "dst")
masked_exp_pkt.set_do_not_care_scapy(scapy.Ether, "src")

#send and verify the return packets
send_packet(self, src_port, pkt)
logging.info(".....Sending packet from port" + str(src_port) + " to " + dst_ip + ", Triple_encap: " + str(triple_encap))
(matched, received) = verify_packet_any_port(self, masked_exp_pkt, expected_ports)
logging.info(".....Sending packet from port" + str(src_port) + " to " +
dst_ip + ", Triple_encap: " + str(triple_encap))
matched, received = verify_packet_any_port(self, masked_exp_pkt, expected_ports)
assert received
return (matched, received)
return matched, received

#-----------------------------------------------------------------

def runTest(self):
"""
@summary: Send double and triple encapsulated packets for each range of IPv4 and
expect the packet to be received from one of the expected ports
"""
# IPv4 Test
for ip_range in self.fib.ipv4_ranges():
# Get the expected list of ports that would receive the packets
exp_port_list = self.fib[ip_range.get_first_ip()].get_next_hop_list()
# Choose random one source port from all ports excluding the expected ones
src_port = random.choice([port for port in self.src_ports if port not in exp_port_list])

if not len(exp_port_list):
continue

logging.info("Check IP range:" + str(ip_range) + " on " + str(exp_port_list) + "...")
# Send a packet with the first IP in the range
self.send_and_verify(ip_range.get_first_ip(), exp_port_list, src_port)
self.send_and_verify(ip_range.get_first_ip(), exp_port_list, src_port, True)
# Send a packet with the last IP in the range
if ip_range.length() > 1:
self.send_and_verify(ip_range.get_last_ip(), exp_port_list, src_port)
self.send_and_verify(ip_range.get_last_ip(), exp_port_list, src_port, True)
# Send a packet with a random IP in the range
if ip_range.length() > 2:
self.send_and_verify(ip_range.get_random_ip(), exp_port_list, src_port)
self.send_and_verify(ip_range.get_random_ip(), exp_port_list, src_port, True)

ip_ranges = []

if self.test_inner_ipv4:
ip_ranges += self.fib.ipv4_ranges()
if self.test_inner_ipv6:
ip_ranges += self.fib.ipv6_ranges()

outer_pkt_family_type = []
if self.test_outer_ipv4:
outer_pkt_family_type.append('ipv4')
if self.test_outer_ipv6:
outer_pkt_family_type.append('ipv6')

# First test one tunnel with all ip ranges
for pkt_type in outer_pkt_family_type:
for ip_range in ip_ranges:
# Get the expected list of ports that would receive the packets
exp_port_list = self.fib[ip_range.get_first_ip()].get_next_hop_list()
# Choose random one source port from all ports excluding the expected ones
src_port = random.choice([port for port in self.src_ports if port not in exp_port_list])

if not len(exp_port_list):
continue

logging.info("Check " + pkt_type.replace('ip', 'IP') + " tunneled traffic on IP range:" +
str(ip_range) + " on " + str(exp_port_list) + "...")
# Send a packet with the first IP in the range
self.send_and_verify(ip_range.get_first_ip(), exp_port_list, src_port, pkt_type)
self.send_and_verify(ip_range.get_first_ip(), exp_port_list, src_port, pkt_type, True)
# Send a packet with the last IP in the range
if ip_range.length() > 1:
self.send_and_verify(ip_range.get_last_ip(), exp_port_list, src_port, pkt_type)
self.send_and_verify(ip_range.get_last_ip(), exp_port_list, src_port, pkt_type, True)
# Send a packet with a random IP in the range
if ip_range.length() > 2:
self.send_and_verify(ip_range.get_random_ip(), exp_port_list, src_port, pkt_type)
self.send_and_verify(ip_range.get_random_ip(), exp_port_list, src_port, pkt_type, True)

#---------------------------------------------------------------------


Loading