soc/integration: add new soc class prorotype with SoCRegion/SoCBus/SoCCSR/SoCIRQ/SoC

This commit is contained in:
Florent Kermarrec 2020-02-06 11:06:41 +01:00
parent 9b11e9192d
commit 6baa07a69b
1 changed files with 559 additions and 0 deletions

559
litex/soc/integration/soc.py Executable file
View File

@ -0,0 +1,559 @@
#!/usr/bin/env python3
# This file is Copyright (c) 2020 Florent Kermarrec <florent@enjoy-digital.fr>
# License: BSD
import logging
import time
import datetime
from migen import *
from litex.soc.interconnect import wishbone
# TODO:
# - replace raise with exit on logging error.
# - use common module for SoCCSR/SoCIRQ.
# - add configurable CSR paging.
# - manage IO/Linker regions.
logging.basicConfig(level=logging.INFO)
# Helpers ------------------------------------------------------------------------------------------
def colorer(s, color="bright"):
header = {
"bright": "\x1b[1m",
"green": "\x1b[32m",
"cyan": "\x1b[36m",
"red": "\x1b[31m",
"yellow": "\x1b[33m",
"underline": "\x1b[4m"}[color]
trailer = "\x1b[0m"
return header + str(s) + trailer
def buildtime(with_time=True):
fmt = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d"
return datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d %H:%M:%S")
# SoCRegion ----------------------------------------------------------------------------------------
class SoCRegion:
def __init__(self, origin=None, size=None, cached=True):
self.logger = logging.getLogger("SoCRegion")
self.origin = origin
self.size = size
self.cached = cached
def decoder(self):
origin = self.origin
size = self.size
origin &= ~0x80000000
size = 2**log2_int(size, False)
if (origin & (size - 1)) != 0:
self.logger.error("Origin needs to be aligned on size:")
self.logger.error(self)
raise
origin >>= 2 # bytes to words aligned
size >>= 2 # bytes to words aligned
return lambda a: (a[log2_int(size):-1] == (origin >> log2_int(size)))
def __str__(self):
r = ""
if self.origin is not None:
r += "Origin: {}, ".format(colorer("0x{:08x}".format(self.origin)))
if self.size is not None:
r += "Size: {}, ".format(colorer("0x{:08x}".format(self.size)))
r += "Cached: {}".format(colorer(self.cached))
return r
class SoCLinkerRegion(SoCRegion):
pass
# SoCBus -------------------------------------------------------------------------------------------
class SoCBus:
supported_standard = ["wishbone"]
supported_data_width = [32, 64]
supported_address_width = [32]
# Creation -------------------------------------------------------------------------------------
def __init__(self, standard, data_width=32, address_width=32, timeout=1e6, reserved_regions={}):
self.logger = logging.getLogger("SoCBus")
self.logger.info(colorer("Creating new Bus Handler...", color="cyan"))
# Check Standard
if standard not in self.supported_standard:
self.logger.error("Unsupported Standard: {} supporteds: {:s}".format(
colorer(standard, color="red"),
colorer(", ".join(self.supported_standard), color="green")))
raise
# Check Data Width
if data_width not in self.supported_data_width:
self.logger.error("Unsupported Data_Width: {} supporteds: {:s}".format(
colorer(data_width, color="red"),
colorer(", ".join(str(x) for x in self.supported_data_width), color="green")))
raise
# Check Address Width
if address_width not in self.supported_address_width:
self.logger.error("Unsupported Address Width: {} supporteds: {:s}".format(
colorer(data_width, color="red"),
colorer(", ".join(str(x) for x in self.supported_address_width), color="green")))
raise
# Create Bus
self.standard = standard
self.data_width = data_width
self.address_width = address_width
self.masters = {}
self.slaves = {}
self.regions = {}
self.timeout = timeout
self.logger.info("{}-bit {} Bus, {}GiB Address Space.".format(
colorer(data_width), colorer(standard), colorer(2**address_width/2**30)))
# Adding reserved regions
self.logger.info("Adding {} Regions...".format(colorer("reserved")))
for name, region in reserved_regions.items():
if isinstance(region, int):
region = SoCRegion(origin=region, size=0x1000000)
self.add_region(name, region)
self.logger.info(colorer("Bus Handler created.", color="cyan"))
# Add/Allog/Check Regions ----------------------------------------------------------------------
def add_region(self, name, region):
allocated = False
# Check if SoCLinkerRegion
if isinstance(region, SoCLinkerRegion):
self.logger.info("FIXME: SoCLinkerRegion")
# Check if SoCRegion
elif isinstance(region, SoCRegion):
# If no origin specified, allocate region.
if region.origin is None:
allocated = True
region = self.alloc_region(region.size, region.cached)
self.regions[name] = region
# Else add region and check for overlaps.
else:
self.regions[name] = region
overlap = self.check_region(self.regions)
if overlap is not None:
self.logger.error("Region overlap between {} and {}:".format(
colorer(overlap[0], color="red"),
colorer(overlap[1], color="red")))
self.logger.error(str(self.regions[overlap[0]]))
self.logger.error(str(self.regions[overlap[1]]))
raise
self.logger.info("{} Region {} {}.".format(
colorer(name, color="underline"),
colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
str(region)))
else:
self.logger.error("{} is not a supported Region".format(colorer(name, color="red")))
raise
def alloc_region(self, size, cached=True):
self.logger.info("Allocating {} Region of size {}...".format(
colorer("Cached" if cached else "IO"),
colorer("0x{:08x}".format(size))))
# Limit Search Regions
uncached_regions = {}
for _, region in self.regions.items():
if region.cached == False:
uncached_regions[name] = region
if cached == False:
search_regions = uncached_regions
else:
search_regions = {"main": SoCRegion(origin=0x00000000, size=2**self.address_width-1)}
# Iterate on Search_Regions to find a Candidate
for _, search_region in search_regions.items():
origin = search_region.origin
while (origin + size) < (search_region.origin + search_region.size):
# Create a Candicate.
candidate = SoCRegion(origin=origin, size=size, cached=cached)
overlap = False
# Check Candidate does not overlap with allocated existing regions
for _, allocated in self.regions.items():
if self.check_region({"0": allocated, "1": candidate}) is not None:
origin = allocated.origin + allocated.size
overlap = True
break
if not overlap:
# If no overlap, the Candidate is selected
return candidate
self.logger.error("Not enough Address Space to allocate Region")
raise
def check_region(self, regions):
i = 0
while i < len(regions):
n0 = list(regions.keys())[i]
r0 = regions[n0]
for n1 in list(regions.keys())[i+1:]:
r1 = regions[n1]
if isinstance(r0, SoCLinkerRegion) or isinstance(r1, SoCLinkerRegion):
continue
if r0.origin >= (r1.origin + r1.size):
continue
if r1.origin >= (r0.origin + r0.size):
continue
return (n0, n1)
i += 1
return None
# Add Master/Slave -----------------------------------------------------------------------------
def add_master(self, name=None, master=None, io_regions={}):
if name is None:
name = "master{:d}".format(len(self.masters))
if name in self.masters.keys():
self.logger.error("{} already declared as Bus Master:".format(colorer(name, color="red")))
self.logger.error(self)
raise
self.masters[name] = master
self.logger.info("{} {} as Bus Master.".format(colorer(name, color="underline"), colorer("added", color="green")))
# FIXME: handle IO regions
def add_slave(self, name=None, slave=None, region=None):
no_name = name is None
no_region = region is None
if no_name and no_region:
self.logger.error("Please specify at least {} or {} of Bus Slave".format(
colorer("name", color="red"),
colorer("region", color="red")))
raise
if no_name:
name = "slave{:d}".format(len(self.slaves))
if no_region:
region = self.regions.get(name, None)
if region is None:
self.logger.error("Unable to find Region {}".format(colorer(name, color="red")))
raise
else:
self.add_region(name, region)
if name in self.slaves.keys():
self.logger.error("{} already declared as Bus Slave:".format(colorer(name, color="red")))
self.logger.error(self)
raise
self.slaves[name] = slave
self.logger.info("{} {} as Bus Slave.".format(
colorer(name, color="underline"),
colorer("added", color="green")))
# Str ------------------------------------------------------------------------------------------
def __str__(self):
r = "{}-bit {} Bus, {}GiB Address Space.\n".format(
colorer(self.data_width), colorer(self.standard), colorer(2**self.address_width/2**30))
r += "Bus Regions: ({})\n".format(len(self.regions.keys())) if len(self.regions.keys()) else ""
for name, region in self.regions.items():
r += colorer(name, color="underline") + " "*(20-len(name)) + ": " + str(region) + "\n"
r += "Bus Masters: ({})\n".format(len(self.masters.keys())) if len(self.masters.keys()) else ""
for name in self.masters.keys():
r += "- {}\n".format(colorer(name, color="underline"))
r += "Bus Slaves: ({})\n".format(len(self.slaves.keys())) if len(self.slaves.keys()) else ""
for name in self.slaves.keys():
r += "- {}\n".format(colorer(name, color="underline"))
r = r[:-1]
return r
# SoCCSR ----------------------------------------------------------------------------------------
class SoCCSR:
supported_data_width = [8, 32]
supported_address_width = [14, 15]
supported_alignment = [32, 64]
supported_paging = [0x800]
# Creation -------------------------------------------------------------------------------------
def __init__(self, data_width=32, address_width=14, alignment=32, paging=0x800, reserved_csrs={}):
self.logger = logging.getLogger("SoCCSR")
self.logger.info(colorer("Creating new CSR Handler...", color="cyan"))
# Check Data Width
if data_width not in self.supported_data_width:
self.logger.error("Unsupported data_width: {} supporteds: {:s}".format(
colorer(data_width, color="red"),
colorer(", ".join(str(x) for x in self.supported_data_width)), color="green"))
raise
# Check Address Width
if address_width not in self.supported_address_width:
self.logger.error("Unsupported address_width: {} supporteds: {:s}".format(
colorer(address_width, color="red"),
colorer(", ".join(str(x) for x in self.supported_address_width), color="green")))
raise
# Check Alignment
if alignment not in self.supported_alignment:
self.logger.error("Unsupported alignment: {} supporteds: {:s}".format(
colorer(alignment, color="red"),
colorer(", ".join(str(x) for x in self.supported_alignment), color="green")))
raise
# Check Paging
if paging not in self.supported_paging:
self.logger.error("Unsupported paging: {} supporteds: {:s}".format(
colorer(paging, color="red"),
colorer(", ".join(str(x) for x in self.supported_paging), color="green")))
raise
# Create CSR Handler
self.data_width = data_width
self.address_width = address_width
self.alignment = alignment
self.paging = paging
self.csrs = {}
self.n_csrs = 4*2**address_width//paging # FIXME
self.logger.info("{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format(
colorer(self.data_width),
colorer(2**self.address_width/2**10),
colorer(self.paging),
self.n_csrs))
# Adding reserved CSRs
self.logger.info("Adding {} CSRs...".format(colorer("reserved")))
for name, n in reserved_csrs.items():
self.add(name, n)
self.logger.info(colorer("CSR Bus Handler created.", color="cyan"))
# Add ------------------------------------------------------------------------------------------
def add(self, name, n=None):
allocated = False
if name in self.csrs.keys():
self.logger.error("{} CSR name already used.".format(colorer(name, "red")))
self.logger.error(self)
raise
if n in self.csrs.values():
self.logger.error("{} CSR Location already used.".format(colorer(n, "red")))
self.logger.error(self)
raise
if n is None:
allocated = True
n = self.alloc(name)
else:
if n < 0:
self.logger.error("{} CSR Location should be positive.".format(
colorer(n, color="red")))
raise
if n > self.n_csrs:
self.logger.error("{} CSR Location too high (Up to {}).".format(
colorer(n, color="red"),
colorer(self.n_csrs, color="green")))
raise
self.csrs[name] = n
self.logger.info("{} CSR {} at Location {}.".format(
colorer(name, color="underline"),
colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
colorer(n)))
# Alloc ----------------------------------------------------------------------------------------
def alloc(self, name):
for n in range(self.data_width//8*2**self.address_width//self.paging):
if n not in self.csrs.values():
return n
self.logger.error("Not enough CSR Locations.")
self.logger.error(self)
raise
# Str ------------------------------------------------------------------------------------------
def __str__(self):
r = "{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format(
colorer(self.data_width),
colorer(2**self.address_width/2**10),
colorer(self.paging),
self.n_csrs)
r += "CSR Locations: ({})\n".format(len(self.csrs.keys())) if len(self.csrs.keys()) else ""
for name in self.csrs.keys():
r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.csrs[name]))
r = r[:-1]
return r
# SoCIRQ -------------------------------------------------------------------------------------------
class SoCIRQ:
# Creation -------------------------------------------------------------------------------------
def __init__(self, n_irqs=32, reserved_irqs={}):
self.logger = logging.getLogger("SoCIRQ")
self.logger.info(colorer("Creating new SoC IRQ Handler...", color="cyan"))
# Check IRQ Number
if n_irqs > 32:
self.logger.error("Unsupported IRQs number: {} supporteds: {:s}".format(
colorer(n, color="red"), colorer("Up to 32", color="green")))
raise
# Create IRQ Handler
self.n_irqs = n_irqs
self.irqs = {}
self.logger.info("IRQ Handler (up to {} Locations).".format(colorer(n_irqs)))
# Adding reserved IRQs
self.logger.info("Adding {} IRQs...".format(colorer("reserved")))
for name, n in reserved_irqs.items():
self.add(name, n)
self.logger.info(colorer("IRQ Handler created.", color="cyan"))
# Add ------------------------------------------------------------------------------------------
def add(self, name, n=None):
allocated = False
if name in self.irqs.keys():
self.logger.error("{} IRQ name already used.".format(colorer(name, "red")))
self.logger.error(self)
raise
if n in self.irqs.values():
self.logger.error("{} IRQ Location already used.".format(colorer(n, "red")))
self.logger.error(self)
raise
if n is None:
allocated = True
n = self.alloc(name)
else:
if n < 0:
self.logger.error("{} IRQ Location should be positive.".format(
colorer(n, color="red")))
raise
if n > self.n_irqs:
self.logger.error("{} IRQ Location too high (Up to {}).".format(
colorer(n, color="red"),
colorer(self.n_csrs, color="green")))
raise
self.irqs[name] = n
self.logger.info("{} IRQ {} at Location {}.".format(
colorer(name, color="underline"),
colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
colorer(n)))
# Alloc ----------------------------------------------------------------------------------------
def alloc(self, name):
for n in range(self.n_irqs):
if n not in self.irqs.values():
return n
self.logger.error("Not enough Locations.")
self.logger.error(self)
raise
# Str ------------------------------------------------------------------------------------------
def __str__(self):
r ="IRQ Handler (up to {} Locations).\n".format(colorer(self.n_irqs))
r += "IRQs Locations:\n" if len(self.irqs.keys()) else ""
for name in self.irqs.keys():
r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.irqs[name]))
r = r[:-1]
return r
# SoC ----------------------------------------------------------------------------------------------
class SoC(Module):
def __init__(self,
bus_standard = "wishbone",
bus_data_width = 32,
bus_address_width = 32,
bus_timeout = 1e6,
bus_reserved_regions = {},
csr_data_width = 32,
csr_address_width = 14,
csr_alignment = 32,
csr_paging = 0x800,
csr_reserved_csrs = {},
irq_n_irqs = 32,
irq_reserved_irqs = {},
):
self.logger = logging.getLogger("SoC")
self.logger.info(colorer(" __ _ __ _ __ ", color="bright"))
self.logger.info(colorer(" / / (_) /____ | |/_/ ", color="bright"))
self.logger.info(colorer(" / /__/ / __/ -_)> < ", color="bright"))
self.logger.info(colorer(" /____/_/\\__/\\__/_/|_| ", color="bright"))
self.logger.info(colorer(" Build your hardware, easily!", color="bright"))
self.logger.info(colorer("-"*80, color="bright"))
self.logger.info(colorer("Creating new SoC... ({})".format(buildtime()), color="cyan"))
self.logger.info(colorer("-"*80, color="bright"))
# SoC Bus Handler --------------------------------------------------------------------------
self.bus = SoCBus(
standard = bus_standard,
data_width = bus_data_width,
address_width = bus_address_width,
timeout = bus_timeout,
reserved_regions = bus_reserved_regions,
)
# SoC Bus Handler --------------------------------------------------------------------------
self.csr = SoCCSR(
data_width = csr_data_width,
address_width = csr_address_width,
alignment = csr_alignment,
paging = csr_paging,
reserved_csrs = csr_reserved_csrs,
)
# SoC IRQ Handler --------------------------------------------------------------------------
self.irq = SoCIRQ(
n_irqs = irq_n_irqs,
reserved_irqs = irq_reserved_irqs
)
self.logger.info(colorer("-"*80, color="bright"))
self.logger.info(colorer("Initial SoC:", color="cyan"))
self.logger.info(colorer("-"*80, color="bright"))
self.logger.info(self.bus)
self.logger.info(self.csr)
self.logger.info(self.irq)
self.logger.info(colorer("-"*80, color="bright"))
def do_finalize(self):
self.logger.info(colorer("-"*80, color="bright"))
self.logger.info(colorer("Finalized SoC:", color="cyan"))
self.logger.info(colorer("-"*80, color="bright"))
self.logger.info(self.bus)
self.logger.info(self.csr)
self.logger.info(self.irq)
self.logger.info(colorer("-"*80, color="bright"))
# SoC Bus Interconnect ---------------------------------------------------------------------
bus_masters = self.bus.masters.values()
bus_slaves = [(self.bus.regions[n].decoder(), s) for n, s in self.bus.slaves.items()]
if len(bus_masters) and len(bus_slaves):
self.submodules.bus_interconnect = wishbone.InterconnectShared(
masters = bus_masters,
slaves = bus_slaves,
register = True,
timeout_cycles = self.bus.timeout)
#exit()
# Test (FIXME: move to litex/text and improve) -----------------------------------------------------
if __name__ == "__main__":
bus = SoCBus("wishbone", reserved_regions={
"rom": SoCRegion(origin=0x00000100, size=1024),
"ram": SoCRegion(size=512),
}
)
bus.add_master("cpu", None)
bus.add_slave("rom", None, SoCRegion(size=1024))
bus.add_slave("ram", None, SoCRegion(size=1024))
csr = SoCCSR(reserved_csrs={"ctrl": 0, "uart": 1})
csr.add("csr0")
csr.add("csr1", 0)
#csr.add("csr2", 46)
csr.add("csr3", -1)
print(bus)
print(csr)
irq = SoCIRQ(reserved_irqs={"uart": 1})
soc = SoC()