diff --git a/apollo_fpga/__init__.py b/apollo_fpga/__init__.py index 8025093..9450eab 100644 --- a/apollo_fpga/__init__.py +++ b/apollo_fpga/__init__.py @@ -38,6 +38,10 @@ class ApolloDebugger: APOLLO_USB_IDS = [(0x1d50, 0x615c)] LUNA_USB_IDS = [(0x1d50, 0x615b)] + # Add pid.codes VID/PID pairs with PID from 0x0001 to 0x0010 + for i in range(16): + LUNA_USB_IDS += [(0x1209, i+1)] + # If we have a LUNA_USB_IDS variable, we can use it to find the LUNA device. if os.getenv("LUNA_USB_IDS"): LUNA_USB_IDS += [tuple([int(x, 16) for x in os.getenv("LUNA_USB_IDS").split(":")])] diff --git a/apollo_fpga/commands/cli.py b/apollo_fpga/commands/cli.py index aa0c0cb..81f5048 100755 --- a/apollo_fpga/commands/cli.py +++ b/apollo_fpga/commands/cli.py @@ -16,12 +16,23 @@ import logging import argparse from collections import namedtuple +import xdg.BaseDirectory +from functools import partial from apollo_fpga import ApolloDebugger from apollo_fpga.jtag import JTAGChain, JTAGPatternError -from apollo_fpga.ecp5 import ECP5_JTAGProgrammer +from apollo_fpga.ecp5 import ECP5_JTAGProgrammer, ECP5FlashBridgeProgrammer from apollo_fpga.onboard_jtag import * +try: + from amaranth.build.run import LocalBuildProducts + from luna.gateware.platform import get_appropriate_platform + from apollo_fpga.gateware.flash_bridge import FlashBridge, FlashBridgeConnection +except ImportError: + flash_fast_enable = False +else: + flash_fast_enable = True + # # Common JEDEC manufacturer IDs for SPI flash chips. @@ -161,6 +172,44 @@ def program_flash(device, args): programmer.flash(bitstream, offset=offset) + +def program_flash_fast(device, args, *, platform): + + # Retrieve a FlashBridge cached bitstream or build it + plan = platform.build(FlashBridge(), do_build=False) + cache_dir = os.path.join( + xdg.BaseDirectory.save_cache_path('apollo'), 'build', plan.digest().hex() + ) + if os.path.exists(cache_dir): + products = LocalBuildProducts(cache_dir) + else: + products = plan.execute_local(cache_dir) + + # Configure flash bridge + with device.jtag as jtag: + programmer = device.create_jtag_programmer(jtag) + programmer.configure(products.get("top.bit")) + + # Let the LUNA gateware take over in devices with shared USB port + device.honor_fpga_adv() + + # Wait for flash bridge enumeration + time.sleep(2) + + # Program SPI flash memory using the configured bridge + bridge = FlashBridgeConnection() + programmer = ECP5FlashBridgeProgrammer(bridge=bridge) + with open(args.file, "rb") as f: + bitstream = f.read() + programmer.flash(bitstream) + + +def program_flash_fast_unavailable(device, args): + logging.error("`flash-fast` requires the `luna` package in the Python environment.\n" + "Install `luna` or use `flash` instead.") + sys.exit(-1) + + def read_back_flash(device, args): ensure_unconfigured(device) @@ -298,7 +347,9 @@ def main(): Command("flash-erase", handler=erase_flash, help="Erases the contents of the FPGA's flash memory."), Command("flash-program", alias=["flash"], args=["file", "--offset"], handler=program_flash, - help="Programs the target bitstream onto the attached FPGA."), + help="Programs the target bitstream onto the FPGA's configuration flash."), + Command("flash-fast", args=["file", "--offset"], handler=program_flash_fast, + help="Programs a bitstream onto the FPGA's configuration flash using a SPI bridge"), Command("flash-read", args=["file", "--offset", "--length"], handler=read_back_flash, help="Reads the contents of the attached FPGA's configuration flash."), @@ -346,6 +397,13 @@ def main(): parser.print_help() return + # Add a special case where the platform information is needed + if args.command == "flash-fast": + if flash_fast_enable: + args.func = partial(program_flash_fast, platform=get_appropriate_platform()) + else: + args.func = program_flash_fast_unavailable + device = ApolloDebugger() # Set up python's logging to act as a simple print, for now. diff --git a/apollo_fpga/ecp5.py b/apollo_fpga/ecp5.py index 0d74112..fe86ba2 100644 --- a/apollo_fpga/ecp5.py +++ b/apollo_fpga/ecp5.py @@ -1260,3 +1260,41 @@ def reverse_bits(num): # Bit-reverse the data we capture in response, compensating for MSB-first ordering. response = [reverse_bits(b) for b in bytes(response)] return bytes(response) + + +class ECP5FlashBridgeProgrammer(ECP5CommandBasedProgrammer): + """ Class that enables programming the configuration SPI flash using the FPGA as + a SPI bridge (needs companion gateware). + + This programmer is only used for flashing the SPI memory. + """ + + # Only useful for flashing operation + + def __init__(self, bridge, *args, **kwargs): + """ Creates a new ECP5 Flash Bridge Programmer interface. + + Parameters: + bridge -- The connection object to operate with the gateware bridge. + + See ECP5Programmer.__init__ for additional accepted arguments. + """ + + # Store a reference to our SPI bridge. + self.bridge = bridge + + # And run the parent configuration. + super(ECP5FlashBridgeProgrammer, self).__init__(*args, **kwargs) + + def trigger_reconfiguration(self): + """ Triggers the target FPGA to reconfigure itself from its flash chip. """ + return self.bridge.trigger_reconfiguration() + + def _enter_background_spi(self, reset_flash=True): + """ Places the FPGA into background SPI mode; for e.g. programming a connected flash. """ + pass + + def _background_spi_transfer(self, data, reverse=False, ignore_response=False): + """ Performs a SPI transfer, targeting the configuration flash.""" + return self.bridge.transfer(data) + \ No newline at end of file diff --git a/apollo_fpga/gateware/flash_bridge.py b/apollo_fpga/gateware/flash_bridge.py new file mode 100644 index 0000000..caa34df --- /dev/null +++ b/apollo_fpga/gateware/flash_bridge.py @@ -0,0 +1,348 @@ +# +# This file is part of LUNA. +# +# Copyright (c) 2023 Great Scott Gadgets +# SPDX-License-Identifier: BSD-3-Clause + +import usb.core + +from amaranth import Signal, Elaboratable, Module, Cat, C +from amaranth.lib.fifo import AsyncFIFO + +from luna.gateware.interface.flash import ECP5ConfigurationFlashInterface +from luna.gateware.interface.spi import SPIBus +from luna.gateware.stream import StreamInterface +from luna.gateware.usb.usb2.request import USBRequestHandler +from luna.usb2 import USBDevice, USBStreamInEndpoint, USBStreamOutEndpoint + +from usb_protocol.types import USBRequestType, USBRequestRecipient +from usb_protocol.emitters import DeviceDescriptorCollection + +VENDOR_ID = 0x1209 +PRODUCT_ID = 0x000F + +BULK_ENDPOINT_NUMBER = 1 +MAX_BULK_PACKET_SIZE = 512 + + +class SPIStreamController(Elaboratable): + """ Class that drives a SPI bus with data from input stream packets. + Data received from the device is returned as another packet.""" + + def __init__(self): + self.period = 4 # powers of two only + self.bus = SPIBus() + self.input = StreamInterface() + self.output = StreamInterface() + + def elaborate(self, platform): + m = Module() + + # Counter for clock generation + cycles = Signal(range(self.period)) + + # Generate strobes for clock edges + sck_fall = Signal() + sck_rise = Signal() + sck_d = Signal() + m.d.sync += sck_d.eq(self.bus.sck) + m.d.comb += [ + sck_fall.eq( sck_d & ~self.bus.sck), # falling edge + sck_rise.eq(~sck_d & self.bus.sck), # rising edge + ] + + # I/O shift registers, bit counter and last flag + shreg_o = Signal(8) + shreg_i = Signal(8) + count_o = Signal(range(8)) + last = Signal() + + m.d.comb += [ + self.bus.sdi .eq(shreg_o[-1]), + self.output.payload .eq(shreg_i), + ] + + with m.FSM() as fsm: + m.d.comb += self.bus.cs.eq(~fsm.ongoing('IDLE')) + + with m.State("IDLE"): + m.d.comb += [ + self.input.ready .eq(1), + self.bus.sck .eq(0), + ] + with m.If(self.input.valid): + m.next = 'SHIFT' + + with m.State("WAIT"): + m.d.comb += [ + self.input.ready .eq(1), + self.bus.sck .eq(0), + ] + with m.If(self.input.valid): + m.next = 'SHIFT' + + with m.State("SHIFT"): + m.d.comb += [ + self.input.ready .eq(sck_fall & (count_o == 0) & ~last), + self.bus.sck .eq(cycles[-1]) + ] + m.d.sync += cycles.eq(cycles + 1) + + # Read logic, latch on rising edge + m.d.sync += self.output.valid.eq(0) + with m.If(sck_rise): + m.d.sync += [ + shreg_i .eq(Cat(self.bus.sdo, shreg_i[:-1])), + self.output.valid .eq(count_o == 0), + self.output.last .eq(last), + ] + + # Write logic, setup on falling edge + with m.If(sck_fall): + m.d.sync += [ + shreg_o .eq(Cat(C(0,1), shreg_o[:-1])), + count_o .eq(count_o - 1), + ] + with m.If(count_o == 0): + with m.If(last): + m.next = 'END' + with m.Elif(~self.input.valid): + m.next = 'WAIT' + + with m.State("END"): + m.d.comb += [ + self.input.ready .eq(0), + self.bus.sck .eq(0), + ] + m.d.sync += [ + last .eq(0), + cycles .eq(0), + ] + m.next = 'IDLE' + + with m.If(self.input.valid & self.input.ready): + m.d.sync += [ + shreg_o .eq(self.input.payload), + last .eq(self.input.last), + count_o .eq(7), + ] + + return m + + +class FlashBridgeRequestHandler(USBRequestHandler): + """ Request handler that can trigger a FPGA reconfiguration. """ + + REQUEST_TRIGGER_RECONF = 0 + + def __init__(self, if_number): + super().__init__() + self.if_number = if_number + + def elaborate(self, platform): + m = Module() + + interface = self.interface + setup = self.interface.setup + + # + # Vendor request handlers. + + self_prog = platform.request("self_program", dir="o").o + + with m.If((setup.type == USBRequestType.VENDOR) & \ + (setup.recipient == USBRequestRecipient.INTERFACE) & \ + (setup.index == self.if_number)): + + with m.Switch(setup.request): + + with m.Case(self.REQUEST_TRIGGER_RECONF): + + m.d.comb += interface.claim.eq(1) + + # Once the receive is complete, respond with an ACK. + with m.If(interface.rx_ready_for_response): + m.d.comb += interface.handshakes_out.ack.eq(1) + + # If we reach the status stage, send a ZLP. + with m.If(interface.status_requested): + m.d.comb += self.send_zlp() + m.d.usb += self_prog.eq(1) + + return m + + +class FlashBridgeSubmodule(Elaboratable): + """ Implements gateware for the USB<->SPI bridge. Intended to use as a submodule + See example in FlashBridge """ + + def __init__(self, endpoint): + # Endpoint number for the in/out stream endpoints + self.endpoint = endpoint + + # Define endpoints + self.endpoint_out = USBStreamOutEndpoint( + endpoint_number=endpoint, + max_packet_size=MAX_BULK_PACKET_SIZE, + ) + self.endpoint_in = USBStreamInEndpoint( + endpoint_number=endpoint, + max_packet_size=MAX_BULK_PACKET_SIZE + ) + + def elaborate(self, platform): + m = Module() + + stream_in = self.endpoint_in.stream + stream_out = self.endpoint_out.stream + + # Use two small asynchronous FIFOs for crossing clock domains + spi = SPIStreamController() + spi_bus = ECP5ConfigurationFlashInterface(bus=platform.request('spi_flash'), use_cs=True) + tx_fifo = AsyncFIFO(width=8+1, depth=8, w_domain="usb", r_domain="sync") + rx_fifo = AsyncFIFO(width=8+1, depth=8, w_domain="sync", r_domain="usb") + + m.submodules += spi + m.submodules += spi_bus + m.submodules += tx_fifo + m.submodules += rx_fifo + + m.d.comb += [ + # Connect output from USB host to transmission FIFO + tx_fifo.w_data .eq(Cat(stream_out.payload, stream_out.last)), + tx_fifo.w_en .eq(stream_out.valid), + stream_out.ready .eq(tx_fifo.w_rdy), + + # Connect transmission FIFO to the SPI controller + Cat(spi.input.payload, spi.input.last).eq(tx_fifo.r_data), + spi.input.valid .eq(tx_fifo.r_rdy), + tx_fifo.r_en .eq(spi.input.ready), + + # Connect output from SPI controller to reception FIFO + rx_fifo.w_data .eq(Cat(spi.output.payload, spi.output.last)), + rx_fifo.w_en .eq(spi.output.valid), + spi.output.ready .eq(1), # ignore rx_fifo.w_rdy + + # Connect reception FIFO to USB host input + Cat(stream_in.payload, stream_in.last).eq(rx_fifo.r_data), + stream_in.valid .eq(rx_fifo.r_rdy), + rx_fifo.r_en .eq(stream_in.ready), + + # Connect the SPI bus to our SPI controller + spi_bus.sck .eq(spi.bus.sck), + spi_bus.sdi .eq(spi.bus.sdi), + spi_bus.cs .eq(spi.bus.cs), + spi.bus.sdo .eq(spi_bus.sdo), + ] + + return m + + +class FlashBridge(Elaboratable): + + def create_descriptors(self): + """ Create the descriptors we want to use for our device. """ + + descriptors = DeviceDescriptorCollection() + + # + # We'll add the major components of the descriptors we we want. + # The collection we build here will be necessary to create a standard endpoint. + # + + # We'll need a device descriptor... + with descriptors.DeviceDescriptor() as d: + d.idVendor = VENDOR_ID + d.idProduct = PRODUCT_ID + + d.iManufacturer = "LUNA" + d.iProduct = "Configuration Flash bridge" + d.iSerialNumber = "no serial" + + d.bNumConfigurations = 1 + + # ... and a description of the USB configuration we'll provide. + with descriptors.ConfigurationDescriptor() as c: + + with c.InterfaceDescriptor() as i: + i.bInterfaceNumber = 0 + i.bInterfaceClass = 0xFF + i.bInterfaceSubclass = 0x01 + + with i.EndpointDescriptor() as e: + e.bEndpointAddress = BULK_ENDPOINT_NUMBER + e.wMaxPacketSize = MAX_BULK_PACKET_SIZE + + with i.EndpointDescriptor() as e: + e.bEndpointAddress = 0x80 | BULK_ENDPOINT_NUMBER + e.wMaxPacketSize = MAX_BULK_PACKET_SIZE + + return descriptors + + def elaborate(self, platform): + m = Module() + + # Generate our domain clocks/resets. + m.submodules.car = platform.clock_domain_generator() + + # Create our USB device interface... + ulpi = platform.request(platform.default_usb_connection) + m.submodules.usb = usb = USBDevice(bus=ulpi) + + # Add our standard control endpoint to the device. + descriptors = self.create_descriptors() + control_ep = usb.add_standard_control_endpoint(descriptors) + + # Add our vendor request handler to the control endpoint. + control_ep.add_request_handler(FlashBridgeRequestHandler(0)) + + # Add bridge submodule and input/output stream endpoints to our device. + m.submodules.bridge = bridge = FlashBridgeSubmodule(BULK_ENDPOINT_NUMBER) + usb.add_endpoint(bridge.endpoint_in) + usb.add_endpoint(bridge.endpoint_out) + + # Connect our device + m.d.comb += usb.connect.eq(1) + + return m + + +class FlashBridgeNotFound(IOError): + pass + +class FlashBridgeConnection: + def __init__(self): + # Try to create a connection to our configuration flash bridge. + device = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID, custom_match=self._find_cfg_flash_bridge) + + # If we couldn't find the bridge, bail out. + if device is None: + raise FlashBridgeNotFound("Unable to find device") + + self.device = device + self.interface, self.endpoint = self._find_cfg_flash_bridge(device, get_ep=True) + + def __del__(self): + self.trigger_reconfiguration() + + @staticmethod + def _find_cfg_flash_bridge(dev, get_ep=False): + for cfg in dev: + for intf in usb.util.find_descriptor(cfg, find_all=True, bInterfaceClass=0xFF, bInterfaceSubClass=0x01): + if not get_ep: + return True + return intf.bInterfaceNumber, intf[0].bEndpointAddress + return None, None if get_ep else False + + def trigger_reconfiguration(self): + """ Triggers the target FPGA to reconfigure itself from its flash chip. """ + request_type = usb.ENDPOINT_OUT | usb.RECIP_INTERFACE | usb.TYPE_VENDOR + return self.device.ctrl_transfer(request_type, 0, wValue=0, wIndex=self.interface) + + def transfer(self, data): + """ Performs a SPI transfer, targeting the configuration flash.""" + tx_sent = self.device.write(self.endpoint, data) + assert tx_sent == len(data) + rx_data = self.device.read(0x80 | self.endpoint, 512) + assert len(rx_data) == tx_sent, f'Expected {tx_sent} bytes, received {len(rx_data)}' + return rx_data diff --git a/pyproject.toml b/pyproject.toml index 18c76c8..606a271 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "pyusb>1.1.1", "pyvcd>=0.2.4", "prompt-toolkit>3.0.16", + "pyxdg>=0.27", ] [project.urls]