diff --git a/examples/README.md b/examples/README.md index daeb3a1..cccba4b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -5,6 +5,20 @@ These are self-contained examples for using the `umrx-v3-py` with Application Bo * [`switch_app_dfu.py`](./switch_app_dfu.py) * [`switch_app_mtp.py`](./switch_app_mtp.py) +## [`bma400`](https://www.bosch-sensortec.com/products/motion-sensors/accelerometers/bma400/) + +The examples in the [`bma400`](./bma400) folder +show different communication features for the +[BMA400 shuttle](https://www.bosch-sensortec.com/media/boschsensortec/downloads/shuttle_board_flyer/application_board_3_1/bst-bma400-sf000.pdf) +board: + +* [`bma400/bma400_i2c_read_write.py`](./bma400/bma400_i2c_read_write.py) +* [`bma400/bma400_i2c_polling_streaming.py`](./bma400/bma400_i2c_polling_streaming.py) +* [`bma400/bma400_i2c_interrupt_streaming.py`](./bma400/bma400_i2c_interrupt_streaming.py) +* [`bma400/bma400_spi_read_write.py`](./bma400/bma400_spi_read_write.py) +* [`bma400/bma400_spi_polling_streaming.py`](./bma400/bma400_spi_polling_streaming.py) +* [`bma400/bma400_spi_interrupt_streaming.py`](./bma400/bma400_spi_interrupt_streaming.py) + ## [`bmi088`](https://www.bosch-sensortec.com/products/motion-sensors/imus/bmi088/) diff --git a/examples/bma400/__init__.py b/examples/bma400/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/bma400/bma400_i2c_interrupt_streaming.py b/examples/bma400/bma400_i2c_interrupt_streaming.py new file mode 100644 index 0000000..28b6522 --- /dev/null +++ b/examples/bma400/bma400_i2c_interrupt_streaming.py @@ -0,0 +1,47 @@ +import logging +import struct +import sys +import time +from pathlib import Path + +from umrx_app_v3.shuttle_board.bma400.bma400_shuttle import BMA400Shuttle + + +def setup_logging(level: int = logging.DEBUG) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMA400Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x90 + shuttle.configure_interrupt_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_interrupt_streaming_multiple(includes_mcu_timestamp=False): + sensor_id, packet, time_stamp, payload = streaming + a_x, a_y, a_z, t_0, t_1, t_2 = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMA400Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:04X}") + assert shuttle.sensor.chip_id == 0x90 + shuttle.configure_polling_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_polling_streaming_multiple(): + sensor_id, payload = streaming + a_x, a_y, a_z, t_0, t_1, t_2 = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMA400Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + logger.info(f"err_reg=0x{shuttle.sensor.err_reg:02X}") + logger.info(f"status=0x{shuttle.sensor.status:02X}") + + logger.info(f"acceleration={shuttle.sensor.acc_data}") + logger.info(f"sensor_time={shuttle.sensor.sensor_time}") + shuttle.switch_on_accel() + time.sleep(0.1) + logger.info(f"acceleration={shuttle.sensor.acc_data}") + logger.info(f"sensor_time={shuttle.sensor.sensor_time}") diff --git a/examples/bma400/bma400_spi_interrupt_streaming.py b/examples/bma400/bma400_spi_interrupt_streaming.py new file mode 100644 index 0000000..536e9d2 --- /dev/null +++ b/examples/bma400/bma400_spi_interrupt_streaming.py @@ -0,0 +1,49 @@ +import logging +import struct +import sys +import time +from pathlib import Path + +from umrx_app_v3.shuttle_board.bma400.bma400_shuttle import BMA400Shuttle + + +def setup_logging(level: int = logging.DEBUG) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMA400Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x90 + shuttle.configure_interrupt_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_interrupt_streaming_multiple(includes_mcu_timestamp=False): + sensor_id, packet, time_stamp, payload = streaming + a_x, a_y, a_z, t_0, t_1, t_2 = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMA400Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x90 + shuttle.configure_polling_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_polling_streaming_multiple(): + sensor_id, payload = streaming + a_x, a_y, a_z, t_0, t_1, t_2 = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMA400Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + logger.info(f"err_reg=0x{shuttle.sensor.err_reg:02X}") + logger.info(f"status=0x{shuttle.sensor.status:02X}") + + logger.info(f"acceleration={shuttle.sensor.acc_data}") + logger.info(f"sensor_time={shuttle.sensor.sensor_time}") + shuttle.switch_on_accel() + time.sleep(0.1) + logger.info(f"acceleration={shuttle.sensor.acc_data}") + logger.info(f"sensor_time={shuttle.sensor.sensor_time}") diff --git a/src/umrx_app_v3/sensors/bma400.py b/src/umrx_app_v3/sensors/bma400.py new file mode 100644 index 0000000..05d061c --- /dev/null +++ b/src/umrx_app_v3/sensors/bma400.py @@ -0,0 +1,655 @@ +import struct +from collections.abc import Callable +from enum import Enum + + +class BMA400Addr(Enum): + chip_id = 0x00 + err_reg = 0x02 + status = 0x03 + acc_x_lsb = 0x04 + acc_x_msb = 0x05 + acc_y_lsb = 0x06 + acc_y_msb = 0x07 + acc_z_lsb = 0x08 + acc_z_msb = 0x09 + sensor_time_0 = 0x0A + sensor_time_1 = 0x0B + sensor_time_2 = 0x0C + event = 0x0D + int_stat_0 = 0x0E + int_stat_1 = 0x0F + int_stat_2 = 0x10 + temp_data = 0x11 + fifo_length_0 = 0x12 + fifo_length_1 = 0x13 + fifo_data = 0x14 + step_cnt_0 = 0x15 + step_cnt_1 = 0x16 + step_cnt_2 = 0x17 + step_stat = 0x18 + acc_config_0 = 0x19 + acc_config_1 = 0x1A + acc_config_2 = 0x1B + int_config_0 = 0x1F + int_config_1 = 0x20 + int1_map = 0x21 + int2_map = 0x22 + int12_map = 0x23 + int12_io_ctlr = 0x24 + fifo_config_0 = 0x26 + fifo_config_1 = 0x27 + fifo_config_2 = 0x28 + fifo_pwr_config = 0x29 + auto_low_pow_0 = 0x2A + auto_low_pow_1 = 0x2B + auto_wake_up_0 = 0x2C + auto_wake_up_1 = 0x2D + wkup_int_config_0 = 0x2F + wkup_int_config_1 = 0x30 + wkup_int_config_2 = 0x31 + wkup_int_config_3 = 0x32 + wkup_int_config_4 = 0x33 + orient_ch_config_0 = 0x35 + orient_ch_config_1 = 0x36 + orient_ch_config_3 = 0x38 + orient_ch_config_4 = 0x39 + orient_ch_config_5 = 0x3A + orient_ch_config_6 = 0x3B + orient_ch_config_7 = 0x3C + orient_ch_config_8 = 0x3D + orient_ch_config_9 = 0x3E + gen1_int_config_0 = 0x3F + gen1_int_config_1 = 0x40 + gen1_int_config_2 = 0x41 + gen1_int_config_3 = 0x42 + gen1_int_config_31 = 0x43 + gen1_int_config_4 = 0x44 + gen1_int_config_5 = 0x45 + gen1_int_config_6 = 0x46 + gen1_int_config_7 = 0x47 + gen1_int_config_8 = 0x48 + gen1_int_config_9 = 0x49 + gen2_int_config_0 = 0x4A + gen2_int_config_1 = 0x4B + gen2_int_config_2 = 0x4C + gen2_int_config_3 = 0x4D + gen2_int_config_31 = 0x4E + gen2_int_config_4 = 0x4F + gen2_int_config_5 = 0x50 + gen2_int_config_6 = 0x51 + gen2_int_config_7 = 0x52 + gen2_int_config_8 = 0x53 + gen2_int_config_9 = 0x54 + acth_config_0 = 0x55 + acth_config_1 = 0x56 + tap_config = 0x57 + tap_config_1 = 0x58 + if_conf = 0x7C + self_test = 0x7D + cmd = 0x7E + + +class BMA400: + def __init__(self) -> None: + self.read: Callable | None = None + self.write: Callable | None = None + + def assign_callbacks(self, read_callback: Callable, write_callback: Callable) -> None: + self.read = read_callback + self.write = write_callback + + @staticmethod + def sign_convert_accel(a_x: int, a_y: int, a_z: int) -> tuple[int, int, int]: + if a_x > 2047: + a_x -= 4096 + if a_y > 2047: + a_y -= 4096 + if a_z > 2047: + a_z -= 4096 + return a_x, a_y, a_z + + @property + def chip_id(self) -> int: + return self.read(BMA400Addr.chip_id) + + @property + def err_reg(self) -> int: + return self.read(BMA400Addr.err_reg) + + @property + def status(self) -> int: + return self.read(BMA400Addr.status) + + @property + def acc_data(self) -> tuple[int, int, int]: + payload = self.read(BMA400Addr.acc_x_lsb, 6) + a_x, a_y, a_z = struct.unpack(" float: + b_0, b_1, b_2 = self.read(BMA400Addr.sensor_time_0, 3) + return ((b_2 << 16) | (b_1 << 8) | b_0) * 39.0625e-6 + + @property + def event(self) -> int: + return self.read(BMA400Addr.event) + + @property + def int_stat_0(self) -> int: + return self.read(BMA400Addr.int_stat_0) + + @property + def int_stat_1(self) -> int: + return self.read(BMA400Addr.int_stat_1) + + @property + def int_stat_2(self) -> int: + return self.read(BMA400Addr.int_stat_2) + + @property + def temp_data(self) -> int: + return self.read(BMA400Addr.temp_data) + + @property + def fifo_length_0(self) -> int: + return self.read(BMA400Addr.fifo_length_0) + + @property + def fifo_length_1(self) -> int: + return self.read(BMA400Addr.fifo_length_1) + + @property + def fifo_data(self) -> int: + return self.read(BMA400Addr.fifo_data) + + @property + def step_count(self) -> int: + b_0, b_1, b_2 = self.read(BMA400Addr.step_cnt_0, 3) + return (b_2 << 16) | (b_1 << 8) | b_0 + + @property + def step_stat(self) -> int: + return self.read(BMA400Addr.step_stat) + + @property + def acc_config_0(self) -> int: + return self.read(BMA400Addr.acc_config_0) + + @acc_config_0.setter + def acc_config_0(self, value: int) -> None: + self.write(BMA400Addr.acc_config_0, value) + + @property + def acc_config_1(self) -> int: + return self.read(BMA400Addr.acc_config_1) + + @acc_config_1.setter + def acc_config_1(self, value: int) -> None: + self.write(BMA400Addr.acc_config_1, value) + + @property + def acc_config_2(self) -> int: + return self.read(BMA400Addr.acc_config_2) + + @acc_config_2.setter + def acc_config_2(self, value: int) -> None: + self.write(BMA400Addr.acc_config_2, value) + + @property + def int_config_0(self) -> int: + return self.read(BMA400Addr.int_config_0) + + @int_config_0.setter + def int_config_0(self, value: int) -> None: + self.write(BMA400Addr.int_config_0, value) + + @property + def int_config_1(self) -> int: + return self.read(BMA400Addr.int_config_1) + + @int_config_1.setter + def int_config_1(self, value: int) -> None: + self.write(BMA400Addr.int_config_1, value) + + @property + def int1_map(self) -> int: + return self.read(BMA400Addr.int1_map) + + @int1_map.setter + def int1_map(self, value: int) -> None: + self.write(BMA400Addr.int1_map, value) + + @property + def int2_map(self) -> int: + return self.read(BMA400Addr.int2_map) + + @int2_map.setter + def int2_map(self, value: int) -> None: + self.write(BMA400Addr.int2_map, value) + + @property + def int12_map(self) -> int: + return self.read(BMA400Addr.int12_map) + + @int12_map.setter + def int12_map(self, value: int) -> None: + self.write(BMA400Addr.int12_map, value) + + @property + def int12_io_ctlr(self) -> int: + return self.read(BMA400Addr.int12_io_ctlr) + + @int12_io_ctlr.setter + def int12_io_ctlr(self, value: int) -> None: + self.write(BMA400Addr.int12_io_ctlr, value) + + @property + def fifo_config_0(self) -> int: + return self.read(BMA400Addr.fifo_config_0) + + @fifo_config_0.setter + def fifo_config_0(self, value: int) -> None: + self.write(BMA400Addr.fifo_config_0, value) + + @property + def fifo_config_1(self) -> int: + return self.read(BMA400Addr.fifo_config_1) + + @fifo_config_1.setter + def fifo_config_1(self, value: int) -> None: + self.write(BMA400Addr.fifo_config_1, value) + + @property + def fifo_config_2(self) -> int: + return self.read(BMA400Addr.fifo_config_2) + + @fifo_config_2.setter + def fifo_config_2(self, value: int) -> None: + self.write(BMA400Addr.fifo_config_2, value) + + @property + def fifo_pwr_config(self) -> int: + return self.read(BMA400Addr.fifo_pwr_config) + + @fifo_pwr_config.setter + def fifo_pwr_config(self, value: int) -> None: + self.write(BMA400Addr.fifo_pwr_config, value) + + @property + def auto_low_pow_0(self) -> int: + return self.read(BMA400Addr.auto_low_pow_0) + + @auto_low_pow_0.setter + def auto_low_pow_0(self, value: int) -> None: + self.write(BMA400Addr.auto_low_pow_0, value) + + @property + def auto_low_pow_1(self) -> int: + return self.read(BMA400Addr.auto_low_pow_1) + + @auto_low_pow_1.setter + def auto_low_pow_1(self, value: int) -> None: + self.write(BMA400Addr.auto_low_pow_1, value) + + @property + def auto_wake_up_0(self) -> int: + return self.read(BMA400Addr.auto_wake_up_0) + + @auto_wake_up_0.setter + def auto_wake_up_0(self, value: int) -> None: + self.write(BMA400Addr.auto_wake_up_0, value) + + @property + def auto_wake_up_1(self) -> int: + return self.read(BMA400Addr.auto_wake_up_1) + + @auto_wake_up_1.setter + def auto_wake_up_1(self, value: int) -> None: + self.write(BMA400Addr.auto_wake_up_1, value) + + @property + def wkup_int_config_0(self) -> int: + return self.read(BMA400Addr.wkup_int_config_0) + + @wkup_int_config_0.setter + def wkup_int_config_0(self, value: int) -> None: + self.write(BMA400Addr.wkup_int_config_0, value) + + @property + def wkup_int_config_1(self) -> int: + return self.read(BMA400Addr.wkup_int_config_1) + + @wkup_int_config_1.setter + def wkup_int_config_1(self, value: int) -> None: + self.write(BMA400Addr.wkup_int_config_1, value) + + @property + def wkup_int_config_2(self) -> int: + return self.read(BMA400Addr.wkup_int_config_2) + + @wkup_int_config_2.setter + def wkup_int_config_2(self, value: int) -> None: + self.write(BMA400Addr.wkup_int_config_2, value) + + @property + def wkup_int_config_3(self) -> int: + return self.read(BMA400Addr.wkup_int_config_3) + + @wkup_int_config_3.setter + def wkup_int_config_3(self, value: int) -> None: + self.write(BMA400Addr.wkup_int_config_3, value) + + @property + def wkup_int_config_4(self) -> int: + return self.read(BMA400Addr.wkup_int_config_4) + + @wkup_int_config_4.setter + def wkup_int_config_4(self, value: int) -> None: + self.write(BMA400Addr.wkup_int_config_4, value) + + @property + def orient_ch_config_0(self) -> int: + return self.read(BMA400Addr.orient_ch_config_0) + + @orient_ch_config_0.setter + def orient_ch_config_0(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_0, value) + + @property + def orient_ch_config_1(self) -> int: + return self.read(BMA400Addr.orient_ch_config_1) + + @orient_ch_config_1.setter + def orient_ch_config_1(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_1, value) + + @property + def orient_ch_config_3(self) -> int: + return self.read(BMA400Addr.orient_ch_config_3) + + @orient_ch_config_3.setter + def orient_ch_config_3(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_3, value) + + @property + def orient_ch_config_4(self) -> int: + return self.read(BMA400Addr.orient_ch_config_4) + + @orient_ch_config_4.setter + def orient_ch_config_4(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_4, value) + + @property + def orient_ch_config_5(self) -> int: + return self.read(BMA400Addr.orient_ch_config_5) + + @orient_ch_config_5.setter + def orient_ch_config_5(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_5, value) + + @property + def orient_ch_config_6(self) -> int: + return self.read(BMA400Addr.orient_ch_config_6) + + @orient_ch_config_6.setter + def orient_ch_config_6(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_6, value) + + @property + def orient_ch_config_7(self) -> int: + return self.read(BMA400Addr.orient_ch_config_7) + + @orient_ch_config_7.setter + def orient_ch_config_7(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_7, value) + + @property + def orient_ch_config_8(self) -> int: + return self.read(BMA400Addr.orient_ch_config_8) + + @orient_ch_config_8.setter + def orient_ch_config_8(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_8, value) + + @property + def orient_ch_config_9(self) -> int: + return self.read(BMA400Addr.orient_ch_config_9) + + @orient_ch_config_9.setter + def orient_ch_config_9(self, value: int) -> None: + self.write(BMA400Addr.orient_ch_config_9, value) + + @property + def gen1_int_config_0(self) -> int: + return self.read(BMA400Addr.gen1_int_config_0) + + @gen1_int_config_0.setter + def gen1_int_config_0(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_0, value) + + @property + def gen1_int_config_1(self) -> int: + return self.read(BMA400Addr.gen1_int_config_1) + + @gen1_int_config_1.setter + def gen1_int_config_1(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_1, value) + + @property + def gen1_int_config_2(self) -> int: + return self.read(BMA400Addr.gen1_int_config_2) + + @gen1_int_config_2.setter + def gen1_int_config_2(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_2, value) + + @property + def gen1_int_config_3(self) -> int: + return self.read(BMA400Addr.gen1_int_config_3) + + @gen1_int_config_3.setter + def gen1_int_config_3(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_3, value) + + @property + def gen1_int_config_31(self) -> int: + return self.read(BMA400Addr.gen1_int_config_31) + + @gen1_int_config_31.setter + def gen1_int_config_31(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_31, value) + + @property + def gen1_int_config_4(self) -> int: + return self.read(BMA400Addr.gen1_int_config_4) + + @gen1_int_config_4.setter + def gen1_int_config_4(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_4, value) + + @property + def gen1_int_config_5(self) -> int: + return self.read(BMA400Addr.gen1_int_config_5) + + @gen1_int_config_5.setter + def gen1_int_config_5(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_5, value) + + @property + def gen1_int_config_6(self) -> int: + return self.read(BMA400Addr.gen1_int_config_6) + + @gen1_int_config_6.setter + def gen1_int_config_6(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_6, value) + + @property + def gen1_int_config_7(self) -> int: + return self.read(BMA400Addr.gen1_int_config_7) + + @gen1_int_config_7.setter + def gen1_int_config_7(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_7, value) + + @property + def gen1_int_config_8(self) -> int: + return self.read(BMA400Addr.gen1_int_config_8) + + @gen1_int_config_8.setter + def gen1_int_config_8(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_8, value) + + @property + def gen1_int_config_9(self) -> int: + return self.read(BMA400Addr.gen1_int_config_9) + + @gen1_int_config_9.setter + def gen1_int_config_9(self, value: int) -> None: + self.write(BMA400Addr.gen1_int_config_9, value) + + @property + def gen2_int_config_0(self) -> int: + return self.read(BMA400Addr.gen2_int_config_0) + + @gen2_int_config_0.setter + def gen2_int_config_0(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_0, value) + + @property + def gen2_int_config_1(self) -> int: + return self.read(BMA400Addr.gen2_int_config_1) + + @gen2_int_config_1.setter + def gen2_int_config_1(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_1, value) + + @property + def gen2_int_config_2(self) -> int: + return self.read(BMA400Addr.gen2_int_config_2) + + @gen2_int_config_2.setter + def gen2_int_config_2(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_2, value) + + @property + def gen2_int_config_3(self) -> int: + return self.read(BMA400Addr.gen2_int_config_3) + + @gen2_int_config_3.setter + def gen2_int_config_3(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_3, value) + + @property + def gen2_int_config_31(self) -> int: + return self.read(BMA400Addr.gen2_int_config_31) + + @gen2_int_config_31.setter + def gen2_int_config_31(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_31, value) + + @property + def gen2_int_config_4(self) -> int: + return self.read(BMA400Addr.gen2_int_config_4) + + @gen2_int_config_4.setter + def gen2_int_config_4(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_4, value) + + @property + def gen2_int_config_5(self) -> int: + return self.read(BMA400Addr.gen2_int_config_5) + + @gen2_int_config_5.setter + def gen2_int_config_5(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_5, value) + + @property + def gen2_int_config_6(self) -> int: + return self.read(BMA400Addr.gen2_int_config_6) + + @gen2_int_config_6.setter + def gen2_int_config_6(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_6, value) + + @property + def gen2_int_config_7(self) -> int: + return self.read(BMA400Addr.gen2_int_config_7) + + @gen2_int_config_7.setter + def gen2_int_config_7(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_7, value) + + @property + def gen2_int_config_8(self) -> int: + return self.read(BMA400Addr.gen2_int_config_8) + + @gen2_int_config_8.setter + def gen2_int_config_8(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_8, value) + + @property + def gen2_int_config_9(self) -> int: + return self.read(BMA400Addr.gen2_int_config_9) + + @gen2_int_config_9.setter + def gen2_int_config_9(self, value: int) -> None: + self.write(BMA400Addr.gen2_int_config_9, value) + + @property + def acth_config_0(self) -> int: + return self.read(BMA400Addr.acth_config_0) + + @acth_config_0.setter + def acth_config_0(self, value: int) -> None: + self.write(BMA400Addr.acth_config_0, value) + + @property + def acth_config_1(self) -> int: + return self.read(BMA400Addr.acth_config_1) + + @acth_config_1.setter + def acth_config_1(self, value: int) -> None: + self.write(BMA400Addr.acth_config_1, value) + + @property + def tap_config(self) -> int: + return self.read(BMA400Addr.tap_config) + + @tap_config.setter + def tap_config(self, value: int) -> None: + self.write(BMA400Addr.tap_config, value) + + @property + def tap_config_1(self) -> int: + return self.read(BMA400Addr.tap_config_1) + + @tap_config_1.setter + def tap_config_1(self, value: int) -> None: + self.write(BMA400Addr.tap_config_1, value) + + @property + def if_conf(self) -> int: + return self.read(BMA400Addr.if_conf) + + @if_conf.setter + def if_conf(self, value: int) -> None: + self.write(BMA400Addr.if_conf, value) + + @property + def self_test(self) -> int: + return self.read(BMA400Addr.self_test) + + @self_test.setter + def self_test(self, value: int) -> None: + self.write(BMA400Addr.self_test, value) + + @property + def cmd(self) -> int: + return self.read(BMA400Addr.cmd) + + @cmd.setter + def cmd(self, value: int) -> None: + self.write(BMA400Addr.cmd, value) diff --git a/src/umrx_app_v3/shuttle_board/bma400/__init__.py b/src/umrx_app_v3/shuttle_board/bma400/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/umrx_app_v3/shuttle_board/bma400/bma400_shuttle.py b/src/umrx_app_v3/shuttle_board/bma400/bma400_shuttle.py new file mode 100644 index 0000000..cccf441 --- /dev/null +++ b/src/umrx_app_v3/shuttle_board/bma400/bma400_shuttle.py @@ -0,0 +1,223 @@ +import logging +import time +from array import array +from typing import Any, Self + +from umrx_app_v3.mcu_board.app_board_v3_rev0 import ApplicationBoardV3Rev0 +from umrx_app_v3.mcu_board.app_board_v3_rev1 import ApplicationBoardV3Rev1 +from umrx_app_v3.mcu_board.bst_app_board import ApplicationBoard +from umrx_app_v3.mcu_board.bst_protocol_constants import ( + I2CMode, + MultiIOPin, + PinDirection, + PinValue, + SPIBus, + StreamingSamplingUnit, +) +from umrx_app_v3.mcu_board.commands.spi import SPIConfigureCmd +from umrx_app_v3.sensors.bma400 import BMA400, BMA400Addr + +logger = logging.getLogger(__name__) + + +class BMA400ShuttleError(Exception): ... + + +class BMA400Shuttle: + # 1-wire PROM + SHUTTLE_ID = 0x1A1 + # Pins + SDO = MultiIOPin.MINI_SHUTTLE_PIN_2_3 + CS = MultiIOPin.MINI_SHUTTLE_PIN_2_1 + INT1 = MultiIOPin.MINI_SHUTTLE_PIN_1_6 + INT2 = MultiIOPin.MINI_SHUTTLE_PIN_1_7 + # I2C addresses + I2C_DEFAULT_ADDRESS = 0x14 + I2C_ALTERNATIVE_ADDRESS = 0x15 + + def __init__(self, **kw: Any) -> None: + self.board: ApplicationBoard | None = kw["board"] if kw.get("board") else None + self.sensor: BMA400 = BMA400() + self.is_initialized: bool = False + self.is_i2c_configured: bool = False + self.is_spi_configured: bool = False + self.is_polling_streaming_configured: bool = False + self.is_interrupt_streaming_configured: bool = False + + def attach_to(self, board: ApplicationBoard) -> None: + self.board = board + + @classmethod + def on_hardware_v3_rev0(cls) -> Self: + return cls(board=ApplicationBoardV3Rev0()) + + @classmethod + def on_hardware_v3_rev1(cls) -> Self: + return cls(board=ApplicationBoardV3Rev1()) + + def initialize(self) -> None: + self.board.initialize() + self.board.start_communication() + self.is_initialized = True + + def check_connected_hw(self) -> None: + board_info = self.board.board_info + if board_info.shuttle_id != self.SHUTTLE_ID: + error_message = f"Expect shuttle_id={self.SHUTTLE_ID} got {board_info.shuttle_id}" + raise BMA400ShuttleError(error_message) + + def assign_sensor_callbacks(self) -> None: + self.sensor.assign_callbacks(read_callback=self.read_register, write_callback=self.write_register) + + def configure_i2c(self) -> None: + self.board.set_vdd_vddio(0.0, 0.0) + time.sleep(0.1) + self.board.set_pin_config(self.SDO, PinDirection.OUTPUT, PinValue.LOW) + self.board.set_pin_config(self.CS, PinDirection.OUTPUT, PinValue.HIGH) + self.board.set_vdd_vddio(3.3, 3.3) + time.sleep(0.01) + self.board.configure_i2c(I2CMode.FAST_MODE) + self.assign_sensor_callbacks() + self.is_i2c_configured = True + self.is_spi_configured = False + + def configure_spi(self) -> None: + self.board.set_vdd_vddio(0.0, 0.0) + time.sleep(0.1) + self.board.set_pin_config(self.CS, PinDirection.OUTPUT, PinValue.HIGH) + self.board.set_vdd_vddio(3.3, 3.3) + time.sleep(0.2) + if isinstance(self.board, ApplicationBoardV3Rev1): + SPIConfigureCmd.set_bus(SPIBus.BUS_1) + self.board.configure_spi() + self.assign_sensor_callbacks() + self.is_spi_configured = True + self.is_i2c_configured = False + + def read_register(self, reg_addr: int, bytes_to_read: int = 1) -> array[int] | int: + if isinstance(reg_addr, BMA400Addr): + reg_addr = reg_addr.value + if self.is_i2c_configured: + values = self.board.read_i2c(self.I2C_DEFAULT_ADDRESS, reg_addr, bytes_to_read) + if bytes_to_read == 1: + return values[0] + return values + if self.is_spi_configured: + if bytes_to_read == 1: + return self.read_single_register_spi(reg_addr) + return self.read_multiple_spi(reg_addr, bytes_to_read) + + error_message = "Configure I2C or SPI protocol prior to reading registers" + raise BMA400ShuttleError(error_message) + + def read_single_register_spi(self, reg_addr: int) -> int: + values = self.board.read_spi(self.CS, reg_addr, 2) + return values[1] + + def read_multiple_spi(self, start_register_addr: int, bytes_to_read: int) -> array[int]: + values = self.board.read_spi(self.CS, start_register_addr, bytes_to_read + 1) + return values[1:] + + def write_register(self, reg_addr: int, value: int) -> None: + if isinstance(reg_addr, BMA400Addr): + reg_addr = reg_addr.value + if self.is_i2c_configured: + return self.board.write_i2c(self.I2C_DEFAULT_ADDRESS, reg_addr, array("B", (value,))) + if self.is_spi_configured: + return self.board.write_spi(self.CS, reg_addr, array("B", (value,))) + error_message = "Configure I2C or SPI protocol prior to reading registers" + raise BMA400ShuttleError(error_message) + + def _configure_i2c_polling_streaming( + self, + sampling_time: int, + sampling_unit: StreamingSamplingUnit, + ) -> None: + self.board.streaming_polling_set_i2c_channel( + i2c_address=self.I2C_DEFAULT_ADDRESS, + sampling_time=sampling_time, + sampling_unit=sampling_unit, + register_address=BMA400Addr.acc_x_lsb.value, + bytes_to_read=(6 + 3), + ) + self.board.configure_streaming_polling(interface="i2c") + self.is_polling_streaming_configured = True + + def _configure_spi_polling_streaming( + self, + sampling_time: int, + sampling_unit: StreamingSamplingUnit, + ) -> None: + self.board.streaming_polling_set_spi_channel( + cs_pin=self.CS, + sampling_time=sampling_time, + sampling_unit=sampling_unit, + register_address=BMA400Addr.acc_x_lsb.value, + bytes_to_read=(1 + 6 + 3), + ) + self.board.configure_streaming_polling(interface="spi") + self.is_polling_streaming_configured = True + + def switch_on_accel(self) -> None: + power_normal = 0x02 + self.sensor.acc_config_0 = power_normal << 0 + acc_odr = 0x0B # 800 Hz + self.sensor.acc_config_1 = acc_odr << 0 + + def configure_polling_streaming( + self, + sampling_time: int = 1, + sampling_unit: StreamingSamplingUnit = StreamingSamplingUnit.MILLI_SECOND, + ) -> None: + self.switch_on_accel() + if self.is_i2c_configured: + return self._configure_i2c_polling_streaming(sampling_time, sampling_unit) + if self.is_spi_configured: + return self._configure_spi_polling_streaming(sampling_time, sampling_unit) + error_message = "Configure I2C or SPI protocol first" + raise BMA400ShuttleError(error_message) + + def _configure_i2c_interrupt_streaming(self) -> None: + self.board.streaming_interrupt_set_i2c_channel( + interrupt_pin=self.INT1, + i2c_address=BMA400Shuttle.I2C_DEFAULT_ADDRESS, + register_address=BMA400Addr.acc_x_lsb.value, + bytes_to_read=(6 + 3), + ) + self.board.configure_streaming_interrupt(interface="i2c") + self.is_interrupt_streaming_configured = True + + def _configure_spi_interrupt_streaming(self) -> None: + self.board.streaming_interrupt_set_spi_channel( + interrupt_pin=self.INT1, + cs_pin=self.CS, + register_address=BMA400Addr.acc_x_lsb.value, + bytes_to_read=(6 + 3 + 1), + ) + self.board.configure_streaming_interrupt(interface="spi") + self.is_interrupt_streaming_configured = True + + def configure_interrupt_streaming(self) -> None: + self.switch_on_accel() + self.sensor.int_config_0 = 1 << 7 + self.sensor.int1_map = 1 << 7 + time.sleep(0.02) + if self.is_i2c_configured: + return self._configure_i2c_interrupt_streaming() + if self.is_spi_configured: + return self._configure_spi_interrupt_streaming() + error_message = "Configure I2C or SPI protocol first" + raise BMA400ShuttleError(error_message) + + def start_streaming(self) -> None: + if self.is_polling_streaming_configured: + return self.board.start_polling_streaming() + if self.is_interrupt_streaming_configured: + return self.board.start_interrupt_streaming() + error_message = "Configure polling or interrupt streaming before streaming start" + raise BMA400ShuttleError(error_message) + + def stop_streaming(self) -> None: + self.board.stop_polling_streaming() + time.sleep(0.15) + self.board.stop_interrupt_streaming() diff --git a/tests/conftest.py b/tests/conftest.py index 524cb55..3d0a63b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,7 @@ StreamingPollingCmd, ) from umrx_app_v3.mcu_board.commands.timer import TimerCmd +from umrx_app_v3.sensors.bma400 import BMA400 from umrx_app_v3.sensors.bmi088 import BMI088 from umrx_app_v3.sensors.bmi323 import BMI323 from umrx_app_v3.sensors.bmp390 import BMP390 @@ -169,3 +170,8 @@ def bmi323() -> BMI323: @pytest.fixture(scope="session", autouse=True) def bmp390() -> BMP390: return BMP390() + + +@pytest.fixture(scope="session", autouse=True) +def bma400() -> BMA400: + return BMA400() diff --git a/tests/sensors/test_bma400.py b/tests/sensors/test_bma400.py new file mode 100644 index 0000000..b7fb4ea --- /dev/null +++ b/tests/sensors/test_bma400.py @@ -0,0 +1,30 @@ +import struct +from unittest.mock import patch + +from umrx_app_v3.sensors.bma400 import BMA400 + + +def test_bma400_read_properties(bma400: BMA400) -> None: + all_properties = {key: value for key, value in bma400.__class__.__dict__.items() if isinstance(value, property)} + write_only_properties = [ + key for key, value in all_properties.items() if (value.fset is not None) and (value.fget is None) + ] + readable_properties = all_properties.keys() - write_only_properties + + for readable_property in readable_properties: + with ( + patch.object(bma400, "read", return_value=(1, 2, 3)) as mocked_read, + patch.object(struct, "unpack", return_value=(1, 2, 3)), + ): + getattr(bma400, readable_property) + mocked_read.assert_called_once() + + +def test_bma400_write_properties(bma400: BMA400) -> None: + all_properties = {key: value for key, value in bma400.__class__.__dict__.items() if isinstance(value, property)} + writable_properties = [key for key, value in all_properties.items() if value.fset is not None] + + for writable_property in writable_properties: + with patch.object(bma400, "write") as mocked_write: + setattr(bma400, writable_property, 123) + mocked_write.assert_called_once()