Skip to content

Commit

Permalink
Add software reset function for UDS scanner (#4605)
Browse files Browse the repository at this point in the history
  • Loading branch information
polybassa authored Dec 11, 2024
1 parent d10b482 commit 4fbf6fb
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 44 deletions.
10 changes: 8 additions & 2 deletions scapy/contrib/automotive/scanner/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def __init__(
socket, # type: Optional[_SocketUnion]
reset_handler=None, # type: Optional[Callable[[], None]]
reconnect_handler=None, # type: Optional[Callable[[], _SocketUnion]] # noqa: E501
test_cases=None,
# type: Optional[List[Union[AutomotiveTestCaseABC, Type[AutomotiveTestCaseABC]]]] # noqa: E501
test_cases=None, # type: Optional[List[Union[AutomotiveTestCaseABC, Type[AutomotiveTestCaseABC]]]] # noqa: E501
software_reset_handler=None, # type: Optional[Callable[[_SocketUnion], None]] # noqa: E501
**kwargs # type: Optional[Dict[str, Any]]
): # type: (...) -> None

Expand All @@ -82,6 +82,7 @@ def __init__(
self.target_state = self._initial_ecu_state
self.reset_handler = reset_handler
self.reconnect_handler = reconnect_handler
self.software_reset_handler = software_reset_handler

self.cleanup_functions = list() # type: List[_CleanupCallable]

Expand Down Expand Up @@ -152,6 +153,11 @@ def reset_target(self):
log_automotive.info("Target reset")
if self.reset_handler:
self.reset_handler()
elif self.software_reset_handler:
if self.socket and self.socket.closed:
self.reconnect()
if self.socket:
self.software_reset_handler(self.socket)
self.target_state = self._initial_ecu_state

def reconnect(self):
Expand Down
88 changes: 52 additions & 36 deletions scapy/contrib/automotive/uds_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,15 @@
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <[email protected]>

# scapy.contrib.description = UDS AutomotiveTestCaseExecutor
# scapy.contrib.status = loads

from abc import ABC
import struct
import random
import time
import itertools
import copy
import inspect

import itertools
import logging
import random
import struct
import time
from abc import ABC
from collections import defaultdict

from scapy.compat import orb
from scapy.contrib.automotive import log_automotive
from scapy.packet import Raw, Packet
from scapy.error import Scapy_Exception
from scapy.contrib.automotive.uds import UDS, UDS_NR, UDS_DSC, UDS_TP, \
UDS_RDBI, UDS_WDBI, UDS_SA, UDS_RC, UDS_IOCBI, UDS_RMBA, UDS_ER, \
UDS_TesterPresentSender, UDS_CC, UDS_RDBPI, UDS_RD, UDS_TD

from scapy.contrib.automotive.ecu import EcuState
from scapy.contrib.automotive.scanner.enumerator import ServiceEnumerator, \
_AutomotiveTestCaseScanResult, _AutomotiveTestCaseFilteredScanResult, \
StateGeneratingServiceEnumerator
from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
_SocketUnion, _TransitionTuple, StateGenerator
from scapy.contrib.automotive.scanner.configuration import \
AutomotiveTestCaseExecutorConfiguration # noqa: E501
from scapy.contrib.automotive.scanner.graph import _Edge
from scapy.contrib.automotive.scanner.staged_test_case import StagedAutomotiveTestCase # noqa: E501
from scapy.contrib.automotive.scanner.executor import AutomotiveTestCaseExecutor # noqa: E501

# TODO: Refactor this import
from scapy.contrib.automotive.uds_ecu_states import * # noqa: F401, F403

# typing imports
from typing import (
Dict,
Expand All @@ -54,6 +26,29 @@
Sequence,
)

from scapy.compat import orb
from scapy.contrib.automotive import log_automotive
from scapy.contrib.automotive.ecu import EcuState
from scapy.contrib.automotive.scanner.configuration import \
AutomotiveTestCaseExecutorConfiguration # noqa: E501
from scapy.contrib.automotive.scanner.enumerator import ServiceEnumerator, \
_AutomotiveTestCaseScanResult, _AutomotiveTestCaseFilteredScanResult, \
StateGeneratingServiceEnumerator
from scapy.contrib.automotive.scanner.executor import AutomotiveTestCaseExecutor # noqa: E501
from scapy.contrib.automotive.scanner.graph import _Edge
from scapy.contrib.automotive.scanner.staged_test_case import StagedAutomotiveTestCase # noqa: E501
from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
_SocketUnion, _TransitionTuple, StateGenerator
from scapy.contrib.automotive.uds import UDS, UDS_NR, UDS_DSC, UDS_TP, \
UDS_RDBI, UDS_WDBI, UDS_SA, UDS_RC, UDS_IOCBI, UDS_RMBA, UDS_ER, \
UDS_TesterPresentSender, UDS_CC, UDS_RDBPI, UDS_RD, UDS_TD
# TODO: Refactor this import
from scapy.contrib.automotive.uds_ecu_states import * # noqa: F401, F403
from scapy.error import Scapy_Exception
from scapy.packet import Raw, Packet

# scapy.contrib.description = UDS AutomotiveTestCaseExecutor
# scapy.contrib.status = loads

# Definition outside the class UDS_RMBASequentialEnumerator
# to allow pickling
Expand Down Expand Up @@ -872,8 +867,8 @@ def _get_table_entry_y(self, tup):
resp = tup[2]
if resp is not None:
return "0x%04x: %s" % \
(tup[1].dataIdentifier,
repr(resp.payload))
(tup[1].dataIdentifier,
repr(resp.payload))
else:
return "0x%04x: No response" % tup[1].dataIdentifier

Expand Down Expand Up @@ -1252,3 +1247,24 @@ def default_test_case_clss(self):
return [UDS_ServiceEnumerator, UDS_DSCEnumerator, UDS_TPEnumerator,
UDS_SAEnumerator, UDS_WDBISelectiveEnumerator,
UDS_RMBAEnumerator, UDS_RCEnumerator, UDS_IOCBIEnumerator]


def uds_software_reset(connection, # type: _SocketUnion
logger=log_automotive # type: logging.Logger
): # type: (...) -> None
logger.debug("Reset procedure of target started.")
resp = connection.sr1(UDS() / UDS_ER(resetType=1),
timeout=5,
verbose=False)
if resp and resp.service != 0x7f:
logger.debug("Reset procedure of target complete")
return

logger.debug("Couldn't reset target with UDS_ER. "
"At least try to set target back to DefaultSession")
resp = connection.sr1(UDS() / UDS_DSC(b"\x01"), verbose=False, timeout=5)
if resp and resp.service != 0x7f:
logger.debug("Target in DefaultSession")
return

logger.error("Target not in DefaultSession. Software reset failed.")
191 changes: 185 additions & 6 deletions test/contrib/automotive/scanner/uds_scanner.uts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ conf.debug_dissector = False

= Define Testfunction

def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, **kwargs):
def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, software_reset=False, **kwargs):
tester_obj_pipe = ObjectPipe(name="TesterPipe")
ecu_obj_pipe = ObjectPipe(name="ECUPipe")
TesterSocket = UnstableSocket if unstable_socket else TestSocket
Expand Down Expand Up @@ -58,11 +58,26 @@ def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstabl
sim = threading.Thread(target=answering_machine_thread)
try:
sim.start()
scanner = UDS_Scanner(
tester, reset_handler=reset, reconnect_handler=reconnect,
test_cases=enumerators, timeout=0.1,
retry_if_none_received=True, unittest=True,
**kwargs)
if software_reset:
scanner = UDS_Scanner(
tester,
software_reset_handler=uds_software_reset,
reconnect_handler=reconnect,
test_cases=enumerators,
timeout=0.1,
retry_if_none_received=True,
unittest=True,
**kwargs)
else:
scanner = UDS_Scanner(
tester,
reset_handler=reset,
reconnect_handler=reconnect,
test_cases=enumerators,
timeout=0.1,
retry_if_none_received=True,
unittest=True,
**kwargs)
for i in range(12):
print("Starting scan")
scanner.scan(timeout=10)
Expand Down Expand Up @@ -258,6 +273,170 @@ assert "serviceNotSupported received 75 times" in result
assert "serviceNotSupportedInActiveSession received 19 times" in result
assert "securityAccessDenied received 2 times" in result

