Skip to content

Commit

Permalink
Scripts: Add PCAP Parser (incl. Nonce brute-forcer)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuxuser committed Mar 4, 2021
1 parent 9a05029 commit 5aa195f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 68 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ httpx
aiortc
construct
dpkt
hexdump

wheel
flake8
Expand Down
164 changes: 96 additions & 68 deletions xcloud/scripts/pcap_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,95 +3,123 @@
"""
import argparse
import logging
from typing import Any, Optional
import struct
from typing import Any, Optional, Generator

import dpkt
from hexdump import hexdump
from aiortc import rtp
from aioice import stun
from construct.lib import containers

from ..protocol import packets, teredo, ipv6
from ..protocol import packets, teredo, ipv6, srtp_crypto


logging.basicConfig(level=logging.DEBUG)
containers.setGlobalPrintFullStrings(True)
LOG = logging.getLogger(__name__)

def get_info_stun(stun: stun.Message) -> None:
return f'STUN: {stun}'

def get_info_rtp(rtp: rtp.RtpPacket) -> None:
try:
payload_name = packets.PayloadType(rtp.payload_type)
except:
payload_name = '<UNKNOWN>'

return f'RTP: {payload_name.name} {rtp} SSRC={rtp.ssrc}'

def get_info_teredo(teredo: teredo.TeredoPacket) -> None:
info = f'TEREDO: {teredo}'
if teredo.ipv6.next_header != ipv6.NO_NEXT_HEADER:
data = teredo.ipv6.data
if type(data) == bytes:
raise ValueError(f'TEREDO contains unparsed-subpacket: {data}')
subpacket_info = get_info_general(data)
info += f'\n -> TEREDO-WRAPPED: {subpacket_info}'
return info


PACKET_TYPES = [
(stun.parse_message, get_info_stun),
(rtp.RtpPacket.parse, get_info_rtp),
(teredo.TeredoPacket.parse, get_info_teredo)
]

def get_info_general(packet: Any) -> Optional[str]:
if isinstance(packet, dpkt.udp.UDP):
data = bytes(packet.data)
for cls, info_func in PACKET_TYPES:
try:
instance = cls(data)
info = info_func(instance)
return info
except:
pass
elif isinstance(packet, bytes):
return '<RAW BYTES>'
else:
return '<UNHANDLED>'

def packet_filter(filepath):
with open(filepath, 'rb') as fh:
for ts, buf in dpkt.pcap.Reader(fh):
eth = dpkt.ethernet.Ethernet(buf)

# Make sure the Ethernet data contains an IP packet
if not isinstance(eth.data, dpkt.ip.IP):
continue

ip = eth.data
subpacket = ip.data
if not isinstance(subpacket, dpkt.udp.UDP):
continue

yield(subpacket, ts)


def parse_file(pcap_filepath: str) -> None:
for packet, timestamp in packet_filter(pcap_filepath):
info = get_info_general(packet)
if info:
print(info)
class XcloudPcapParser:
def __init__(self, srtp_key: Optional[str]):
self.crypto: Optional[srtp_crypto.SrtpContext] = None
if srtp_key:
self.crypto = srtp_crypto.SrtpContext.from_base64(srtp_key)

@property
def PACKET_TYPES(self):
return [
(stun.parse_message, self.get_info_stun),
(rtp.RtpPacket.parse, self.get_info_rtp),
(teredo.TeredoPacket.parse, self.get_info_teredo)
]

def get_info_stun(self, stun: stun.Message) -> None:
return f'STUN: {stun}'

def brute_force_nonce(self, nonce_orig: bytes) -> Generator:
for byte1 in range(0, 0xFF):
for byte2 in range(0, 0xFF):
nonce_transform = b''.join([nonce_orig[:5], struct.pack('!B', byte1), nonce_orig[6:11], struct.pack('!B', byte2)])
yield nonce_transform

def get_info_rtp(self, rtp: rtp.RtpPacket) -> None:
try:
payload_name = packets.PayloadType(rtp.payload_type)
except:
payload_name = '<UNKNOWN>'

info_str = f'RTP: {payload_name.name} {rtp} SSRC={rtp.ssrc}'
if self.crypto:
rtp_packet_serialized = rtp.serialize()
rtp_header, rtp_data = rtp_packet_serialized[:12], rtp_packet_serialized[12:]
nonce_orig = self.crypto.session_keys.nonce_key[2:]
for nonce_transformed in self.brute_force_nonce(nonce_orig):
try:
decrypted = self.crypto._decrypt(self.crypto.decryptor_ctx, nonce_transformed, rtp_data, rtp_header)
info_str += "\n" + hexdump(decrypted, result='return') + "\n"
except Exception:
pass
return info_str

def get_info_teredo(self, teredo: teredo.TeredoPacket) -> None:
info = f'TEREDO: {teredo}'
if teredo.ipv6.next_header != ipv6.NO_NEXT_HEADER:
data = teredo.ipv6.data
if type(data) == bytes:
raise ValueError(f'TEREDO contains unparsed-subpacket: {data}')
subpacket_info = self.get_info_general(data)
info += f'\n -> TEREDO-WRAPPED: {subpacket_info}'
return info

def get_info_general(self, packet: Any) -> Optional[str]:
if isinstance(packet, dpkt.udp.UDP):
data = bytes(packet.data)
for cls, info_func in self.PACKET_TYPES:
try:
instance = cls(data)
info = info_func(instance)
return info
except:
pass
elif isinstance(packet, bytes):
return '<RAW BYTES>'
else:
return '<UNHANDLED>'

def packet_filter(self, filepath):
with open(filepath, 'rb') as fh:
for ts, buf in dpkt.pcap.Reader(fh):
eth = dpkt.ethernet.Ethernet(buf)

# Make sure the Ethernet data contains an IP packet
if not isinstance(eth.data, dpkt.ip.IP):
continue

ip = eth.data
subpacket = ip.data
if not isinstance(subpacket, dpkt.udp.UDP):
continue

yield(subpacket, ts)


def parse_file(self, pcap_filepath: str) -> None:
for packet, timestamp in self.packet_filter(pcap_filepath):
info = self.get_info_general(packet)
if info:
print(info)

def main():
parser = argparse.ArgumentParser(
"XCloud PCAP parser",
description="PCAP Parser for XCloud network traffic"
)
parser.add_argument("filepath", help="Path to PCAP/NG file")
parser.add_argument("--key", "-k", help="SRTP key")
args = parser.parse_args()

parse_file(args.filepath)
pcap_parser = XcloudPcapParser(args.key)
pcap_parser.parse_file(args.filepath)


if __name__ == "__main__":
Expand Down

0 comments on commit 5aa195f

Please sign in to comment.