test/axi: add shared AXI Lite interconnect tests

This commit is contained in:
Jędrzej Boczar 2020-07-22 14:01:02 +02:00
parent 3a08b21d44
commit 2cab7fbf0f
2 changed files with 198 additions and 40 deletions

View File

@ -134,16 +134,16 @@ def r_lite_description(data_width):
] ]
class AXILiteInterface: class AXILiteInterface:
def __init__(self, data_width=32, address_width=32, clock_domain="sys"): def __init__(self, data_width=32, address_width=32, clock_domain="sys", name=None):
self.data_width = data_width self.data_width = data_width
self.address_width = address_width self.address_width = address_width
self.clock_domain = clock_domain self.clock_domain = clock_domain
self.aw = stream.Endpoint(ax_lite_description(address_width)) self.aw = stream.Endpoint(ax_lite_description(address_width), name=name)
self.w = stream.Endpoint(w_lite_description(data_width)) self.w = stream.Endpoint(w_lite_description(data_width), name=name)
self.b = stream.Endpoint(b_lite_description()) self.b = stream.Endpoint(b_lite_description(), name=name)
self.ar = stream.Endpoint(ax_lite_description(address_width)) self.ar = stream.Endpoint(ax_lite_description(address_width), name=name)
self.r = stream.Endpoint(r_lite_description(data_width)) self.r = stream.Endpoint(r_lite_description(data_width), name=name)
def get_ios(self, bus_name="wb"): def get_ios(self, bus_name="wb"):
subsignals = [] subsignals = []
@ -1116,11 +1116,11 @@ class AXILiteInterconnectShared(Module):
{slaves} {slaves}
""".format(slaves=AXILiteDecoder._doc_slaves) """.format(slaves=AXILiteDecoder._doc_slaves)
def __init__(self, masters, slaves, register=False, timeout_cycles=1e6): def __init__(self, masters, slaves, timeout_cycles=1e6):
# TODO: data width # TODO: data width
shared = AXILiteInterface() shared = AXILiteInterface()
self.submodules.arbiter = AXILiteArbiter(masters, shared) self.submodules.arbiter = AXILiteArbiter(masters, shared)
self.submodules.decoder = AXILiteDecoder(shared, slaves, register) self.submodules.decoder = AXILiteDecoder(shared, slaves)
if timeout_cycles is not None: if timeout_cycles is not None:
self.submodules.timeout = AXILiteTimeout(shared, timeout_cycles) self.submodules.timeout = AXILiteTimeout(shared, timeout_cycles)
@ -1132,7 +1132,7 @@ class AXILiteCrossbar(Module):
{slaves} {slaves}
""".format(slaves=AXILiteDecoder._doc_slaves) """.format(slaves=AXILiteDecoder._doc_slaves)
def __init__(self, masters, slaves, register=False): def __init__(self, masters, slaves, timeout_cycles=1e6):
matches, busses = zip(*slaves) matches, busses = zip(*slaves)
access_m_s = [[AXILiteInterface() for j in slaves] for i in masters] # a[master][slave] access_m_s = [[AXILiteInterface() for j in slaves] for i in masters] # a[master][slave]
access_s_m = list(zip(*access_m_s)) # a[slave][master] access_s_m = list(zip(*access_m_s)) # a[slave][master]

View File