= Simulate ECU and run Scanner with software resert

responses = ([EcuResponse(None, [UDS()/UDS_DSCPR(b"\x01")])]
+ mEcu.supported_responses)

scanner = executeScannerInVirtualEnvironment(
responses,
[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator],
software_reset=True,
UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0,
"overwrite_timeout": False},
UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)},
UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22,
0x23, 0x24, 0x27, 0x28, 0x29,
0x2A, 0x2C, 0x2E, 0x2F, 0x31,
0x34, 0x35, 0x36, 0x37, 0x38,
0x3D, 0x3E, 0x83, 0x84, 0x85,
0x87],
"request_length": 1})

scanner.show_testcases()
scanner.show_testcases_status()
assert len(scanner.state_paths) == 6
assert scanner.scan_completed
assert scanner.progress() > 0.95

assert EcuState(session=1) in scanner.final_states
assert EcuState(session=2, tp=1) in scanner.final_states
assert EcuState(session=1, tp=1) in scanner.final_states
assert EcuState(session=3, tp=1) in scanner.final_states
assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states
assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states

#################### UDS_SA_XOR_Enumerator ################
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()

assert len(tc.results_with_negative_response) == 24
assert len(tc.results_with_positive_response) >= 6
assert len(tc.scanned_states) == 6

