From fe0e4492b7b71d5206dae605715fa0b32737f73f Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Tue, 14 Jan 2025 10:41:14 +0100 Subject: [PATCH] [core] Fix stalled connection that should break after rogue NAK/ACK reception --- scripts/srt-proxy.py | 168 +++++++++++++++++++++++++++++++++++++++++++ srtcore/core.cpp | 44 +++++++----- 2 files changed, 194 insertions(+), 18 deletions(-) create mode 100755 scripts/srt-proxy.py diff --git a/scripts/srt-proxy.py b/scripts/srt-proxy.py new file mode 100755 index 000000000..ad2e8ac34 --- /dev/null +++ b/scripts/srt-proxy.py @@ -0,0 +1,168 @@ +#!/usr/bin/python3 + +# This script is a testing tool that inserts a packet interceptor for +# a bond between two local ports and modify a packet that is sent over +# the UDP link used by the SRT connection. + +# Provided by: +# https://github.com/FelixSodermanNeti + +import socket, select, argparse + +class UDPProxy: + def __init__(self, caller_host, caller_port, listener_host, listener_port, break_at_pkt_NAK, break_at_pkt_ACK): + + # Listening socket configuration + self.caller_host = caller_host + self.caller_port = caller_port + self.caller_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.caller_socket.bind((self.caller_host, self.caller_port)) + + # Forwarding destination configuration + self.listener_host = listener_host + self.listener_port = listener_port + + # Client address tracking + self.client_address = None + + # Proxy state + self.running = False + + # Break SRT + self.sentPacketCounter = 0 + self.recivedAckCounter = 0 + self.PACKETBREAK_NAK = break_at_pkt_NAK + self.PACKETBREAK_ACK = break_at_pkt_ACK + + def start(self): + try: + self.running = True + print( + f"UDP Proxy started. Listening on {self.caller_host}:{self.caller_port}. " + f"Forwarding to {self.listener_host}:{self.listener_port}" + ) + + # Create forward socket + listener_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + # Use select for non-blocking I/O + while self.running: + readable, _, _ = select.select([self.caller_socket, listener_socket], [], [], 1) + + for sock in readable: + if sock == self.caller_socket: + # Receive from client + data, addr = sock.recvfrom(65535) + self.client_address = addr + + #Modify the NAK + processedData = data + # NAK + if(data[1] == 0x03 and len(data) == 24 and self.PACKETBREAK_NAK != 0): # IS A NAK, This should check for a 1 in the first byte aswell. + print("Modifying NAK.") + newBytes = b"\xff\xff\xff\xff" + processedData = data[:20] + newBytes + data[20 + len(newBytes):] + """ + print("NAK BEFORE:") + print(string_bytearray_in_rows(data=data)) + print("\n\nModifying NAK.\n\n") + print("ŃAK AFTER:") + print(string_bytearray_in_rows(data=processedData)) + """ + + # ACK + if(data[1] == 0x02 and len(data) == 44): # IS A ACK + self.recivedAckCounter += 1 + if (self.PACKETBREAK_ACK != 0 and self.recivedAckCounter == self.PACKETBREAK_ACK): + print("Modifying ACK.") + newBytes = b"\x7f\xff\xff\xff" + offset = 16 + processedData = data[:offset] + newBytes + data[offset + len(newBytes):] + """ + print("ACK BEFORE:") + print(string_bytearray_in_rows(data=data)) + print("\n\nModifying ACK.\n\n") + print("ACK AFTER:") + print(string_bytearray_in_rows(data=processedData)) + """ + listener_socket.sendto( + processedData, + (self.listener_host, self.listener_port) + ) + + elif sock == listener_socket: + # Receive from server + data, addr = sock.recvfrom(65535) + + # Drop a sequence of data packets (as to atleast 3 to trigger an NAK with a range) + if len(data) == 1332: # Packet with data + self.sentPacketCounter += 1 + if (self.PACKETBREAK_NAK != 0 and self.sentPacketCounter in range(self.PACKETBREAK_NAK, self.PACKETBREAK_NAK+5)): + print("Discarding packet.") + continue + + if not self.client_address: + print("No client to send data to!") + continue + + self.caller_socket.sendto( + data, + self.client_address + ) + + except Exception as e: + print(f"Proxy error: {e}") + + finally: + self.stop() + + def stop(self): + self.running = False + self.caller_socket.close() + print("UDP Proxy stopped.") + + +def string_bytearray_in_rows(data, bytes_per_row=4): + tmpString = "\n" + for i in range(0, len(data), bytes_per_row): + row = data[i:i+bytes_per_row] + tmpString += f"{i}: " + tmpString += (" ".join(f"{byte:02x}" for byte in row)) + tmpString += "\n" + tmpString += "\n" + return tmpString + +def main(): + # Parse command-line arguments + parser = argparse.ArgumentParser(description='UDP Proxy') + parser.add_argument('--caller-host', default='127.0.0.1', + help='Host to listen on (srt caller) (default: 127.0.0.1)') + parser.add_argument('--caller-port', type=int, required=True, + help='Port to listen on (srt caller)') + parser.add_argument('--listener-host', default='127.0.0.1', + help='Destination host to forward packets (srt listener)') + parser.add_argument('--listener-port', type=int, required=True, + help='Destination port to forward packets (srt listener)') + parser.add_argument('--break-at-pkt-NAK', type=int, default=0, + help='At what datapacket should the NAK -1 be sent (0=never)') + parser.add_argument('--break-at-pkt-ACK', type=int, default=0, + help='At what datapacket should the ACK -1 be sent (0=never)') + args = parser.parse_args() + + # Create and start proxy + proxy = UDPProxy( + args.caller_host, + args.caller_port, + args.listener_host, + args.listener_port, + args.break_at_pkt_NAK, + args.break_at_pkt_ACK + ) + + try: + proxy.start() + except KeyboardInterrupt: + proxy.stop() + +if __name__ == '__main__': + main() diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..dc1e2d118 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8483,39 +8483,44 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_ // Protect packet retransmission { - ScopedLock ack_lock(m_RecvAckLock); + UniqueLock ack_lock(m_RecvAckLock); // Check the validation of the ack if (CSeqNo::seqcmp(ackdata_seqno, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) { + ack_lock.unlock(); + // this should not happen: attack or bug LOGC(gglog.Error, log << CONID() << "ATTACK/IPE: incoming ack seq " << ackdata_seqno << " exceeds current " << m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ackdata_seqno) - 1) << "!"); m_bBroken = true; m_iBrokenCounter = 0; + + updateBrokenConnection(); + completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS! return; } - if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0) - { - const int cwnd1 = std::min(m_iFlowWindowSize, m_iCongestionWindow); - const bool bWasStuck = cwnd1<= getFlightSpan(); - // Update Flow Window Size, must update before and together with m_iSndLastAck - m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; - m_iSndLastAck = ackdata_seqno; - m_tsLastRspAckTime = currtime; - m_iReXmitCount = 1; // Reset re-transmit count since last ACK - - const int cwnd = std::min(m_iFlowWindowSize, m_iCongestionWindow); - if (bWasStuck && cwnd > getFlightSpan()) + if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0) { - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); - HLOGC(gglog.Debug, - log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize - << " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno); + const int cwnd1 = std::min(m_iFlowWindowSize, m_iCongestionWindow); + const bool bWasStuck = cwnd1<= getFlightSpan(); + // Update Flow Window Size, must update before and together with m_iSndLastAck + m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; + m_iSndLastAck = ackdata_seqno; + m_tsLastRspAckTime = currtime; + m_iReXmitCount = 1; // Reset re-transmit count since last ACK + + const int cwnd = std::min(m_iFlowWindowSize, m_iCongestionWindow); + if (bWasStuck && cwnd > getFlightSpan()) + { + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + HLOGC(gglog.Debug, + log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize + << " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno); + } } - } /* * We must not ignore full ack received by peer @@ -8924,6 +8929,9 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt) // this should not happen: attack or bug m_bBroken = true; m_iBrokenCounter = 0; + + updateBrokenConnection(); + completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS! return; }