soc/interconnect/axi: point-to-point interconnect and timeout module with tests

This commit is contained in:
Jędrzej Boczar 2020-07-17 16:48:46 +02:00
parent b4c1120e3d
commit f47ccdae99
2 changed files with 180 additions and 7 deletions

View file

@ -5,10 +5,14 @@
"""AXI4 Full/Lite support for LiteX"""
from migen import *
from migen.genlib import roundrobin
from migen.genlib.misc import split, displacer, chooser, WaitTimer
from litex.soc.interconnect import stream
from litex.build.generic_platform import *
from litex.soc.interconnect import csr_bus
# AXI Definition -----------------------------------------------------------------------------------
BURST_FIXED = 0b00
@ -72,6 +76,24 @@ def _connect_axi(master, slave):
r.extend(m.connect(s))
return r
def _axi_layout_flat(axi):
# yields tuples (channel, name, direction)
def get_dir(channel, direction):
if channel in ["b", "r"]:
return {DIR_M_TO_S: DIR_S_TO_M, DIR_S_TO_M: DIR_M_TO_S}[direction]
return direction
for ch in ["aw", "w", "b", "ar", "r"]:
channel = getattr(axi, ch)
for group in channel.layout:
if len(group) == 3:
name, _, direction = group
yield ch, name, get_dir(ch, direction)
else:
_, subgroups = group
for subgroup in subgroups:
name, _, direction = subgroup
yield ch, name, get_dir(ch, direction)
class AXIInterface:
def __init__(self, data_width=32, address_width=32, id_width=1, clock_domain="sys"):
self.data_width = data_width
@ -88,6 +110,9 @@ class AXIInterface:
def connect(self, slave):
return _connect_axi(self, slave)
def layout_flat(self):
return list(_axi_layout_flat(self))
# AXI Lite Definition ------------------------------------------------------------------------------
def ax_lite_description(address_width):
@ -161,6 +186,9 @@ class AXILiteInterface:
def connect(self, slave):
return _connect_axi(self, slave)
def layout_flat(self):
return list(_axi_layout_flat(self))
def write(self, addr, data, strb=None):
if strb is None:
strb = 2**len(self.w.strb) - 1
@ -621,14 +649,14 @@ def axi_lite_to_simple(axi_lite, port_adr, port_dat_r, port_dat_w=None, port_we=
return fsm, comb
class AXILite2CSR(Module):
def __init__(self, axi_lite=None, csr=None):
def __init__(self, axi_lite=None, bus_csr=None):
if axi_lite is None:
axi_lite = AXILiteInterface()
if csr is None:
csr = csr.bus.Interface()
if bus_csr is None:
bus_csr = csr_bus.Interface()
self.axi_lite = axi_lite
self.csr = csr
self.csr = bus_csr
fsm, comb = axi_lite_to_simple(self.axi_lite,
port_adr=self.csr.adr, port_dat_r=self.csr.dat_r,
@ -852,3 +880,59 @@ class AXILiteConverter(Module):
raise NotImplementedError("AXILiteUpConverter")
else:
self.comb += master.connect(slave)
# AXILite Timeout ----------------------------------------------------------------------------------
class AXILiteTimeout(Module):
"""Protect master against slave timeouts (master _has_ to respond correctly)"""
def __init__(self, master, cycles):
self.error = Signal()
# # #
timer = WaitTimer(int(cycles))
self.submodules += timer
is_write = Signal()
is_read = Signal()
self.submodules.fsm = fsm = FSM()
fsm.act("WAIT",
is_write.eq((master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready)),
is_read.eq(master.ar.valid & ~master.ar.ready),
timer.wait.eq(is_write | is_read),
# done is updated in `sync`, so we must make sure that `ready` has not been issued
# by slave during that single cycle, by checking `timer.wait`
If(timer.done & timer.wait,
self.error.eq(1),
If(is_write,
NextState("RESPOND-WRITE")
).Else(
NextState("RESPOND-READ")
)
)
)
fsm.act("RESPOND-WRITE",
master.aw.ready.eq(master.aw.valid),
master.w.ready.eq(master.w.valid),
master.b.valid.eq(~master.aw.valid & ~master.w.valid),
master.b.resp.eq(RESP_SLVERR),
If(master.b.valid & master.b.ready,
NextState("WAIT")
)
)
fsm.act("RESPOND-READ",
master.ar.ready.eq(master.ar.valid),
master.r.valid.eq(~master.ar.valid),
master.r.resp.eq(RESP_SLVERR),
master.r.data.eq(2**len(master.r.data) - 1),
If(master.r.valid & master.r.ready,
NextState("WAIT")
)
)
# AXILite Interconnect -----------------------------------------------------------------------------
class AXILiteInterconnectPointToPoint(Module):
def __init__(self, master, slave):
self.comb += master.connect(slave)

View file

@ -333,8 +333,8 @@ class AXILiteChecker:
def __init__(self, latency=None, rdata_generator=None):
self.latency = latency or (lambda: 0)
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
self.writes = []
self.reads = []
self.writes = [] # (addr, data, strb)
self.reads = [] # (addr, data)
def delay(self):
for _ in range(self.latency()):
@ -555,7 +555,7 @@ class TestAXILite(unittest.TestCase):
dut = DUT(width_from=width_from, width_to=width_to)
checker = AXILiteChecker(latency, rdata_generator)
run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)], vcd_name='sim.vcd')
run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)])
self.assertEqual(checker.writes, write_expected)
self.assertEqual(checker.reads, read_expected)
@ -637,3 +637,92 @@ class TestAXILite(unittest.TestCase):
]
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected)
def axilite_pattern_generator(self, axi_lite, pattern):
for rw, addr, data in pattern:
assert rw in ["w", "r"]
if rw == "w":
resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1))
self.assertEqual(resp, RESP_OKAY)
else:
rdata, resp = (yield from axi_lite.read(addr))
self.assertEqual(resp, RESP_OKAY)
self.assertEqual(rdata, data)
for _ in range(16):
yield
def test_axilite_interconnect_p2p(self):
class DUT(Module):
def __init__(self):
self.master = master = AXILiteInterface()
self.slave = slave = AXILiteInterface()
self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
pattern = [
("w", 0x00000004, 0x11111111),
("w", 0x0000000c, 0x22222222),
("r", 0x00000010, 0x33333333),
("r", 0x00000018, 0x44444444),
]
def rdata_generator(adr):
for rw, a, v in pattern:
if rw == "r" and a == adr:
return v
return 0xbaadc0de
dut = DUT()
checker = AXILiteChecker(rdata_generator=rdata_generator)
generators = [
self.axilite_pattern_generator(dut.master, pattern),
checker.handler(dut.slave),
]
run_simulation(dut, generators)
self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])
def test_axilite_timeout(self):
class DUT(Module):
def __init__(self):
self.master = master = AXILiteInterface()
self.slave = slave = AXILiteInterface()
self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
self.submodules.timeout = AXILiteTimeout(master, 16)
@passive
def timeout(ticks):
for _ in range(ticks):
yield
raise TimeoutError("Timeout after %d ticks" % ticks)
def generator(axi_lite):
resp = (yield from axi_lite.write(0x00001000, 0x11111111))
self.assertEqual(resp, RESP_OKAY)
resp = (yield from axi_lite.write(0x00002000, 0x22222222))
self.assertEqual(resp, RESP_SLVERR)
data, resp = (yield from axi_lite.read(0x00003000))
self.assertEqual(resp, RESP_SLVERR)
self.assertEqual(data, 0xffffffff)
yield
def checker(axi_lite):
for _ in range(16):
yield
yield axi_lite.aw.ready.eq(1)
yield axi_lite.w.ready.eq(1)
yield
yield axi_lite.aw.ready.eq(0)
yield axi_lite.w.ready.eq(0)
yield axi_lite.b.valid.eq(1)
yield
while not (yield axi_lite.b.ready):
yield
yield axi_lite.b.valid.eq(0)
dut = DUT()
generators = [
generator(dut.master),
checker(dut.slave),
timeout(300),
]
run_simulation(dut, generators, vcd_name='sim.vcd')