From 6baa07a69bff59249e6e88e5a8ac2aca0bfa7b51 Mon Sep 17 00:00:00 2001 From: Florent Kermarrec Date: Thu, 6 Feb 2020 11:06:41 +0100 Subject: [PATCH] soc/integration: add new soc class prorotype with SoCRegion/SoCBus/SoCCSR/SoCIRQ/SoC --- litex/soc/integration/soc.py | 559 +++++++++++++++++++++++++++++++++++ 1 file changed, 559 insertions(+) create mode 100755 litex/soc/integration/soc.py diff --git a/litex/soc/integration/soc.py b/litex/soc/integration/soc.py new file mode 100755 index 000000000..3847504e8 --- /dev/null +++ b/litex/soc/integration/soc.py @@ -0,0 +1,559 @@ +#!/usr/bin/env python3 + +# This file is Copyright (c) 2020 Florent Kermarrec +# License: BSD + +import logging +import time +import datetime + +from migen import * + +from litex.soc.interconnect import wishbone + +# TODO: +# - replace raise with exit on logging error. +# - use common module for SoCCSR/SoCIRQ. +# - add configurable CSR paging. +# - manage IO/Linker regions. + +logging.basicConfig(level=logging.INFO) + +# Helpers ------------------------------------------------------------------------------------------ +def colorer(s, color="bright"): + header = { + "bright": "\x1b[1m", + "green": "\x1b[32m", + "cyan": "\x1b[36m", + "red": "\x1b[31m", + "yellow": "\x1b[33m", + "underline": "\x1b[4m"}[color] + trailer = "\x1b[0m" + return header + str(s) + trailer + +def buildtime(with_time=True): + fmt = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d" + return datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d %H:%M:%S") + +# SoCRegion ---------------------------------------------------------------------------------------- + +class SoCRegion: + def __init__(self, origin=None, size=None, cached=True): + self.logger = logging.getLogger("SoCRegion") + self.origin = origin + self.size = size + self.cached = cached + + def decoder(self): + origin = self.origin + size = self.size + origin &= ~0x80000000 + size = 2**log2_int(size, False) + if (origin & (size - 1)) != 0: + self.logger.error("Origin needs to be aligned on size:") + self.logger.error(self) + raise + origin >>= 2 # bytes to words aligned + size >>= 2 # bytes to words aligned + return lambda a: (a[log2_int(size):-1] == (origin >> log2_int(size))) + + def __str__(self): + r = "" + if self.origin is not None: + r += "Origin: {}, ".format(colorer("0x{:08x}".format(self.origin))) + if self.size is not None: + r += "Size: {}, ".format(colorer("0x{:08x}".format(self.size))) + r += "Cached: {}".format(colorer(self.cached)) + return r + + +class SoCLinkerRegion(SoCRegion): + pass + +# SoCBus ------------------------------------------------------------------------------------------- + +class SoCBus: + supported_standard = ["wishbone"] + supported_data_width = [32, 64] + supported_address_width = [32] + + # Creation ------------------------------------------------------------------------------------- + def __init__(self, standard, data_width=32, address_width=32, timeout=1e6, reserved_regions={}): + self.logger = logging.getLogger("SoCBus") + self.logger.info(colorer("Creating new Bus Handler...", color="cyan")) + + # Check Standard + if standard not in self.supported_standard: + self.logger.error("Unsupported Standard: {} supporteds: {:s}".format( + colorer(standard, color="red"), + colorer(", ".join(self.supported_standard), color="green"))) + raise + + # Check Data Width + if data_width not in self.supported_data_width: + self.logger.error("Unsupported Data_Width: {} supporteds: {:s}".format( + colorer(data_width, color="red"), + colorer(", ".join(str(x) for x in self.supported_data_width), color="green"))) + raise + + # Check Address Width + if address_width not in self.supported_address_width: + self.logger.error("Unsupported Address Width: {} supporteds: {:s}".format( + colorer(data_width, color="red"), + colorer(", ".join(str(x) for x in self.supported_address_width), color="green"))) + raise + + # Create Bus + self.standard = standard + self.data_width = data_width + self.address_width = address_width + self.masters = {} + self.slaves = {} + self.regions = {} + self.timeout = timeout + self.logger.info("{}-bit {} Bus, {}GiB Address Space.".format( + colorer(data_width), colorer(standard), colorer(2**address_width/2**30))) + + # Adding reserved regions + self.logger.info("Adding {} Regions...".format(colorer("reserved"))) + for name, region in reserved_regions.items(): + if isinstance(region, int): + region = SoCRegion(origin=region, size=0x1000000) + self.add_region(name, region) + + self.logger.info(colorer("Bus Handler created.", color="cyan")) + + # Add/Allog/Check Regions ---------------------------------------------------------------------- + def add_region(self, name, region): + allocated = False + # Check if SoCLinkerRegion + if isinstance(region, SoCLinkerRegion): + self.logger.info("FIXME: SoCLinkerRegion") + # Check if SoCRegion + elif isinstance(region, SoCRegion): + # If no origin specified, allocate region. + if region.origin is None: + allocated = True + region = self.alloc_region(region.size, region.cached) + self.regions[name] = region + # Else add region and check for overlaps. + else: + self.regions[name] = region + overlap = self.check_region(self.regions) + if overlap is not None: + self.logger.error("Region overlap between {} and {}:".format( + colorer(overlap[0], color="red"), + colorer(overlap[1], color="red"))) + self.logger.error(str(self.regions[overlap[0]])) + self.logger.error(str(self.regions[overlap[1]])) + raise + self.logger.info("{} Region {} {}.".format( + colorer(name, color="underline"), + colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"), + str(region))) + else: + self.logger.error("{} is not a supported Region".format(colorer(name, color="red"))) + raise + + def alloc_region(self, size, cached=True): + self.logger.info("Allocating {} Region of size {}...".format( + colorer("Cached" if cached else "IO"), + colorer("0x{:08x}".format(size)))) + + # Limit Search Regions + uncached_regions = {} + for _, region in self.regions.items(): + if region.cached == False: + uncached_regions[name] = region + if cached == False: + search_regions = uncached_regions + else: + search_regions = {"main": SoCRegion(origin=0x00000000, size=2**self.address_width-1)} + + # Iterate on Search_Regions to find a Candidate + for _, search_region in search_regions.items(): + origin = search_region.origin + while (origin + size) < (search_region.origin + search_region.size): + # Create a Candicate. + candidate = SoCRegion(origin=origin, size=size, cached=cached) + overlap = False + # Check Candidate does not overlap with allocated existing regions + for _, allocated in self.regions.items(): + if self.check_region({"0": allocated, "1": candidate}) is not None: + origin = allocated.origin + allocated.size + overlap = True + break + if not overlap: + # If no overlap, the Candidate is selected + return candidate + + self.logger.error("Not enough Address Space to allocate Region") + raise + + def check_region(self, regions): + i = 0 + while i < len(regions): + n0 = list(regions.keys())[i] + r0 = regions[n0] + for n1 in list(regions.keys())[i+1:]: + r1 = regions[n1] + if isinstance(r0, SoCLinkerRegion) or isinstance(r1, SoCLinkerRegion): + continue + if r0.origin >= (r1.origin + r1.size): + continue + if r1.origin >= (r0.origin + r0.size): + continue + return (n0, n1) + i += 1 + return None + + # Add Master/Slave ----------------------------------------------------------------------------- + def add_master(self, name=None, master=None, io_regions={}): + if name is None: + name = "master{:d}".format(len(self.masters)) + if name in self.masters.keys(): + self.logger.error("{} already declared as Bus Master:".format(colorer(name, color="red"))) + self.logger.error(self) + raise + self.masters[name] = master + self.logger.info("{} {} as Bus Master.".format(colorer(name, color="underline"), colorer("added", color="green"))) + # FIXME: handle IO regions + + def add_slave(self, name=None, slave=None, region=None): + no_name = name is None + no_region = region is None + if no_name and no_region: + self.logger.error("Please specify at least {} or {} of Bus Slave".format( + colorer("name", color="red"), + colorer("region", color="red"))) + raise + if no_name: + name = "slave{:d}".format(len(self.slaves)) + if no_region: + region = self.regions.get(name, None) + if region is None: + self.logger.error("Unable to find Region {}".format(colorer(name, color="red"))) + raise + else: + self.add_region(name, region) + if name in self.slaves.keys(): + self.logger.error("{} already declared as Bus Slave:".format(colorer(name, color="red"))) + self.logger.error(self) + raise + self.slaves[name] = slave + self.logger.info("{} {} as Bus Slave.".format( + colorer(name, color="underline"), + colorer("added", color="green"))) + + # Str ------------------------------------------------------------------------------------------ + def __str__(self): + r = "{}-bit {} Bus, {}GiB Address Space.\n".format( + colorer(self.data_width), colorer(self.standard), colorer(2**self.address_width/2**30)) + r += "Bus Regions: ({})\n".format(len(self.regions.keys())) if len(self.regions.keys()) else "" + for name, region in self.regions.items(): + r += colorer(name, color="underline") + " "*(20-len(name)) + ": " + str(region) + "\n" + r += "Bus Masters: ({})\n".format(len(self.masters.keys())) if len(self.masters.keys()) else "" + for name in self.masters.keys(): + r += "- {}\n".format(colorer(name, color="underline")) + r += "Bus Slaves: ({})\n".format(len(self.slaves.keys())) if len(self.slaves.keys()) else "" + for name in self.slaves.keys(): + r += "- {}\n".format(colorer(name, color="underline")) + r = r[:-1] + return r + +# SoCCSR ---------------------------------------------------------------------------------------- + +class SoCCSR: + supported_data_width = [8, 32] + supported_address_width = [14, 15] + supported_alignment = [32, 64] + supported_paging = [0x800] + + # Creation ------------------------------------------------------------------------------------- + def __init__(self, data_width=32, address_width=14, alignment=32, paging=0x800, reserved_csrs={}): + self.logger = logging.getLogger("SoCCSR") + self.logger.info(colorer("Creating new CSR Handler...", color="cyan")) + + # Check Data Width + if data_width not in self.supported_data_width: + self.logger.error("Unsupported data_width: {} supporteds: {:s}".format( + colorer(data_width, color="red"), + colorer(", ".join(str(x) for x in self.supported_data_width)), color="green")) + raise + + # Check Address Width + if address_width not in self.supported_address_width: + self.logger.error("Unsupported address_width: {} supporteds: {:s}".format( + colorer(address_width, color="red"), + colorer(", ".join(str(x) for x in self.supported_address_width), color="green"))) + raise + + # Check Alignment + if alignment not in self.supported_alignment: + self.logger.error("Unsupported alignment: {} supporteds: {:s}".format( + colorer(alignment, color="red"), + colorer(", ".join(str(x) for x in self.supported_alignment), color="green"))) + raise + + # Check Paging + if paging not in self.supported_paging: + self.logger.error("Unsupported paging: {} supporteds: {:s}".format( + colorer(paging, color="red"), + colorer(", ".join(str(x) for x in self.supported_paging), color="green"))) + raise + + # Create CSR Handler + self.data_width = data_width + self.address_width = address_width + self.alignment = alignment + self.paging = paging + self.csrs = {} + self.n_csrs = 4*2**address_width//paging # FIXME + self.logger.info("{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format( + colorer(self.data_width), + colorer(2**self.address_width/2**10), + colorer(self.paging), + self.n_csrs)) + + # Adding reserved CSRs + self.logger.info("Adding {} CSRs...".format(colorer("reserved"))) + for name, n in reserved_csrs.items(): + self.add(name, n) + + self.logger.info(colorer("CSR Bus Handler created.", color="cyan")) + + # Add ------------------------------------------------------------------------------------------ + def add(self, name, n=None): + allocated = False + if name in self.csrs.keys(): + self.logger.error("{} CSR name already used.".format(colorer(name, "red"))) + self.logger.error(self) + raise + if n in self.csrs.values(): + self.logger.error("{} CSR Location already used.".format(colorer(n, "red"))) + self.logger.error(self) + raise + if n is None: + allocated = True + n = self.alloc(name) + else: + if n < 0: + self.logger.error("{} CSR Location should be positive.".format( + colorer(n, color="red"))) + raise + if n > self.n_csrs: + self.logger.error("{} CSR Location too high (Up to {}).".format( + colorer(n, color="red"), + colorer(self.n_csrs, color="green"))) + raise + self.csrs[name] = n + self.logger.info("{} CSR {} at Location {}.".format( + colorer(name, color="underline"), + colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"), + colorer(n))) + + # Alloc ---------------------------------------------------------------------------------------- + def alloc(self, name): + for n in range(self.data_width//8*2**self.address_width//self.paging): + if n not in self.csrs.values(): + return n + self.logger.error("Not enough CSR Locations.") + self.logger.error(self) + raise + + # Str ------------------------------------------------------------------------------------------ + def __str__(self): + r = "{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format( + colorer(self.data_width), + colorer(2**self.address_width/2**10), + colorer(self.paging), + self.n_csrs) + r += "CSR Locations: ({})\n".format(len(self.csrs.keys())) if len(self.csrs.keys()) else "" + for name in self.csrs.keys(): + r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.csrs[name])) + r = r[:-1] + return r + +# SoCIRQ ------------------------------------------------------------------------------------------- + +class SoCIRQ: + # Creation ------------------------------------------------------------------------------------- + def __init__(self, n_irqs=32, reserved_irqs={}): + self.logger = logging.getLogger("SoCIRQ") + self.logger.info(colorer("Creating new SoC IRQ Handler...", color="cyan")) + + # Check IRQ Number + if n_irqs > 32: + self.logger.error("Unsupported IRQs number: {} supporteds: {:s}".format( + colorer(n, color="red"), colorer("Up to 32", color="green"))) + raise + + # Create IRQ Handler + self.n_irqs = n_irqs + self.irqs = {} + self.logger.info("IRQ Handler (up to {} Locations).".format(colorer(n_irqs))) + + # Adding reserved IRQs + self.logger.info("Adding {} IRQs...".format(colorer("reserved"))) + for name, n in reserved_irqs.items(): + self.add(name, n) + + self.logger.info(colorer("IRQ Handler created.", color="cyan")) + + # Add ------------------------------------------------------------------------------------------ + def add(self, name, n=None): + allocated = False + if name in self.irqs.keys(): + self.logger.error("{} IRQ name already used.".format(colorer(name, "red"))) + self.logger.error(self) + raise + if n in self.irqs.values(): + self.logger.error("{} IRQ Location already used.".format(colorer(n, "red"))) + self.logger.error(self) + raise + if n is None: + allocated = True + n = self.alloc(name) + else: + if n < 0: + self.logger.error("{} IRQ Location should be positive.".format( + colorer(n, color="red"))) + raise + if n > self.n_irqs: + self.logger.error("{} IRQ Location too high (Up to {}).".format( + colorer(n, color="red"), + colorer(self.n_csrs, color="green"))) + raise + self.irqs[name] = n + self.logger.info("{} IRQ {} at Location {}.".format( + colorer(name, color="underline"), + colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"), + colorer(n))) + + # Alloc ---------------------------------------------------------------------------------------- + def alloc(self, name): + for n in range(self.n_irqs): + if n not in self.irqs.values(): + return n + self.logger.error("Not enough Locations.") + self.logger.error(self) + raise + + # Str ------------------------------------------------------------------------------------------ + def __str__(self): + r ="IRQ Handler (up to {} Locations).\n".format(colorer(self.n_irqs)) + r += "IRQs Locations:\n" if len(self.irqs.keys()) else "" + for name in self.irqs.keys(): + r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.irqs[name])) + r = r[:-1] + return r + +# SoC ---------------------------------------------------------------------------------------------- + +class SoC(Module): + def __init__(self, + bus_standard = "wishbone", + bus_data_width = 32, + bus_address_width = 32, + bus_timeout = 1e6, + bus_reserved_regions = {}, + + csr_data_width = 32, + csr_address_width = 14, + csr_alignment = 32, + csr_paging = 0x800, + csr_reserved_csrs = {}, + + irq_n_irqs = 32, + irq_reserved_irqs = {}, + ): + + self.logger = logging.getLogger("SoC") + self.logger.info(colorer(" __ _ __ _ __ ", color="bright")) + self.logger.info(colorer(" / / (_) /____ | |/_/ ", color="bright")) + self.logger.info(colorer(" / /__/ / __/ -_)> < ", color="bright")) + self.logger.info(colorer(" /____/_/\\__/\\__/_/|_| ", color="bright")) + self.logger.info(colorer(" Build your hardware, easily!", color="bright")) + + self.logger.info(colorer("-"*80, color="bright")) + self.logger.info(colorer("Creating new SoC... ({})".format(buildtime()), color="cyan")) + self.logger.info(colorer("-"*80, color="bright")) + + # SoC Bus Handler -------------------------------------------------------------------------- + self.bus = SoCBus( + standard = bus_standard, + data_width = bus_data_width, + address_width = bus_address_width, + timeout = bus_timeout, + reserved_regions = bus_reserved_regions, + ) + + # SoC Bus Handler -------------------------------------------------------------------------- + self.csr = SoCCSR( + data_width = csr_data_width, + address_width = csr_address_width, + alignment = csr_alignment, + paging = csr_paging, + reserved_csrs = csr_reserved_csrs, + ) + + # SoC IRQ Handler -------------------------------------------------------------------------- + self.irq = SoCIRQ( + n_irqs = irq_n_irqs, + reserved_irqs = irq_reserved_irqs + ) + + self.logger.info(colorer("-"*80, color="bright")) + self.logger.info(colorer("Initial SoC:", color="cyan")) + self.logger.info(colorer("-"*80, color="bright")) + self.logger.info(self.bus) + self.logger.info(self.csr) + self.logger.info(self.irq) + self.logger.info(colorer("-"*80, color="bright")) + + + def do_finalize(self): + self.logger.info(colorer("-"*80, color="bright")) + self.logger.info(colorer("Finalized SoC:", color="cyan")) + self.logger.info(colorer("-"*80, color="bright")) + self.logger.info(self.bus) + self.logger.info(self.csr) + self.logger.info(self.irq) + self.logger.info(colorer("-"*80, color="bright")) + + # SoC Bus Interconnect --------------------------------------------------------------------- + bus_masters = self.bus.masters.values() + bus_slaves = [(self.bus.regions[n].decoder(), s) for n, s in self.bus.slaves.items()] + if len(bus_masters) and len(bus_slaves): + self.submodules.bus_interconnect = wishbone.InterconnectShared( + masters = bus_masters, + slaves = bus_slaves, + register = True, + timeout_cycles = self.bus.timeout) + + #exit() + +# Test (FIXME: move to litex/text and improve) ----------------------------------------------------- + +if __name__ == "__main__": + bus = SoCBus("wishbone", reserved_regions={ + "rom": SoCRegion(origin=0x00000100, size=1024), + "ram": SoCRegion(size=512), + } + ) + bus.add_master("cpu", None) + bus.add_slave("rom", None, SoCRegion(size=1024)) + bus.add_slave("ram", None, SoCRegion(size=1024)) + + + csr = SoCCSR(reserved_csrs={"ctrl": 0, "uart": 1}) + csr.add("csr0") + csr.add("csr1", 0) + #csr.add("csr2", 46) + csr.add("csr3", -1) + print(bus) + print(csr) + + irq = SoCIRQ(reserved_irqs={"uart": 1}) + + soc = SoC()