result = tc.show(dump=True)

assert "serviceNotSupportedInActiveSession received 5 times" in result
assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result

################# UDS_DSCEnumerator #####################
tc = scanner.configuration.test_cases[1]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()

assert len(tc.results_with_negative_response) == 17
assert len(tc.results_with_positive_response) == 13
assert len(tc.scanned_states) == 6

result = tc.show(dump=True)

assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result

###################### UDS_ServiceEnumerator ###################
tc = scanner.configuration.test_cases[2]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()

assert len(tc.results_with_negative_response) == 156
assert len(tc.results_with_positive_response) == 0
assert len(tc.scanned_states) == 6

result = tc.show(dump=True)

assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result
assert "serviceNotSupported received 75 times" in result
assert "serviceNotSupportedInActiveSession received 19 times" in result
assert "securityAccessDenied received 2 times" in result

= Simulate ECU and run Scanner with software resert 2

responses = ([EcuResponse(None, [UDS()/UDS_ERPR(b"\x01")])]
+ mEcu.supported_responses)

scanner = executeScannerInVirtualEnvironment(
responses,
[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator],
software_reset=True,
UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0,
"overwrite_timeout": False},
UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)},
UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22,
0x23, 0x24, 0x27, 0x28, 0x29,
0x2A, 0x2C, 0x2E, 0x2F, 0x31,
0x34, 0x35, 0x36, 0x37, 0x38,
0x3D, 0x3E, 0x83, 0x84, 0x85,
0x87],
"request_length": 1})

scanner.show_testcases()
scanner.show_testcases_status()
assert len(scanner.state_paths) == 5
assert scanner.scan_completed
assert scanner.progress() > 0.95

assert EcuState(session=1) in scanner.final_states
assert EcuState(session=2, tp=1) in scanner.final_states
assert EcuState(session=3, tp=1) in scanner.final_states
assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states
assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states

#################### UDS_SA_XOR_Enumerator ################
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()

assert len(tc.results_with_negative_response) == 19
assert len(tc.results_with_positive_response) >= 6
assert len(tc.scanned_states) == 5

result = tc.show(dump=True)

assert "serviceNotSupportedInActiveSession received 5 times" in result
assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result

################# UDS_DSCEnumerator #####################
tc = scanner.configuration.test_cases[1]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()

assert len(tc.results_with_negative_response) == 20
assert len(tc.results_with_positive_response) == 5
assert len(tc.scanned_states) == 5

result = tc.show(dump=True)

assert "incorrectMessageLengthOrInvalidFormat received 20 times" in result

###################### UDS_ServiceEnumerator ###################
tc = scanner.configuration.test_cases[2]

assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()

assert len(tc.results_with_negative_response) == 130
assert len(tc.results_with_positive_response) == 0
assert len(tc.scanned_states) == 5

result = tc.show(dump=True)

assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result
assert "serviceNotSupported received 75 times" in result
assert "serviceNotSupportedInActiveSession received 19 times" in result
assert "securityAccessDenied received 2 times" in result


= UDS_ServiceEnumerator

def req_handler(resp, req):
Expand Down

0 comments on commit 4fbf6fb

Please sign in to comment.