Merge pull request #600 from antmicro/jboc/axi-lite

Implement AXI Lite interconnect
This commit is contained in:
enjoy-digital 2020-07-22 23:03:07 +02:00 committed by GitHub
commit 99e88dfc0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 1214 additions and 363 deletions

View file

@ -101,7 +101,7 @@ class SoCCSRRegion:
# SoCBusHandler ------------------------------------------------------------------------------------
class SoCBusHandler(Module):
supported_standard = ["wishbone"]
supported_standard = ["wishbone", "axi-lite"]
supported_data_width = [32, 64]
supported_address_width = [32]
@ -281,48 +281,57 @@ class SoCBusHandler(Module):
def add_adapter(self, name, interface, direction="m2s"):
assert direction in ["m2s", "s2m"]
if isinstance(interface, wishbone.Interface):
if interface.data_width != self.data_width:
new_interface = wishbone.Interface(data_width=self.data_width)
if direction == "m2s":
converter = wishbone.Converter(master=interface, slave=new_interface)
if direction == "s2m":
converter = wishbone.Converter(master=new_interface, slave=interface)
self.submodules += converter
else:
new_interface = interface
elif isinstance(interface, axi.AXILiteInterface):
# Data width conversion
intermediate = axi.AXILiteInterface(data_width=self.data_width)
if interface.data_width != self.data_width:
interface_cls = type(interface)
converter_cls = {
wishbone.Interface: wishbone.Converter,
axi.AXILiteInterface: axi.AXILiteConverter,
}[interface_cls]
converted_interface = interface_cls(data_width=self.data_width)
if direction == "m2s":
converter = axi.AXILiteConverter(master=interface, slave=intermediate)
if direction == "s2m":
converter = axi.AXILiteConverter(master=intermediate, slave=interface)
self.submodules += converter
# Bus type conversion
new_interface = wishbone.Interface(data_width=self.data_width)
if direction == "m2s":
converter = axi.AXILite2Wishbone(axi_lite=intermediate, wishbone=new_interface)
master, slave = interface, converted_interface
elif direction == "s2m":
converter = axi.Wishbone2AXILite(wishbone=new_interface, axi_lite=intermediate)
master, slave = converted_interface, interface
converter = converter_cls(master=master, slave=slave)
self.submodules += converter
else:
raise TypeError(interface)
converted_interface = interface
# Wishbone <-> AXILite bridging
main_bus_cls = {
"wishbone": wishbone.Interface,
"axi-lite": axi.AXILiteInterface,
}[self.standard]
if isinstance(converted_interface, main_bus_cls):
bridged_interface = converted_interface
else:
bridged_interface = main_bus_cls(data_width=self.data_width)
if direction == "m2s":
master, slave = converted_interface, bridged_interface
elif direction == "s2m":
master, slave = bridged_interface, converted_interface
bridge_cls = {
(wishbone.Interface, axi.AXILiteInterface): axi.Wishbone2AXILite,
(axi.AXILiteInterface, wishbone.Interface): axi.AXILite2Wishbone,
}[type(master), type(slave)]
bridge = bridge_cls(master, slave)
self.submodules += bridge
if type(interface) != type(bridged_interface) or interface.data_width != bridged_interface.data_width:
fmt = "{name} Bus {converted} from {frombus} {frombits}-bit to {tobus} {tobits}-bit."
frombus = "Wishbone" if isinstance(interface, wishbone.Interface) else "AXILite"
tobus = "Wishbone" if isinstance(new_interface, wishbone.Interface) else "AXILite"
frombits = interface.data_width
tobits = new_interface.data_width
if frombus != tobus or frombits != tobits:
bus_names = {
wishbone.Interface: "Wishbone",
axi.AXILiteInterface: "AXI Lite",
}
self.logger.info(fmt.format(
name = colorer(name),
converted = colorer("converted", color="cyan"),
frombus = colorer("Wishbone" if isinstance(interface, wishbone.Interface) else "AXILite"),
frombus = colorer(bus_names[type(interface)]),
frombits = colorer(interface.data_width),
tobus = colorer("Wishbone" if isinstance(new_interface, wishbone.Interface) else "AXILite"),
tobits = colorer(new_interface.data_width)))
return new_interface
tobus = colorer(bus_names[type(bridged_interface)]),
tobits = colorer(bridged_interface.data_width)))
return bridged_interface
def add_master(self, name=None, master=None):
if name is None:
@ -757,8 +766,16 @@ class SoC(Module):
self.csr.add(name, use_loc_if_exists=True)
def add_ram(self, name, origin, size, contents=[], mode="rw"):
ram_bus = wishbone.Interface(data_width=self.bus.data_width)
ram = wishbone.SRAM(size, bus=ram_bus, init=contents, read_only=(mode == "r"))
ram_cls = {
"wishbone": wishbone.SRAM,
"axi-lite": axi.AXILiteSRAM,
}[self.bus.standard]
interface_cls = {
"wishbone": wishbone.Interface,
"axi-lite": axi.AXILiteInterface,
}[self.bus.standard]
ram_bus = interface_cls(data_width=self.bus.data_width)
ram = ram_cls(size, bus=ram_bus, init=contents, read_only=(mode == "r"))
self.bus.add_slave(name, ram.bus, SoCRegion(origin=origin, size=size, mode=mode))
self.check_if_exists(name)
self.logger.info("RAM {} {} {}.".format(
@ -771,13 +788,18 @@ class SoC(Module):
self.add_ram(name, origin, size, contents, mode="r")
def add_csr_bridge(self, origin):
self.submodules.csr_bridge = wishbone.Wishbone2CSR(
csr_bridge_cls = {
"wishbone": wishbone.Wishbone2CSR,
"axi-lite": axi.AXILite2CSR,
}[self.bus.standard]
self.submodules.csr_bridge = csr_bridge_cls(
bus_csr = csr_bus.Interface(
address_width = self.csr.address_width,
data_width = self.csr.data_width))
csr_size = 2**(self.csr.address_width + 2)
csr_region = SoCRegion(origin=origin, size=csr_size, cached=False)
self.bus.add_slave("csr", self.csr_bridge.wishbone, csr_region)
bus = getattr(self.csr_bridge, self.bus.standard.replace('-', '_'))
self.bus.add_slave("csr", bus, csr_region)
self.csr.add_master(name="bridge", master=self.csr_bridge.csr)
self.add_config("CSR_DATA_WIDTH", self.csr.data_width)
self.add_config("CSR_ALIGNMENT", self.csr.alignment)
@ -857,18 +879,27 @@ class SoC(Module):
self.logger.info(self.irq)
self.logger.info(colorer("-"*80, color="bright"))
interconnect_p2p_cls = {
"wishbone": wishbone.InterconnectPointToPoint,
"axi-lite": axi.AXILiteInterconnectPointToPoint,
}[self.bus.standard]
interconnect_shared_cls = {
"wishbone": wishbone.InterconnectShared,
"axi-lite": axi.AXILiteInterconnectShared,
}[self.bus.standard]
# SoC Bus Interconnect ---------------------------------------------------------------------
if len(self.bus.masters) and len(self.bus.slaves):
# If 1 bus_master, 1 bus_slave and no address translation, use InterconnectPointToPoint.
if ((len(self.bus.masters) == 1) and
(len(self.bus.slaves) == 1) and
(next(iter(self.bus.regions.values())).origin == 0)):
self.submodules.bus_interconnect = wishbone.InterconnectPointToPoint(
self.submodules.bus_interconnect = interconnect_p2p_cls(
master = next(iter(self.bus.masters.values())),
slave = next(iter(self.bus.slaves.values())))
# Otherwise, use InterconnectShared.
else:
self.submodules.bus_interconnect = wishbone.InterconnectShared(
self.submodules.bus_interconnect = interconnect_shared_cls(
masters = self.bus.masters.values(),
slaves = [(self.bus.regions[n].decoder(self.bus), s) for n, s in self.bus.slaves.items()],
register = True,

View file

@ -251,6 +251,17 @@ class SoCCore(LiteXSoC):
# SoCCore arguments --------------------------------------------------------------------------------
def soc_core_args(parser):
# Bus parameters
parser.add_argument("--bus-standard", default="wishbone",
help="select bus standard: {}, (default=wishbone)".format(
", ".join(SoCBusHandler.supported_standard)))
parser.add_argument("--bus-data-width", default=32, type=auto_int,
help="Bus data width (default=32)")
parser.add_argument("--bus-address-width", default=32, type=auto_int,
help="Bus address width (default=32)")
parser.add_argument("--bus-timeout", default=1e6, type=float,
help="Bus timeout in cycles (default=1e6)")
# CPU parameters
parser.add_argument("--cpu-type", default=None,
help="select CPU: {}, (default=vexriscv)".format(", ".join(iter(cpu.CPUS.keys()))))

View file

@ -5,10 +5,14 @@
"""AXI4 Full/Lite support for LiteX"""
from migen import *
from migen.genlib import roundrobin
from migen.genlib.misc import split, displacer, chooser, WaitTimer
from litex.soc.interconnect import stream
from litex.build.generic_platform import *
from litex.soc.interconnect import csr_bus
# AXI Definition -----------------------------------------------------------------------------------
BURST_FIXED = 0b00
@ -72,6 +76,24 @@ def _connect_axi(master, slave):
r.extend(m.connect(s))
return r
def _axi_layout_flat(axi):
# yields tuples (channel, name, direction)
def get_dir(channel, direction):
if channel in ["b", "r"]:
return {DIR_M_TO_S: DIR_S_TO_M, DIR_S_TO_M: DIR_M_TO_S}[direction]
return direction
for ch in ["aw", "w", "b", "ar", "r"]:
channel = getattr(axi, ch)
for group in channel.layout:
if len(group) == 3:
name, _, direction = group
yield ch, name, get_dir(ch, direction)
else:
_, subgroups = group
for subgroup in subgroups:
name, _, direction = subgroup
yield ch, name, get_dir(ch, direction)
class AXIInterface:
def __init__(self, data_width=32, address_width=32, id_width=1, clock_domain="sys"):
self.data_width = data_width
@ -88,6 +110,9 @@ class AXIInterface:
def connect(self, slave):
return _connect_axi(self, slave)
def layout_flat(self):
return list(_axi_layout_flat(self))
# AXI Lite Definition ------------------------------------------------------------------------------
def ax_lite_description(address_width):
@ -109,16 +134,16 @@ def r_lite_description(data_width):
]
class AXILiteInterface:
def __init__(self, data_width=32, address_width=32, clock_domain="sys"):
def __init__(self, data_width=32, address_width=32, clock_domain="sys", name=None):
self.data_width = data_width
self.address_width = address_width
self.clock_domain = clock_domain
self.aw = stream.Endpoint(ax_lite_description(address_width))
self.w = stream.Endpoint(w_lite_description(data_width))
self.b = stream.Endpoint(b_lite_description())
self.ar = stream.Endpoint(ax_lite_description(address_width))
self.r = stream.Endpoint(r_lite_description(data_width))
self.aw = stream.Endpoint(ax_lite_description(address_width), name=name)
self.w = stream.Endpoint(w_lite_description(data_width), name=name)
self.b = stream.Endpoint(b_lite_description(), name=name)
self.ar = stream.Endpoint(ax_lite_description(address_width), name=name)
self.r = stream.Endpoint(r_lite_description(data_width), name=name)
def get_ios(self, bus_name="wb"):
subsignals = []
@ -161,9 +186,13 @@ class AXILiteInterface:
def connect(self, slave):
return _connect_axi(self, slave)
def layout_flat(self):
return list(_axi_layout_flat(self))
def write(self, addr, data, strb=None):
if strb is None:
strb = 2**len(self.w.strb) - 1
# aw + w
yield self.aw.valid.eq(1)
yield self.aw.addr.eq(addr)
yield self.w.data.eq(data)
@ -173,9 +202,12 @@ class AXILiteInterface:
while not (yield self.aw.ready):
yield
yield self.aw.valid.eq(0)
yield self.aw.addr.eq(0)
while not (yield self.w.ready):
yield
yield self.w.valid.eq(0)
yield self.w.strb.eq(0)
# b
yield self.b.ready.eq(1)
while not (yield self.b.valid):
yield
@ -184,12 +216,14 @@ class AXILiteInterface:
return resp
def read(self, addr):
# ar
yield self.ar.valid.eq(1)
yield self.ar.addr.eq(addr)
yield
while not (yield self.ar.ready):
yield
yield self.ar.valid.eq(0)
# r
yield self.r.ready.eq(1)
while not (yield self.r.valid):
yield
@ -621,14 +655,14 @@ def axi_lite_to_simple(axi_lite, port_adr, port_dat_r, port_dat_w=None, port_we=
return fsm, comb
class AXILite2CSR(Module):
def __init__(self, axi_lite=None, csr=None):
def __init__(self, axi_lite=None, bus_csr=None):
if axi_lite is None:
axi_lite = AXILiteInterface()
if csr is None:
csr = csr.bus.Interface()
if bus_csr is None:
bus_csr = csr_bus.Interface()
self.axi_lite = axi_lite
self.csr = csr
self.csr = bus_csr
fsm, comb = axi_lite_to_simple(self.axi_lite,
port_adr=self.csr.adr, port_dat_r=self.csr.dat_r,
@ -852,3 +886,269 @@ class AXILiteConverter(Module):
raise NotImplementedError("AXILiteUpConverter")
else:
self.comb += master.connect(slave)
# AXILite Timeout ----------------------------------------------------------------------------------
class AXILiteTimeout(Module):
"""Protect master against slave timeouts (master _has_ to respond correctly)"""
def __init__(self, master, cycles):
self.error = Signal()
wr_error = Signal()
rd_error = Signal()
# # #
self.comb += self.error.eq(wr_error | rd_error)
wr_timer = WaitTimer(int(cycles))
rd_timer = WaitTimer(int(cycles))
self.submodules += wr_timer, rd_timer
def channel_fsm(timer, wait_cond, error, response):
fsm = FSM(reset_state="WAIT")
fsm.act("WAIT",
timer.wait.eq(wait_cond),
# done is updated in `sync`, so we must make sure that `ready` has not been issued
# by slave during that single cycle, by checking `timer.wait`
If(timer.done & timer.wait,
error.eq(1),
NextState("RESPOND")
)
)
fsm.act("RESPOND", *response)
return fsm
self.submodules.wr_fsm = channel_fsm(
timer = wr_timer,
wait_cond = (master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready),
error = wr_error,
response = [
master.aw.ready.eq(master.aw.valid),
master.w.ready.eq(master.w.valid),
master.b.valid.eq(~master.aw.valid & ~master.w.valid),
master.b.resp.eq(RESP_SLVERR),
If(master.b.valid & master.b.ready,
NextState("WAIT")
)
])
self.submodules.rd_fsm = channel_fsm(
timer = rd_timer,
wait_cond = master.ar.valid & ~master.ar.ready,
error = rd_error,
response = [
master.ar.ready.eq(master.ar.valid),
master.r.valid.eq(~master.ar.valid),
master.r.resp.eq(RESP_SLVERR),
master.r.data.eq(2**len(master.r.data) - 1),
If(master.r.valid & master.r.ready,
NextState("WAIT")
)
])
# AXILite Interconnect -----------------------------------------------------------------------------
class AXILiteInterconnectPointToPoint(Module):
def __init__(self, master, slave):
self.comb += master.connect(slave)
class AXILiteRequestCounter(Module):
def __init__(self, request, response, max_requests=256):
self.counter = counter = Signal(max=max_requests)
self.full = full = Signal()
self.empty = empty = Signal()
self.stall = stall = Signal()
self.ready = self.empty
self.comb += [
full.eq(counter == max_requests - 1),
empty.eq(counter == 0),
stall.eq(request & full),
]
self.sync += [
If(request & response,
counter.eq(counter)
).Elif(request & ~full,
counter.eq(counter + 1)
).Elif(response & ~empty,
counter.eq(counter - 1)
),
]
class AXILiteArbiter(Module):
"""AXI Lite arbiter
Arbitrate between master interfaces and connect one to the target.
New master will not be selected until all requests have been responded to.
Arbitration for write and read channels is done separately.
"""
def __init__(self, masters, target):
self.submodules.rr_write = roundrobin.RoundRobin(len(masters), roundrobin.SP_CE)
self.submodules.rr_read = roundrobin.RoundRobin(len(masters), roundrobin.SP_CE)
def get_sig(interface, channel, name):
return getattr(getattr(interface, channel), name)
# mux master->slave signals
for channel, name, direction in target.layout_flat():
rr = self.rr_write if channel in ["aw", "w", "b"] else self.rr_read
if direction == DIR_M_TO_S:
choices = Array(get_sig(m, channel, name) for m in masters)
self.comb += get_sig(target, channel, name).eq(choices[rr.grant])
# connect slave->master signals
for channel, name, direction in target.layout_flat():
rr = self.rr_write if channel in ["aw", "w", "b"] else self.rr_read
if direction == DIR_S_TO_M:
source = get_sig(target, channel, name)
for i, m in enumerate(masters):
dest = get_sig(m, channel, name)
if name == "ready":
self.comb += dest.eq(source & (rr.grant == i))
else:
self.comb += dest.eq(source)
# allow to change rr.grant only after all requests from a master have been responded to
self.submodules.wr_lock = wr_lock = AXILiteRequestCounter(
request=target.aw.valid & target.aw.ready, response=target.b.valid & target.b.ready)
self.submodules.rd_lock = rd_lock = AXILiteRequestCounter(
request=target.ar.valid & target.ar.ready, response=target.r.valid & target.r.ready)
# switch to next request only if there are no responses pending
self.comb += [
self.rr_write.ce.eq(~(target.aw.valid | target.w.valid | target.b.valid) & wr_lock.ready),
self.rr_read.ce.eq(~(target.ar.valid | target.r.valid) & rd_lock.ready),
]
# connect bus requests to round-robin selectors
self.comb += [
self.rr_write.request.eq(Cat(*[m.aw.valid | m.w.valid | m.b.valid for m in masters])),
self.rr_read.request.eq(Cat(*[m.ar.valid | m.r.valid for m in masters])),
]
class AXILiteDecoder(Module):
_doc_slaves = """
slaves: [(decoder, slave), ...]
List of slaves with address decoders, where `decoder` is a function:
decoder(Signal(address_width - log2(data_width//8))) -> Signal(1)
that returns 1 when the slave is selected and 0 otherwise.
""".strip()
__doc__ = """AXI Lite decoder
Decode master access to particular slave based on its decoder function.
{slaves}
""".format(slaves=_doc_slaves)
def __init__(self, master, slaves, register=False):
# TODO: unused register argument
addr_shift = log2_int(master.data_width//8)
channels = {
"write": {"aw", "w", "b"},
"read": {"ar", "r"},
}
# reverse mapping: directions[channel] -> "write"/"read"
directions = {ch: d for d, chs in channels.items() for ch in chs}
def new_slave_sel():
return {"write": Signal(len(slaves)), "read": Signal(len(slaves))}
slave_sel_dec = new_slave_sel()
slave_sel_reg = new_slave_sel()
slave_sel = new_slave_sel()
# we need to hold the slave selected until all responses come back
# TODO: we could reuse arbiter counters
locks = {
"write": AXILiteRequestCounter(
request=master.aw.valid & master.aw.ready,
response=master.b.valid & master.b.ready),
"read": AXILiteRequestCounter(
request=master.ar.valid & master.ar.ready,
response=master.r.valid & master.r.ready),
}
self.submodules += locks.values()
def get_sig(interface, channel, name):
return getattr(getattr(interface, channel), name)
# # #
# decode slave addresses
for i, (decoder, bus) in enumerate(slaves):
self.comb += [
slave_sel_dec["write"][i].eq(decoder(master.aw.addr[addr_shift:])),
slave_sel_dec["read"][i].eq(decoder(master.ar.addr[addr_shift:])),
]
# change the current selection only when we've got all responses
for channel in locks.keys():
self.sync += If(locks[channel].ready, slave_sel_reg[channel].eq(slave_sel_dec[channel]))
# we have to cut the delaying select
for ch, final in slave_sel.items():
self.comb += If(locks[ch].ready,
final.eq(slave_sel_dec[ch])
).Else(
final.eq(slave_sel_reg[ch])
)
# connect master->slaves signals except valid/ready
for i, (_, slave) in enumerate(slaves):
for channel, name, direction in master.layout_flat():
if direction == DIR_M_TO_S:
src = get_sig(master, channel, name)
dst = get_sig(slave, channel, name)
# mask master control signals depending on slave selection
if name in ["valid", "ready"]:
src = src & slave_sel[directions[channel]][i]
self.comb += dst.eq(src)
# connect slave->master signals masking not selected slaves
for channel, name, direction in master.layout_flat():
if direction == DIR_S_TO_M:
dst = get_sig(master, channel, name)
masked = []
for i, (_, slave) in enumerate(slaves):
src = get_sig(slave, channel, name)
# mask depending on channel
mask = Replicate(slave_sel[directions[channel]][i], len(dst))
masked.append(src & mask)
self.comb += dst.eq(reduce(or_, masked))
class AXILiteInterconnectShared(Module):
__doc__ = """AXI Lite shared interconnect
{slaves}
""".format(slaves=AXILiteDecoder._doc_slaves)
def __init__(self, masters, slaves, register=False, timeout_cycles=1e6):
# TODO: data width
shared = AXILiteInterface()
self.submodules.arbiter = AXILiteArbiter(masters, shared)
self.submodules.decoder = AXILiteDecoder(shared, slaves)
if timeout_cycles is not None:
self.submodules.timeout = AXILiteTimeout(shared, timeout_cycles)
class AXILiteCrossbar(Module):
__doc__ = """AXI Lite crossbar
MxN crossbar for M masters and N slaves.
{slaves}
""".format(slaves=AXILiteDecoder._doc_slaves)
def __init__(self, masters, slaves, register=False, timeout_cycles=1e6):
matches, busses = zip(*slaves)
access_m_s = [[AXILiteInterface() for j in slaves] for i in masters] # a[master][slave]
access_s_m = list(zip(*access_m_s)) # a[slave][master]
# decode each master into its access row
for slaves, master in zip(access_m_s, masters):
slaves = list(zip(matches, slaves))
self.submodules += AXILiteDecoder(master, slaves, register)
# arbitrate each access column onto its slave
for masters, bus in zip(access_s_m, busses):
self.submodules += AXILiteArbiter(masters, bus)

View file

@ -15,7 +15,7 @@ from migen.genlib.misc import split, displacer, chooser, WaitTimer
from litex.build.generic_platform import *
from litex.soc.interconnect import csr
from litex.soc.interconnect import csr, csr_bus
# Wishbone Definition ------------------------------------------------------------------------------

View file

@ -7,7 +7,7 @@ import random
from migen import *
from litex.soc.interconnect.axi import *
from litex.soc.interconnect import wishbone, csr_bus
from litex.soc.interconnect import wishbone
# Software Models ----------------------------------------------------------------------------------
@ -326,314 +326,3 @@ class TestAXI(unittest.TestCase):
r_valid_random = 90,
r_ready_random = 90
)
# TestAXILite --------------------------------------------------------------------------------------
class AXILiteChecker:
def __init__(self, latency=None, rdata_generator=None):
self.latency = latency or (lambda: 0)
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
self.writes = []
self.reads = []
def delay(self):
for _ in range(self.latency()):
yield
def handle_write(self, axi_lite):
while not (yield axi_lite.aw.valid):
yield
yield from self.delay()
addr = (yield axi_lite.aw.addr)
yield axi_lite.aw.ready.eq(1)
yield
yield axi_lite.aw.ready.eq(0)
while not (yield axi_lite.w.valid):
yield
yield from self.delay()
data = (yield axi_lite.w.data)
strb = (yield axi_lite.w.strb)
yield axi_lite.w.ready.eq(1)
yield
yield axi_lite.w.ready.eq(0)
yield axi_lite.b.valid.eq(1)
yield axi_lite.b.resp.eq(RESP_OKAY)
yield
while not (yield axi_lite.b.ready):
yield
yield axi_lite.b.valid.eq(0)
self.writes.append((addr, data, strb))
def handle_read(self, axi_lite):
while not (yield axi_lite.ar.valid):
yield
yield from self.delay()
addr = (yield axi_lite.ar.addr)
yield axi_lite.ar.ready.eq(1)
yield
yield axi_lite.ar.ready.eq(0)
data = self.rdata_generator(addr)
yield axi_lite.r.valid.eq(1)
yield axi_lite.r.resp.eq(RESP_OKAY)
yield axi_lite.r.data.eq(data)
yield
while not (yield axi_lite.r.ready):
yield
yield axi_lite.r.valid.eq(0)
self.reads.append((addr, data))
@passive
def handler(self, axi_lite):
while True:
if (yield axi_lite.aw.valid):
yield from self.handle_write(axi_lite)
if (yield axi_lite.ar.valid):
yield from self.handle_read(axi_lite)
yield
class TestAXILite(unittest.TestCase):
def test_wishbone2axi2wishbone(self):
class DUT(Module):
def __init__(self):
self.wishbone = wishbone.Interface(data_width=32)
# # #
axi = AXILiteInterface(data_width=32, address_width=32)
wb = wishbone.Interface(data_width=32)
wishbone2axi = Wishbone2AXILite(self.wishbone, axi)
axi2wishbone = AXILite2Wishbone(axi, wb)
self.submodules += wishbone2axi, axi2wishbone
sram = wishbone.SRAM(1024, init=[0x12345678, 0xa55aa55a])
self.submodules += sram
self.comb += wb.connect(sram.bus)
def generator(dut):
dut.errors = 0
if (yield from dut.wishbone.read(0)) != 0x12345678:
dut.errors += 1
if (yield from dut.wishbone.read(1)) != 0xa55aa55a:
dut.errors += 1
for i in range(32):
yield from dut.wishbone.write(i, i)
for i in range(32):
if (yield from dut.wishbone.read(i)) != i:
dut.errors += 1
dut = DUT()
run_simulation(dut, [generator(dut)])
self.assertEqual(dut.errors, 0)
def test_axilite2csr(self):
@passive
def csr_mem_handler(csr, mem):
while True:
adr = (yield csr.adr)
yield csr.dat_r.eq(mem[adr])
if (yield csr.we):
mem[adr] = (yield csr.dat_w)
yield
class DUT(Module):
def __init__(self):
self.axi_lite = AXILiteInterface()
self.csr = csr_bus.Interface()
self.submodules.axilite2csr = AXILite2CSR(self.axi_lite, self.csr)
self.errors = 0
prng = random.Random(42)
mem_ref = [prng.randrange(255) for i in range(100)]
def generator(dut):
dut.errors = 0
for adr, ref in enumerate(mem_ref):
adr = adr << 2
data, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if data != ref:
dut.errors += 1
write_data = [prng.randrange(255) for _ in mem_ref]
for adr, wdata in enumerate(write_data):
adr = adr << 2
resp = (yield from dut.axi_lite.write(adr, wdata))
self.assertEqual(resp, 0b00)
rdata, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if rdata != wdata:
dut.errors += 1
dut = DUT()
mem = [v for v in mem_ref]
run_simulation(dut, [generator(dut), csr_mem_handler(dut.csr, mem)])
self.assertEqual(dut.errors, 0)
def test_axilite_sram(self):
class DUT(Module):
def __init__(self, size, init):
self.axi_lite = AXILiteInterface()
self.submodules.sram = AXILiteSRAM(size, init=init, bus=self.axi_lite)
self.errors = 0
def generator(dut, ref_init):
for adr, ref in enumerate(ref_init):
adr = adr << 2
data, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if data != ref:
dut.errors += 1
write_data = [prng.randrange(255) for _ in ref_init]
for adr, wdata in enumerate(write_data):
adr = adr << 2
resp = (yield from dut.axi_lite.write(adr, wdata))
self.assertEqual(resp, 0b00)
rdata, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if rdata != wdata:
dut.errors += 1
prng = random.Random(42)
init = [prng.randrange(2**32) for i in range(100)]
dut = DUT(size=len(init)*4, init=[v for v in init])
run_simulation(dut, [generator(dut, init)])
self.assertEqual(dut.errors, 0)
def converter_test(self, width_from, width_to,
write_pattern=None, write_expected=None,
read_pattern=None, read_expected=None):
assert not (write_pattern is None and read_pattern is None)
if write_pattern is None:
write_pattern = []
write_expected = []
elif len(write_pattern[0]) == 2:
# add w.strb
write_pattern = [(adr, data, 2**(width_from//8)-1) for adr, data in write_pattern]
if read_pattern is None:
read_pattern = []
read_expected = []
class DUT(Module):
def __init__(self, width_from, width_to):
self.master = AXILiteInterface(data_width=width_from)
self.slave = AXILiteInterface(data_width=width_to)
self.submodules.converter = AXILiteConverter(self.master, self.slave)
def generator(axi_lite):
for addr, data, strb in write_pattern or []:
resp = (yield from axi_lite.write(addr, data, strb))
self.assertEqual(resp, RESP_OKAY)
for _ in range(16):
yield
for addr, refdata in read_pattern or []:
data, resp = (yield from axi_lite.read(addr))
self.assertEqual(resp, RESP_OKAY)
self.assertEqual(data, refdata)
for _ in range(4):
yield
def rdata_generator(adr):
for a, v in read_expected:
if a == adr:
return v
return 0xbaadc0de
_latency = 0
def latency():
nonlocal _latency
_latency = (_latency + 1) % 3
return _latency
dut = DUT(width_from=width_from, width_to=width_to)
checker = AXILiteChecker(latency, rdata_generator)
run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)], vcd_name='sim.vcd')
self.assertEqual(checker.writes, write_expected)
self.assertEqual(checker.reads, read_expected)
def test_axilite_down_converter_32to16(self):
write_pattern = [
(0x00000000, 0x22221111),
(0x00000004, 0x44443333),
(0x00000008, 0x66665555),
(0x00000100, 0x88887777),
]
write_expected = [
(0x00000000, 0x1111, 0b11),
(0x00000002, 0x2222, 0b11),
(0x00000004, 0x3333, 0b11),
(0x00000006, 0x4444, 0b11),
(0x00000008, 0x5555, 0b11),
(0x0000000a, 0x6666, 0b11),
(0x00000100, 0x7777, 0b11),
(0x00000102, 0x8888, 0b11),
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected,
read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_32to8(self):
write_pattern = [
(0x00000000, 0x44332211),
(0x00000004, 0x88776655),
]
write_expected = [
(0x00000000, 0x11, 0b1),
(0x00000001, 0x22, 0b1),
(0x00000002, 0x33, 0b1),
(0x00000003, 0x44, 0b1),
(0x00000004, 0x55, 0b1),
(0x00000005, 0x66, 0b1),
(0x00000006, 0x77, 0b1),
(0x00000007, 0x88, 0b1),
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
self.converter_test(width_from=32, width_to=8,
write_pattern=write_pattern, write_expected=write_expected,
read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_64to32(self):
write_pattern = [
(0x00000000, 0x2222222211111111),
(0x00000008, 0x4444444433333333),
]
write_expected = [
(0x00000000, 0x11111111, 0b1111),
(0x00000004, 0x22222222, 0b1111),
(0x00000008, 0x33333333, 0b1111),
(0x0000000c, 0x44444444, 0b1111),
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
self.converter_test(width_from=64, width_to=32,
write_pattern=write_pattern, write_expected=write_expected,
read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_strb(self):
write_pattern = [
(0x00000000, 0x22221111, 0b1100),
(0x00000004, 0x44443333, 0b1111),
(0x00000008, 0x66665555, 0b1011),
(0x00000100, 0x88887777, 0b0011),
]
write_expected = [
(0x00000002, 0x2222, 0b11),
(0x00000004, 0x3333, 0b11),
(0x00000006, 0x4444, 0b11),
(0x00000008, 0x5555, 0b11),
(0x0000000a, 0x6666, 0b10),
(0x00000100, 0x7777, 0b11),
]
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected)

820
test/test_axi_lite.py Normal file
View file

@ -0,0 +1,820 @@
# This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
# License: BSD
import unittest
import random
from migen import *
from litex.soc.interconnect.axi import *
from litex.soc.interconnect import wishbone, csr_bus
# Helpers ------------------------------------------------------------------------------------------
def _int_or_call(int_or_func):
if callable(int_or_func):
return int_or_func()
return int_or_func
@passive
def timeout_generator(ticks):
import os
for i in range(ticks):
if os.environ.get("TIMEOUT_DEBUG", "") == "1":
print("tick {}".format(i))
yield
raise TimeoutError("Timeout after %d ticks" % ticks)
class AXILiteChecker:
def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
self.ready_latency = ready_latency
self.response_latency = response_latency
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
self.writes = [] # (addr, data, strb)
self.reads = [] # (addr, data)
def delay(self, latency):
for _ in range(_int_or_call(latency)):
yield
def handle_write(self, axi_lite):
# aw
while not (yield axi_lite.aw.valid):
yield
yield from self.delay(self.ready_latency)
addr = (yield axi_lite.aw.addr)
yield axi_lite.aw.ready.eq(1)
yield
yield axi_lite.aw.ready.eq(0)
while not (yield axi_lite.w.valid):
yield
yield from self.delay(self.ready_latency)
# w
data = (yield axi_lite.w.data)
strb = (yield axi_lite.w.strb)
yield axi_lite.w.ready.eq(1)
yield
yield axi_lite.w.ready.eq(0)
yield from self.delay(self.response_latency)
# b
yield axi_lite.b.valid.eq(1)
yield axi_lite.b.resp.eq(RESP_OKAY)
yield
while not (yield axi_lite.b.ready):
yield
yield axi_lite.b.valid.eq(0)
self.writes.append((addr, data, strb))
def handle_read(self, axi_lite):
# ar
while not (yield axi_lite.ar.valid):
yield
yield from self.delay(self.ready_latency)
addr = (yield axi_lite.ar.addr)
yield axi_lite.ar.ready.eq(1)
yield
yield axi_lite.ar.ready.eq(0)
yield from self.delay(self.response_latency)
# r
data = self.rdata_generator(addr)
yield axi_lite.r.valid.eq(1)
yield axi_lite.r.resp.eq(RESP_OKAY)
yield axi_lite.r.data.eq(data)
yield
while not (yield axi_lite.r.ready):
yield
yield axi_lite.r.valid.eq(0)
yield axi_lite.r.data.eq(0)
self.reads.append((addr, data))
@passive
def handler(self, axi_lite):
while True:
if (yield axi_lite.aw.valid):
yield from self.handle_write(axi_lite)
if (yield axi_lite.ar.valid):
yield from self.handle_read(axi_lite)
yield
class AXILitePatternGenerator:
def __init__(self, axi_lite, pattern, delay=0):
# patter: (rw, addr, data)
self.axi_lite = axi_lite
self.pattern = pattern
self.delay = delay
self.errors = 0
self.read_errors = []
self.resp_errors = {"w": 0, "r": 0}
def handler(self):
for rw, addr, data in self.pattern:
assert rw in ["w", "r"]
if rw == "w":
strb = 2**len(self.axi_lite.w.strb) - 1
resp = (yield from self.axi_lite.write(addr, data, strb))
else:
rdata, resp = (yield from self.axi_lite.read(addr))
if rdata != data:
self.read_errors.append((rdata, data))
self.errors += 1
if resp != RESP_OKAY:
self.resp_errors[rw] += 1
self.errors += 1
for _ in range(_int_or_call(self.delay)):
yield
for _ in range(16):
yield
# TestAXILite --------------------------------------------------------------------------------------
class TestAXILite(unittest.TestCase):
def test_wishbone2axi2wishbone(self):
class DUT(Module):
def __init__(self):
self.wishbone = wishbone.Interface(data_width=32)
# # #
axi = AXILiteInterface(data_width=32, address_width=32)
wb = wishbone.Interface(data_width=32)
wishbone2axi = Wishbone2AXILite(self.wishbone, axi)
axi2wishbone = AXILite2Wishbone(axi, wb)
self.submodules += wishbone2axi, axi2wishbone
sram = wishbone.SRAM(1024, init=[0x12345678, 0xa55aa55a])
self.submodules += sram
self.comb += wb.connect(sram.bus)
def generator(dut):
dut.errors = 0
if (yield from dut.wishbone.read(0)) != 0x12345678:
dut.errors += 1
if (yield from dut.wishbone.read(1)) != 0xa55aa55a:
dut.errors += 1
for i in range(32):
yield from dut.wishbone.write(i, i)
for i in range(32):
if (yield from dut.wishbone.read(i)) != i:
dut.errors += 1
dut = DUT()
run_simulation(dut, [generator(dut)])
self.assertEqual(dut.errors, 0)
def test_axilite2csr(self):
@passive
def csr_mem_handler(csr, mem):
while True:
adr = (yield csr.adr)
yield csr.dat_r.eq(mem[adr])
if (yield csr.we):
mem[adr] = (yield csr.dat_w)
yield
class DUT(Module):
def __init__(self):
self.axi_lite = AXILiteInterface()
self.csr = csr_bus.Interface()
self.submodules.axilite2csr = AXILite2CSR(self.axi_lite, self.csr)
self.errors = 0
prng = random.Random(42)
mem_ref = [prng.randrange(255) for i in range(100)]
def generator(dut):
dut.errors = 0
for adr, ref in enumerate(mem_ref):
adr = adr << 2
data, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if data != ref:
dut.errors += 1
write_data = [prng.randrange(255) for _ in mem_ref]
for adr, wdata in enumerate(write_data):
adr = adr << 2
resp = (yield from dut.axi_lite.write(adr, wdata))
self.assertEqual(resp, 0b00)
rdata, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if rdata != wdata:
dut.errors += 1
dut = DUT()
mem = [v for v in mem_ref]
run_simulation(dut, [generator(dut), csr_mem_handler(dut.csr, mem)])
self.assertEqual(dut.errors, 0)
def test_axilite_sram(self):
class DUT(Module):
def __init__(self, size, init):
self.axi_lite = AXILiteInterface()
self.submodules.sram = AXILiteSRAM(size, init=init, bus=self.axi_lite)
self.errors = 0
def generator(dut, ref_init):
for adr, ref in enumerate(ref_init):
adr = adr << 2
data, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if data != ref:
dut.errors += 1
write_data = [prng.randrange(255) for _ in ref_init]
for adr, wdata in enumerate(write_data):
adr = adr << 2
resp = (yield from dut.axi_lite.write(adr, wdata))
self.assertEqual(resp, 0b00)
rdata, resp = (yield from dut.axi_lite.read(adr))
self.assertEqual(resp, 0b00)
if rdata != wdata:
dut.errors += 1
prng = random.Random(42)
init = [prng.randrange(2**32) for i in range(100)]
dut = DUT(size=len(init)*4, init=[v for v in init])
run_simulation(dut, [generator(dut, init)])
self.assertEqual(dut.errors, 0)
def converter_test(self, width_from, width_to,
write_pattern=None, write_expected=None,
read_pattern=None, read_expected=None):
assert not (write_pattern is None and read_pattern is None)
if write_pattern is None:
write_pattern = []
write_expected = []
elif len(write_pattern[0]) == 2:
# add w.strb
write_pattern = [(adr, data, 2**(width_from//8)-1) for adr, data in write_pattern]
if read_pattern is None:
read_pattern = []
read_expected = []
class DUT(Module):
def __init__(self, width_from, width_to):
self.master = AXILiteInterface(data_width=width_from)
self.slave = AXILiteInterface(data_width=width_to)
self.submodules.converter = AXILiteConverter(self.master, self.slave)
def generator(axi_lite):
for addr, data, strb in write_pattern or []:
resp = (yield from axi_lite.write(addr, data, strb))
self.assertEqual(resp, RESP_OKAY)
for _ in range(16):
yield
for addr, refdata in read_pattern or []:
data, resp = (yield from axi_lite.read(addr))
self.assertEqual(resp, RESP_OKAY)
self.assertEqual(data, refdata)
for _ in range(4):
yield
def rdata_generator(adr):
for a, v in read_expected:
if a == adr:
return v
return 0xbaadc0de
_latency = 0
def latency():
nonlocal _latency
_latency = (_latency + 1) % 3
return _latency
dut = DUT(width_from=width_from, width_to=width_to)
checker = AXILiteChecker(ready_latency=latency, rdata_generator=rdata_generator)
run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)])
self.assertEqual(checker.writes, write_expected)
self.assertEqual(checker.reads, read_expected)
def test_axilite_down_converter_32to16(self):
write_pattern = [
(0x00000000, 0x22221111),
(0x00000004, 0x44443333),
(0x00000008, 0x66665555),
(0x00000100, 0x88887777),
]
write_expected = [
(0x00000000, 0x1111, 0b11),
(0x00000002, 0x2222, 0b11),
(0x00000004, 0x3333, 0b11),
(0x00000006, 0x4444, 0b11),
(0x00000008, 0x5555, 0b11),
(0x0000000a, 0x6666, 0b11),
(0x00000100, 0x7777, 0b11),
(0x00000102, 0x8888, 0b11),
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected,
read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_32to8(self):
write_pattern = [
(0x00000000, 0x44332211),
(0x00000004, 0x88776655),
]
write_expected = [
(0x00000000, 0x11, 0b1),
(0x00000001, 0x22, 0b1),
(0x00000002, 0x33, 0b1),
(0x00000003, 0x44, 0b1),
(0x00000004, 0x55, 0b1),
(0x00000005, 0x66, 0b1),
(0x00000006, 0x77, 0b1),
(0x00000007, 0x88, 0b1),
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
self.converter_test(width_from=32, width_to=8,
write_pattern=write_pattern, write_expected=write_expected,
read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_64to32(self):
write_pattern = [
(0x00000000, 0x2222222211111111),
(0x00000008, 0x4444444433333333),
]
write_expected = [
(0x00000000, 0x11111111, 0b1111),
(0x00000004, 0x22222222, 0b1111),
(0x00000008, 0x33333333, 0b1111),
(0x0000000c, 0x44444444, 0b1111),
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
self.converter_test(width_from=64, width_to=32,
write_pattern=write_pattern, write_expected=write_expected,
read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_strb(self):
write_pattern = [
(0x00000000, 0x22221111, 0b1100),
(0x00000004, 0x44443333, 0b1111),
(0x00000008, 0x66665555, 0b1011),
(0x00000100, 0x88887777, 0b0011),
]
write_expected = [
(0x00000002, 0x2222, 0b11),
(0x00000004, 0x3333, 0b11),
(0x00000006, 0x4444, 0b11),
(0x00000008, 0x5555, 0b11),
(0x0000000a, 0x6666, 0b10),
(0x00000100, 0x7777, 0b11),
]
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected)
# TestAXILiteInterconnet ---------------------------------------------------------------------------
class TestAXILiteInterconnect(unittest.TestCase):
def test_interconnect_p2p(self):
class DUT(Module):
def __init__(self):
self.master = master = AXILiteInterface()
self.slave = slave = AXILiteInterface()
self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
pattern = [
("w", 0x00000004, 0x11111111),
("w", 0x0000000c, 0x22222222),
("r", 0x00000010, 0x33333333),
("r", 0x00000018, 0x44444444),
]
def rdata_generator(adr):
for rw, a, v in pattern:
if rw == "r" and a == adr:
return v
return 0xbaadc0de
dut = DUT()
checker = AXILiteChecker(rdata_generator=rdata_generator)
generators = [
AXILitePatternGenerator(dut.master, pattern).handler(),
checker.handler(dut.slave),
]
run_simulation(dut, generators)
self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])
def test_timeout(self):
class DUT(Module):
def __init__(self):
self.master = master = AXILiteInterface()
self.slave = slave = AXILiteInterface()
self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
self.submodules.timeout = AXILiteTimeout(master, 16)
def generator(axi_lite):
resp = (yield from axi_lite.write(0x00001000, 0x11111111))
self.assertEqual(resp, RESP_OKAY)
resp = (yield from axi_lite.write(0x00002000, 0x22222222))
self.assertEqual(resp, RESP_SLVERR)
data, resp = (yield from axi_lite.read(0x00003000))
self.assertEqual(resp, RESP_SLVERR)
self.assertEqual(data, 0xffffffff)
yield
def checker(axi_lite):
for _ in range(16):
yield
yield axi_lite.aw.ready.eq(1)
yield axi_lite.w.ready.eq(1)
yield
yield axi_lite.aw.ready.eq(0)
yield axi_lite.w.ready.eq(0)
yield axi_lite.b.valid.eq(1)
yield
while not (yield axi_lite.b.ready):
yield
yield axi_lite.b.valid.eq(0)
dut = DUT()
generators = [
generator(dut.master),
checker(dut.slave),
timeout_generator(300),
]
run_simulation(dut, generators)
def test_arbiter_order(self):
class DUT(Module):
def __init__(self, n_masters):
self.masters = [AXILiteInterface() for _ in range(n_masters)]
self.slave = AXILiteInterface()
self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)
def generator(n, axi_lite, delay=0):
def gen(i):
return 100*n + i
for i in range(4):
resp = (yield from axi_lite.write(gen(i), gen(i)))
self.assertEqual(resp, RESP_OKAY)
for _ in range(delay):
yield
for i in range(4):
data, resp = (yield from axi_lite.read(gen(i)))
self.assertEqual(resp, RESP_OKAY)
for _ in range(delay):
yield
for _ in range(8):
yield
n_masters = 3
# with no delay each master will do all transfers at once
with self.subTest(delay=0):
dut = DUT(n_masters)
checker = AXILiteChecker()
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
# with some delay, the round-robin arbiter will iterate over masters
with self.subTest(delay=1):
dut = DUT(n_masters)
checker = AXILiteChecker()
generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
def test_arbiter_holds_grant_until_response(self):
class DUT(Module):
def __init__(self, n_masters):
self.masters = [AXILiteInterface() for _ in range(n_masters)]
self.slave = AXILiteInterface()
self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)
def generator(n, axi_lite, delay=0):
def gen(i):
return 100*n + i
for i in range(4):
resp = (yield from axi_lite.write(gen(i), gen(i)))
self.assertEqual(resp, RESP_OKAY)
for _ in range(delay):
yield
for i in range(4):
data, resp = (yield from axi_lite.read(gen(i)))
self.assertEqual(resp, RESP_OKAY)
for _ in range(delay):
yield
for _ in range(8):
yield
n_masters = 3
# with no delay each master will do all transfers at once
with self.subTest(delay=0):
dut = DUT(n_masters)
checker = AXILiteChecker(response_latency=lambda: 3)
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
# with some delay, the round-robin arbiter will iterate over masters
with self.subTest(delay=1):
dut = DUT(n_masters)
checker = AXILiteChecker(response_latency=lambda: 3)
generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
def address_decoder(self, i, size=0x100, python=False):
# bytes to 32-bit words aligned
_size = (size) >> 2
_origin = (size * i) >> 2
if python: # for python integers
shift = log2_int(_size)
return lambda a: ((a >> shift) == (_origin >> shift))
# for migen signals
return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size)))
def decoder_test(self, n_slaves, pattern, generator_delay=0):
class DUT(Module):
def __init__(self, decoders):
self.master = AXILiteInterface()
self.slaves = [AXILiteInterface() for _ in range(len(decoders))]
slaves = list(zip(decoders, self.slaves))
self.submodules.decoder = AXILiteDecoder(self.master, slaves)
def rdata_generator(adr):
for rw, a, v in pattern:
if rw == "r" and a == adr:
return v
return 0xbaadc0de
dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves]
generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
generators += [timeout_generator(300)]
run_simulation(dut, generators)
return checkers
def test_decoder_write(self):
for delay in [0, 1, 0]:
with self.subTest(delay=delay):
slaves = self.decoder_test(n_slaves=3, pattern=[
("w", 0x010, 1),
("w", 0x110, 2),
("w", 0x210, 3),
("w", 0x011, 1),
("w", 0x012, 1),
("w", 0x111, 2),
("w", 0x112, 2),
("w", 0x211, 3),
("w", 0x212, 3),
], generator_delay=delay)
def addr(checker_list):
return [entry[0] for entry in checker_list]
self.assertEqual(addr(slaves[0].writes), [0x010, 0x011, 0x012])
self.assertEqual(addr(slaves[1].writes), [0x110, 0x111, 0x112])
self.assertEqual(addr(slaves[2].writes), [0x210, 0x211, 0x212])
for slave in slaves:
self.assertEqual(slave.reads, [])
def test_decoder_read(self):
for delay in [0, 1]:
with self.subTest(delay=delay):
slaves = self.decoder_test(n_slaves=3, pattern=[
("r", 0x010, 1),
("r", 0x110, 2),
("r", 0x210, 3),
("r", 0x011, 1),
("r", 0x012, 1),
("r", 0x111, 2),
("r", 0x112, 2),
("r", 0x211, 3),
("r", 0x212, 3),
], generator_delay=delay)
def addr(checker_list):
return [entry[0] for entry in checker_list]
self.assertEqual(addr(slaves[0].reads), [0x010, 0x011, 0x012])
self.assertEqual(addr(slaves[1].reads), [0x110, 0x111, 0x112])
self.assertEqual(addr(slaves[2].reads), [0x210, 0x211, 0x212])
for slave in slaves:
self.assertEqual(slave.writes, [])
def test_decoder_read_write(self):
for delay in [0, 1]:
with self.subTest(delay=delay):
slaves = self.decoder_test(n_slaves=3, pattern=[
("w", 0x010, 1),
("w", 0x110, 2),
("r", 0x111, 2),
("r", 0x011, 1),
("r", 0x211, 3),
("w", 0x210, 3),
], generator_delay=delay)
def addr(checker_list):
return [entry[0] for entry in checker_list]
self.assertEqual(addr(slaves[0].writes), [0x010])
self.assertEqual(addr(slaves[0].reads), [0x011])
self.assertEqual(addr(slaves[1].writes), [0x110])
self.assertEqual(addr(slaves[1].reads), [0x111])
self.assertEqual(addr(slaves[2].writes), [0x210])
self.assertEqual(addr(slaves[2].reads), [0x211])
def test_decoder_stall(self):
with self.assertRaises(TimeoutError):
self.decoder_test(n_slaves=3, pattern=[
("w", 0x300, 1),
])
with self.assertRaises(TimeoutError):
self.decoder_test(n_slaves=3, pattern=[
("r", 0x300, 1),
])
def interconnect_test(self, master_patterns, slave_decoders,
master_delay=0, slave_ready_latency=0, slave_response_latency=0,
disconnected_slaves=None, timeout=300, interconnect=AXILiteInterconnectShared,
**kwargs):
# number of masters/slaves is defined by the number of patterns/decoders
# master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
# slave_decoders: list of address decoders per slave
# delay/latency: control the speed of masters/slaves
# disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
class DUT(Module):
def __init__(self, n_masters, decoders, **kwargs):
self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
slaves = list(zip(decoders, self.slaves))
self.submodules.interconnect = interconnect(self.masters, slaves, **kwargs)
class ReadDataGenerator:
# Generates data based on decoded addresses and data defined in master_patterns
def __init__(self, patterns):
self.mem = {}
for pattern in patterns:
for rw, addr, val in pattern:
if rw == "r":
assert addr not in self.mem
self.mem[addr] = val
def getter(self, n):
# on miss will give default data depending on slave n
return lambda addr: self.mem.get(addr, 0xbaad0000 + n)
def new_checker(rdata_generator):
return AXILiteChecker(ready_latency=slave_ready_latency,
response_latency=slave_response_latency,
rdata_generator=rdata_generator)
# perpare test
dut = DUT(len(master_patterns), slave_decoders, **kwargs)
rdata_generator = ReadDataGenerator(master_patterns)
checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(master_patterns)]
pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
for i, pattern in enumerate(master_patterns)]
# run simulator
generators = [gen.handler() for gen in pattern_generators]
generators += [checker.handler(slave)
for i, (slave, checker) in enumerate(zip(dut.slaves, checkers))
if i not in (disconnected_slaves or [])]
generators += [timeout_generator(timeout)]
run_simulation(dut, generators, vcd_name='sim.vcd')
return pattern_generators, checkers
def test_interconnect_shared_basic(self):
master_patterns = [
[("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
[("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
[("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
]
slave_decoders = [self.address_decoder(i) for i in range(3)]
generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
master_delay=1)
for gen in generators:
self.assertEqual(gen.errors, 0)
def addr(checker_list):
return [entry[0] for entry in checker_list]
self.assertEqual(addr(checkers[0].writes), [0x000, 0x010])
self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112])
self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
self.assertEqual(addr(checkers[0].reads), [])
self.assertEqual(addr(checkers[1].reads), [])
self.assertEqual(addr(checkers[2].reads), [])
def interconnect_stress_test(self, timeout=1000, **kwargs):
prng = random.Random(42)
n_masters = 3
n_slaves = 3
pattern_length = 64
slave_region_size = 0x10000000
# for testing purpose each master will access only its own region of a slave
master_region_size = 0x1000
assert n_masters*master_region_size < slave_region_size
def gen_pattern(n, length):
assert length < master_region_size
for i_access in range(length):
rw = "w" if prng.randint(0, 1) == 0 else "r"
i_slave = prng.randrange(n_slaves)
addr = i_slave*slave_region_size + n*master_region_size + i_access
data = addr
yield rw, addr, data
master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
for i in range(n_slaves)]
generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
timeout=timeout, **kwargs)
for gen in generators:
read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
gen.resp_errors, "\n".join(read_errors))
if not kwargs.get("disconnected_slaves", None):
self.assertEqual(gen.errors, 0, msg=msg)
else: # when some slaves are disconnected we should have some errors
self.assertNotEqual(gen.errors, 0, msg=msg)
# make sure all the accesses at slave side are in correct address region
for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)):
for addr in (entry[0] for entry in checker.writes + checker.reads):
# compensate for the fact that decoders work on word-aligned addresses
self.assertNotEqual(decoder(addr >> 2), 0)
def test_interconnect_shared_stress_no_delay(self):
self.interconnect_stress_test(timeout=1000,
master_delay=0,
slave_ready_latency=0,
slave_response_latency=0)
def test_interconnect_shared_stress_rand_short(self):
prng = random.Random(42)
rand = lambda: prng.randrange(4)
self.interconnect_stress_test(timeout=2000,
master_delay=rand,
slave_ready_latency=rand,
slave_response_latency=rand)
def test_interconnect_shared_stress_rand_long(self):
prng = random.Random(42)
rand = lambda: prng.randrange(16)
self.interconnect_stress_test(timeout=4000,
master_delay=rand,
slave_ready_latency=rand,
slave_response_latency=rand)
def test_interconnect_shared_stress_timeout(self):
self.interconnect_stress_test(timeout=4000,
disconnected_slaves=[1],
timeout_cycles=50)
def test_crossbar_stress_no_delay(self):
self.interconnect_stress_test(timeout=1000,
master_delay=0,
slave_ready_latency=0,
slave_response_latency=0,
interconnect=AXILiteCrossbar)
def test_crossbar_stress_rand(self):
prng = random.Random(42)
rand = lambda: prng.randrange(4)
self.interconnect_stress_test(timeout=2000,
master_delay=rand,
slave_ready_latency=rand,
slave_response_latency=rand,
interconnect=AXILiteCrossbar)