soc/SoCBusHandler: Integrate interconnect code since avoid duplication and simplify reuse.

Also extends supported data_widths.

A simple custom interconnect can now be created with code like this:

# Create 2 AXI Masters / 2 AXI Slaves.
axi_m_0 = axi.AXIInterface(data_width=32,  address_width=32)
axi_m_1 = axi.AXIInterface(data_width=64,  address_width=32)
axi_s_0 = axi.AXIInterface(data_width=512, address_width=32)
axi_s_1 = axi.AXIInterface(data_width=512, address_width=32)

axi_s_0_region = SoCRegion(origin=0x00000000, size=0x10000000)
axi_s_1_region = SoCRegion(origin=0x10000000, size=0x10000000)

# Create Bus Handler .
self.custom_bus = SoCBusHandler(
    name                  = "SoCCustomBusHandler",
    standard              = "axi",
    data_width            = 512,
    address_width         = 32,
    bursting              = True,
    interconnect          = "crossbar",
    interconnect_register = True,
)

# Add AXI Buses.
self.custom_bus.add_master(master=axi_m_0)
self.custom_bus.add_master(master=axi_m_1)
self.custom_bus.add_slave(slave=axi_s_0, region=axi_s_0_region)
self.custom_bus.add_slave(slave=axi_s_0, region=axi_s_1_region)

# Finalize.
self.custom_bus.finalize()
print(self.custom_bus)
This commit is contained in:
Florent Kermarrec 2022-10-28 12:43:00 +02:00
parent 3603e90ed8
commit 13448b8260

View file