@ -329,16 +329,21 @@ class TestAXI(unittest.TestCase):
# TestAXILite -------------------------------------------------------------------------------------- # TestAXILite --------------------------------------------------------------------------------------
def _int_or_call(int_or_func):
if callable(int_or_func):
return int_or_func()
return int_or_func
class AXILiteChecker: class AXILiteChecker:
def __init__(self, ready_latency=None, response_latency=None, rdata_generator=None): def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
self.ready_latency = ready_latency or (lambda: 0) self.ready_latency = ready_latency
self.response_latency = response_latency or (lambda: 0) self.response_latency = response_latency
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de) self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
self.writes = [] # (addr, data, strb) self.writes = [] # (addr, data, strb)
self.reads = [] # (addr, data) self.reads = [] # (addr, data)
def delay(self, latency): def delay(self, latency):
for _ in range(latency()): for _ in range(_int_or_call(latency)):
yield yield
def handle_write(self, axi_lite): def handle_write(self, axi_lite):
@ -401,8 +406,11 @@ class AXILiteChecker:
yield yield
@passive @passive
def timeout(ticks): def timeout_generator(ticks):
for _ in range(ticks): import os
for i in range(ticks):
if os.environ.get("TIMEOUT_DEBUG", "") == "1":
print("tick {}".format(i))
yield yield
raise TimeoutError("Timeout after %d ticks" % ticks) raise TimeoutError("Timeout after %d ticks" % ticks)
@ -655,22 +663,35 @@ class TestAXILite(unittest.TestCase):
# TestAXILiteInterconnet --------------------------------------------------------------------------- # TestAXILiteInterconnet ---------------------------------------------------------------------------
class TestAXILiteInterconnect(unittest.TestCase): class AXILitePatternGenerator:
def axilite_pattern_generator(self, axi_lite, pattern, delay=0): def __init__(self, axi_lite, pattern, delay=0):
for rw, addr, data in pattern: self.axi_lite = axi_lite
self.pattern = pattern
self.delay = delay
self.errors = 0
self.read_errors = []
self.resp_errors = {"w": 0, "r": 0}
def handler(self):
for rw, addr, data in self.pattern:
assert rw in ["w", "r"] assert rw in ["w", "r"]
if rw == "w": if rw == "w":
resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1)) strb = 2**len(self.axi_lite.w.strb) - 1
self.assertEqual(resp, RESP_OKAY) resp = (yield from self.axi_lite.write(addr, data, strb))
else: else:
rdata, resp = (yield from axi_lite.read(addr)) rdata, resp = (yield from self.axi_lite.read(addr))
self.assertEqual(resp, RESP_OKAY) if rdata != data:
self.assertEqual(rdata, data) self.read_errors.append((rdata, data))
for _ in range(delay): self.errors += 1
if resp != RESP_OKAY:
self.resp_errors[rw] += 1
self.errors += 1
for _ in range(_int_or_call(self.delay)):
yield yield
for _ in range(16): for _ in range(16):
yield yield
class TestAXILiteInterconnect(unittest.TestCase):
def test_interconnect_p2p(self): def test_interconnect_p2p(self):
class DUT(Module): class DUT(Module):
def __init__(self): def __init__(self):
@ -694,7 +715,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
dut = DUT() dut = DUT()
checker = AXILiteChecker(rdata_generator=rdata_generator) checker = AXILiteChecker(rdata_generator=rdata_generator)
generators = [ generators = [
self.axilite_pattern_generator(dut.master, pattern), AXILitePatternGenerator(dut.master, pattern).handler(),
checker.handler(dut.slave), checker.handler(dut.slave),
] ]
run_simulation(dut, generators) run_simulation(dut, generators)
@ -737,7 +758,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
generators = [ generators = [
generator(dut.master), generator(dut.master),
checker(dut.slave), checker(dut.slave),
timeout(300), timeout_generator(300),
] ]
run_simulation(dut, generators) run_simulation(dut, generators)
@ -772,7 +793,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
dut = DUT(n_masters) dut = DUT(n_masters)
checker = AXILiteChecker() checker = AXILiteChecker()
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)] generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
generators += [timeout(300), checker.handler(dut.slave)] generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators) run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203] order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order) self.assertEqual([addr for addr, data, strb in checker.writes], order)
@ -783,7 +804,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
dut = DUT(n_masters) dut = DUT(n_masters)
checker = AXILiteChecker() checker = AXILiteChecker()
generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)] generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
generators += [timeout(300), checker.handler(dut.slave)] generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators) run_simulation(dut, generators)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203] order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order) self.assertEqual([addr for addr, data, strb in checker.writes], order)
@ -820,7 +841,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
dut = DUT(n_masters) dut = DUT(n_masters)
checker = AXILiteChecker(response_latency=lambda: 3) checker = AXILiteChecker(response_latency=lambda: 3)
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)] generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
generators += [timeout(300), checker.handler(dut.slave)] generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators) run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203] order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order) self.assertEqual([addr for addr, data, strb in checker.writes], order)
@ -831,12 +852,22 @@ class TestAXILiteInterconnect(unittest.TestCase):
dut = DUT(n_masters) dut = DUT(n_masters)
checker = AXILiteChecker(response_latency=lambda: 3) checker = AXILiteChecker(response_latency=lambda: 3)
generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)] generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
generators += [timeout(300), checker.handler(dut.slave)] generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators) run_simulation(dut, generators)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203] order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order) self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order) self.assertEqual([addr for addr, data in checker.reads], order)
def address_decoder(self, i, size=0x100, python=False):
# bytes to 32-bit words aligned
_size = (size) >> 2
_origin = (size * i) >> 2
if python: # for python integers
shift = log2_int(_size)
return lambda a: ((a >> shift) == (_origin >> shift))
# for migen signals
return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size)))
def decoder_test(self, n_slaves, pattern, generator_delay=0): def decoder_test(self, n_slaves, pattern, generator_delay=0):
class DUT(Module): class DUT(Module):
def __init__(self, decoders): def __init__(self, decoders):
@ -845,25 +876,19 @@ class TestAXILiteInterconnect(unittest.TestCase):
slaves = list(zip(decoders, self.slaves)) slaves = list(zip(decoders, self.slaves))
self.submodules.decoder = AXILiteDecoder(self.master, slaves) self.submodules.decoder = AXILiteDecoder(self.master, slaves)
def decoder(i):
# bytes to 32-bit words aligned
size = (0x100) >> 2
origin = (0x100 * i) >> 2
return lambda a: (a[log2_int(size):] == (origin >> log2_int(size)))
def rdata_generator(adr): def rdata_generator(adr):
for rw, a, v in pattern: for rw, a, v in pattern:
if rw == "r" and a == adr: if rw == "r" and a == adr:
return v return v
return 0xbaadc0de return 0xbaadc0de
dut = DUT([decoder(i) for i in range(n_slaves)]) dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves] checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves]
generators = [self.axilite_pattern_generator(dut.master, pattern, delay=generator_delay)] generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)] generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
generators += [timeout(300)] generators += [timeout_generator(300)]
run_simulation(dut, generators, vcd_name='sim.vcd') run_simulation(dut, generators)
return checkers return checkers
@ -946,3 +971,136 @@ class TestAXILiteInterconnect(unittest.TestCase):
self.decoder_test(n_slaves=3, pattern=[ self.decoder_test(n_slaves=3, pattern=[
("r", 0x300, 1), ("r", 0x300, 1),
]) ])
def interconnect_shared_test(self, master_patterns, slave_decoders,
master_delay=0, slave_ready_latency=0, slave_response_latency=0,
timeout=300, **kwargs):
# number of masters/slaves is defined by the number of patterns/decoders
# master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
# slave_decoders: list of address decoders per slave
class DUT(Module):
def __init__(self, n_masters, decoders, **kwargs):
self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
slaves = list(zip(decoders, self.slaves))
self.submodules.interconnect = AXILiteInterconnectShared(self.masters, slaves, **kwargs)
class ReadDataGenerator:
# Generates data based on decoded addresses and data defined in master_patterns
def __init__(self, patterns):
self.mem = {}
for pattern in patterns:
for rw, addr, val in pattern:
if rw == "r":
assert addr not in self.mem
self.mem[addr] = val
def getter(self, n):
# on miss will give default data depending on slave n
return lambda addr: self.mem.get(addr, 0xbaad0000 + n)
def new_checker(rdata_generator):
return AXILiteChecker(ready_latency=slave_ready_latency,
response_latency=slave_response_latency,
rdata_generator=rdata_generator)
# perpare test
dut = DUT(len(master_patterns), slave_decoders, **kwargs)
rdata_generator = ReadDataGenerator(master_patterns)
checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(master_patterns)]
pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
for i, pattern in enumerate(master_patterns)]
# run simulator
generators = [gen.handler() for gen in pattern_generators]
generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
generators += [timeout_generator(timeout)]
run_simulation(dut, generators)
return pattern_generators, checkers
def test_interconnect_shared_basic(self):
master_patterns = [
[("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
[("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
[("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
]
slave_decoders = [self.address_decoder(i) for i in range(3)]
generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders,
master_delay=1)
for gen in generators:
self.assertEqual(gen.errors, 0)
def addr(checker_list):
return [entry[0] for entry in checker_list]
self.assertEqual(addr(checkers[0].writes), [0x000, 0x010])
self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112])
self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
self.assertEqual(addr(checkers[0].reads), [])
self.assertEqual(addr(checkers[1].reads), [])
self.assertEqual(addr(checkers[2].reads), [])
def interconnect_shared_stress_test(self, timeout=1000, **kwargs):
prng = random.Random(42)
n_masters = 3
n_slaves = 3
pattern_length = 64
slave_region_size = 0x10000000
# for testing purpose each master will access only its own region of a slave
master_region_size = 0x1000
assert n_masters*master_region_size < slave_region_size
def gen_pattern(n, length):
assert length < master_region_size
for i_access in range(length):
rw = "w" if prng.randint(0, 1) == 0 else "r"
i_slave = prng.randrange(n_slaves)
addr = i_slave*slave_region_size + n*master_region_size + i_access
data = addr
yield rw, addr, data
master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
for i in range(n_slaves)]
generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders,
timeout=timeout, **kwargs)
for gen in generators:
read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
gen.resp_errors, "\n".join(read_errors))
self.assertEqual(gen.errors, 0, msg=msg)
# make sure all the accesses at slave side are in correct address region
for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)):
for addr in (entry[0] for entry in checker.writes + checker.reads):
# compensate for the fact that decoders work on word-aligned addresses
self.assertNotEqual(decoder(addr >> 2), 0)
def test_interconnect_shared_stress_no_delay(self):
self.interconnect_shared_stress_test(timeout=1000,
master_delay=0,
slave_ready_latency=0,
slave_response_latency=0)
def test_interconnect_shared_stress_rand_short(self):
prng = random.Random(42)
rand = lambda: prng.randrange(4)
self.interconnect_shared_stress_test(timeout=2000,
master_delay=rand,
slave_ready_latency=rand,
slave_response_latency=rand)
def test_interconnect_shared_stress_rand_long(self):
prng = random.Random(42)
rand = lambda: prng.randrange(16)
self.interconnect_shared_stress_test(timeout=4000,
master_delay=rand,
slave_ready_latency=rand,
slave_response_latency=rand)