@ -115,7 +115,7 @@ class SoCCSRRegion:
class SoCBusHandler(LiteXModule):
supported_standard = ["wishbone", "axi-lite", "axi"]
supported_data_width = [32, 64]
supported_data_width = [32, 64, 128, 256, 512]
supported_address_width = [32]
# Creation -------------------------------------------------------------------------------------
@ -125,7 +125,7 @@ class SoCBusHandler(LiteXModule):
address_width = 32,
timeout = 1e6,
bursting = False,
interconnect = "shared",
interconnect = "shared", interconnect_register=True,
reserved_regions = {}
):
self.logger = logging.getLogger(name)
@ -156,17 +156,18 @@ class SoCBusHandler(LiteXModule):
raise SoCError()
# Create Bus
self.standard = standard
self.data_width = data_width
self.address_width = address_width
self.bursting = bursting
self.interconnect = interconnect
self.masters = {}
self.slaves = {}
self.regions = {}
self.io_regions = {}
self.io_regions_check = True
self.timeout = timeout
self.standard = standard
self.data_width = data_width
self.address_width = address_width
self.bursting = bursting
self.interconnect = interconnect
self.interconnect_register = interconnect_register
self.masters = {}
self.slaves = {}
self.regions = {}
self.io_regions = {}
self.io_regions_check = True
self.timeout = timeout
self.logger.info("{}-bit {} Bus, {}GiB Address Space.".format(
colorer(data_width), colorer(standard), colorer(2**address_width/2**30)))
@ -462,6 +463,48 @@ class SoCBusHandler(LiteXModule):
# Else just return address_width:
return self.address_width
def do_finalize(self):
interconnect_p2p_cls = {
"wishbone": wishbone.InterconnectPointToPoint,
"axi-lite": axi.AXILiteInterconnectPointToPoint,
"axi" : axi.AXIInterconnectPointToPoint,
}[self.standard]
interconnect_shared_cls = {
"wishbone": wishbone.InterconnectShared,
"axi-lite": axi.AXILiteInterconnectShared,
"axi" : axi.AXIInterconnectShared,
}[self.standard]
interconnect_crossbar_cls = {
"wishbone": wishbone.Crossbar,
"axi-lite": axi.AXILiteCrossbar,
"axi" : axi.AXICrossbar,
}[self.standard]
if len(self.masters) and len(self.slaves):
# If 1 bus_master, 1 bus_slave and no address translation, use InterconnectPointToPoint.
if ((len(self.masters) == 1) and
(len(self.slaves) == 1) and
(next(iter(self.regions.values())).origin == 0)):
self._interconnect = interconnect_p2p_cls(
master = next(iter(self.masters.values())),
slave = next(iter(self.slaves.values())))
# Otherwise, use InterconnectShared/Crossbar.
else:
interconnect_cls = {
"shared" : interconnect_shared_cls,
"crossbar": interconnect_crossbar_cls,
}[self.interconnect]
self._interconnect = interconnect_cls(
masters = list(self.masters.values()),
slaves = [(self.regions[n].decoder(self), s) for n, s in self.slaves.items()],
register = self.interconnect_register,
timeout_cycles = self.timeout
)
self.logger.info("Interconnect: {} ({} <-> {}).".format(
colorer(self._interconnect.__class__.__name__),
colorer(len(self.masters)),
colorer(len(self.slaves))))
# Str ------------------------------------------------------------------------------------------
def __str__(self):
r = "{}-bit {} Bus, {}GiB Address Space.\n".format(
@ -1121,48 +1164,10 @@ class SoC(LiteXModule):
)
# SoC Bus Interconnect ---------------------------------------------------------------------
interconnect_p2p_cls = {
"wishbone": wishbone.InterconnectPointToPoint,
"axi-lite": axi.AXILiteInterconnectPointToPoint,
"axi" : axi.AXIInterconnectPointToPoint,
}[self.bus.standard]
interconnect_shared_cls = {
"wishbone": wishbone.InterconnectShared,
"axi-lite": axi.AXILiteInterconnectShared,
"axi" : axi.AXIInterconnectShared,
}[self.bus.standard]
interconnect_crossbar_cls = {
"wishbone": wishbone.Crossbar,
"axi-lite": axi.AXILiteCrossbar,
"axi" : axi.AXICrossbar,
}[self.bus.standard]
if len(self.bus.masters) and len(self.bus.slaves):
# If 1 bus_master, 1 bus_slave and no address translation, use InterconnectPointToPoint.
if ((len(self.bus.masters) == 1) and
(len(self.bus.slaves) == 1) and
(next(iter(self.bus.regions.values())).origin == 0)):
self.bus_interconnect = interconnect_p2p_cls(
master = next(iter(self.bus.masters.values())),
slave = next(iter(self.bus.slaves.values())))
# Otherwise, use InterconnectShared/Crossbar.
else:
interconnect_cls = {
"shared" : interconnect_shared_cls,
"crossbar": interconnect_crossbar_cls,
}[self.bus.interconnect]
self.bus_interconnect = interconnect_cls(
masters = list(self.bus.masters.values()),
slaves = [(self.bus.regions[n].decoder(self.bus), s) for n, s in self.bus.slaves.items()],
register = True,
timeout_cycles = self.bus.timeout)
if hasattr(self, "ctrl") and self.bus.timeout is not None:
if hasattr(self.ctrl, "bus_error") and hasattr(self.bus_interconnect, "timeout"):
self.comb += self.ctrl.bus_error.eq(self.bus_interconnect.timeout.error)
self.bus.logger.info("Interconnect: {} ({} <-> {}).".format(
colorer(self.bus_interconnect.__class__.__name__),
colorer(len(self.bus.masters)),
colorer(len(self.bus.slaves))))
self.bus.finalize()
if hasattr(self, "ctrl") and self.bus.timeout is not None:
if hasattr(self.ctrl, "bus_error") and hasattr(self.bus._interconnect, "timeout"):
self.comb += self.ctrl.bus_error.eq(self.bus._interconnect.timeout.error)
self.add_config("BUS_STANDARD", self.bus.standard.upper())
self.add_config("BUS_DATA_WIDTH", self.bus.data_width)
self.add_config("BUS_ADDRESS_WIDTH", self.bus.address_width)
@ -1170,24 +1175,7 @@ class SoC(LiteXModule):
# SoC DMA Bus Interconnect (Cache Coherence) -----------------------------------------------
if hasattr(self, "dma_bus"):
if len(self.dma_bus.masters) and len(self.dma_bus.slaves):
# If 1 bus_master, 1 bus_slave and no address translation, use InterconnectPointToPoint.
if ((len(self.dma_bus.masters) == 1) and
(len(self.dma_bus.slaves) == 1) and
(next(iter(self.dma_bus.regions.values())).origin == 0)):
self.dma_bus_interconnect = wishbone.InterconnectPointToPoint(
master = next(iter(self.dma_bus.masters.values())),
slave = next(iter(self.dma_bus.slaves.values())))
# Otherwise, use InterconnectShared.
else:
self.dma_bus_interconnect = wishbone.InterconnectShared(
masters = list(self.dma_bus.masters.values()),
slaves = [(self.dma_bus.regions[n].decoder(self.dma_bus), s) for n, s in self.dma_bus.slaves.items()],
register = True)
self.bus.logger.info("DMA Interconnect: {} ({} <-> {}).".format(
colorer(self.dma_bus_interconnect.__class__.__name__),
colorer(len(self.dma_bus.masters)),
colorer(len(self.dma_bus.slaves))))
self.dma_bus.finalize()
self.add_config("CPU_HAS_DMA_BUS")
# SoC Main CSRs collection -----------------------------------------------------------------