From 2cab7fbf0f96afd5bbb17fd94fd6de3ab6162d73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Boczar?= Date: Wed, 22 Jul 2020 14:01:02 +0200 Subject: [PATCH] test/axi: add shared AXI Lite interconnect tests --- litex/soc/interconnect/axi.py | 18 +-- test/test_axi.py | 220 +++++++++++++++++++++++++++++----- 2 files changed, 198 insertions(+), 40 deletions(-) diff --git a/litex/soc/interconnect/axi.py b/litex/soc/interconnect/axi.py index e76abe301..71040ac7d 100644 --- a/litex/soc/interconnect/axi.py +++ b/litex/soc/interconnect/axi.py @@ -134,16 +134,16 @@ def r_lite_description(data_width): ] class AXILiteInterface: - def __init__(self, data_width=32, address_width=32, clock_domain="sys"): + def __init__(self, data_width=32, address_width=32, clock_domain="sys", name=None): self.data_width = data_width self.address_width = address_width self.clock_domain = clock_domain - self.aw = stream.Endpoint(ax_lite_description(address_width)) - self.w = stream.Endpoint(w_lite_description(data_width)) - self.b = stream.Endpoint(b_lite_description()) - self.ar = stream.Endpoint(ax_lite_description(address_width)) - self.r = stream.Endpoint(r_lite_description(data_width)) + self.aw = stream.Endpoint(ax_lite_description(address_width), name=name) + self.w = stream.Endpoint(w_lite_description(data_width), name=name) + self.b = stream.Endpoint(b_lite_description(), name=name) + self.ar = stream.Endpoint(ax_lite_description(address_width), name=name) + self.r = stream.Endpoint(r_lite_description(data_width), name=name) def get_ios(self, bus_name="wb"): subsignals = [] @@ -1116,11 +1116,11 @@ class AXILiteInterconnectShared(Module): {slaves} """.format(slaves=AXILiteDecoder._doc_slaves) - def __init__(self, masters, slaves, register=False, timeout_cycles=1e6): + def __init__(self, masters, slaves, timeout_cycles=1e6): # TODO: data width shared = AXILiteInterface() self.submodules.arbiter = AXILiteArbiter(masters, shared) - self.submodules.decoder = AXILiteDecoder(shared, slaves, register) + self.submodules.decoder = AXILiteDecoder(shared, slaves) if timeout_cycles is not None: self.submodules.timeout = AXILiteTimeout(shared, timeout_cycles) @@ -1132,7 +1132,7 @@ class AXILiteCrossbar(Module): {slaves} """.format(slaves=AXILiteDecoder._doc_slaves) - def __init__(self, masters, slaves, register=False): + def __init__(self, masters, slaves, timeout_cycles=1e6): matches, busses = zip(*slaves) access_m_s = [[AXILiteInterface() for j in slaves] for i in masters] # a[master][slave] access_s_m = list(zip(*access_m_s)) # a[slave][master] diff --git a/test/test_axi.py b/test/test_axi.py index 1c8345569..f76e48b06 100644 --- a/test/test_axi.py +++ b/test/test_axi.py @@ -329,16 +329,21 @@ class TestAXI(unittest.TestCase): # TestAXILite -------------------------------------------------------------------------------------- +def _int_or_call(int_or_func): + if callable(int_or_func): + return int_or_func() + return int_or_func + class AXILiteChecker: - def __init__(self, ready_latency=None, response_latency=None, rdata_generator=None): - self.ready_latency = ready_latency or (lambda: 0) - self.response_latency = response_latency or (lambda: 0) + def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None): + self.ready_latency = ready_latency + self.response_latency = response_latency self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de) self.writes = [] # (addr, data, strb) self.reads = [] # (addr, data) def delay(self, latency): - for _ in range(latency()): + for _ in range(_int_or_call(latency)): yield def handle_write(self, axi_lite): @@ -401,8 +406,11 @@ class AXILiteChecker: yield @passive -def timeout(ticks): - for _ in range(ticks): +def timeout_generator(ticks): + import os + for i in range(ticks): + if os.environ.get("TIMEOUT_DEBUG", "") == "1": + print("tick {}".format(i)) yield raise TimeoutError("Timeout after %d ticks" % ticks) @@ -655,22 +663,35 @@ class TestAXILite(unittest.TestCase): # TestAXILiteInterconnet --------------------------------------------------------------------------- -class TestAXILiteInterconnect(unittest.TestCase): - def axilite_pattern_generator(self, axi_lite, pattern, delay=0): - for rw, addr, data in pattern: +class AXILitePatternGenerator: + def __init__(self, axi_lite, pattern, delay=0): + self.axi_lite = axi_lite + self.pattern = pattern + self.delay = delay + self.errors = 0 + self.read_errors = [] + self.resp_errors = {"w": 0, "r": 0} + + def handler(self): + for rw, addr, data in self.pattern: assert rw in ["w", "r"] if rw == "w": - resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1)) - self.assertEqual(resp, RESP_OKAY) + strb = 2**len(self.axi_lite.w.strb) - 1 + resp = (yield from self.axi_lite.write(addr, data, strb)) else: - rdata, resp = (yield from axi_lite.read(addr)) - self.assertEqual(resp, RESP_OKAY) - self.assertEqual(rdata, data) - for _ in range(delay): + rdata, resp = (yield from self.axi_lite.read(addr)) + if rdata != data: + self.read_errors.append((rdata, data)) + self.errors += 1 + if resp != RESP_OKAY: + self.resp_errors[rw] += 1 + self.errors += 1 + for _ in range(_int_or_call(self.delay)): yield for _ in range(16): yield +class TestAXILiteInterconnect(unittest.TestCase): def test_interconnect_p2p(self): class DUT(Module): def __init__(self): @@ -694,7 +715,7 @@ class TestAXILiteInterconnect(unittest.TestCase): dut = DUT() checker = AXILiteChecker(rdata_generator=rdata_generator) generators = [ - self.axilite_pattern_generator(dut.master, pattern), + AXILitePatternGenerator(dut.master, pattern).handler(), checker.handler(dut.slave), ] run_simulation(dut, generators) @@ -737,7 +758,7 @@ class TestAXILiteInterconnect(unittest.TestCase): generators = [ generator(dut.master), checker(dut.slave), - timeout(300), + timeout_generator(300), ] run_simulation(dut, generators) @@ -772,7 +793,7 @@ class TestAXILiteInterconnect(unittest.TestCase): dut = DUT(n_masters) checker = AXILiteChecker() generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)] - generators += [timeout(300), checker.handler(dut.slave)] + generators += [timeout_generator(300), checker.handler(dut.slave)] run_simulation(dut, generators) order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203] self.assertEqual([addr for addr, data, strb in checker.writes], order) @@ -783,7 +804,7 @@ class TestAXILiteInterconnect(unittest.TestCase): dut = DUT(n_masters) checker = AXILiteChecker() generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)] - generators += [timeout(300), checker.handler(dut.slave)] + generators += [timeout_generator(300), checker.handler(dut.slave)] run_simulation(dut, generators) order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203] self.assertEqual([addr for addr, data, strb in checker.writes], order) @@ -820,7 +841,7 @@ class TestAXILiteInterconnect(unittest.TestCase): dut = DUT(n_masters) checker = AXILiteChecker(response_latency=lambda: 3) generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)] - generators += [timeout(300), checker.handler(dut.slave)] + generators += [timeout_generator(300), checker.handler(dut.slave)] run_simulation(dut, generators) order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203] self.assertEqual([addr for addr, data, strb in checker.writes], order) @@ -831,12 +852,22 @@ class TestAXILiteInterconnect(unittest.TestCase): dut = DUT(n_masters) checker = AXILiteChecker(response_latency=lambda: 3) generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)] - generators += [timeout(300), checker.handler(dut.slave)] + generators += [timeout_generator(300), checker.handler(dut.slave)] run_simulation(dut, generators) order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203] self.assertEqual([addr for addr, data, strb in checker.writes], order) self.assertEqual([addr for addr, data in checker.reads], order) + def address_decoder(self, i, size=0x100, python=False): + # bytes to 32-bit words aligned + _size = (size) >> 2 + _origin = (size * i) >> 2 + if python: # for python integers + shift = log2_int(_size) + return lambda a: ((a >> shift) == (_origin >> shift)) + # for migen signals + return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size))) + def decoder_test(self, n_slaves, pattern, generator_delay=0): class DUT(Module): def __init__(self, decoders): @@ -845,25 +876,19 @@ class TestAXILiteInterconnect(unittest.TestCase): slaves = list(zip(decoders, self.slaves)) self.submodules.decoder = AXILiteDecoder(self.master, slaves) - def decoder(i): - # bytes to 32-bit words aligned - size = (0x100) >> 2 - origin = (0x100 * i) >> 2 - return lambda a: (a[log2_int(size):] == (origin >> log2_int(size))) - def rdata_generator(adr): for rw, a, v in pattern: if rw == "r" and a == adr: return v return 0xbaadc0de - dut = DUT([decoder(i) for i in range(n_slaves)]) + dut = DUT([self.address_decoder(i) for i in range(n_slaves)]) checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves] - generators = [self.axilite_pattern_generator(dut.master, pattern, delay=generator_delay)] + generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()] generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)] - generators += [timeout(300)] - run_simulation(dut, generators, vcd_name='sim.vcd') + generators += [timeout_generator(300)] + run_simulation(dut, generators) return checkers @@ -946,3 +971,136 @@ class TestAXILiteInterconnect(unittest.TestCase): self.decoder_test(n_slaves=3, pattern=[ ("r", 0x300, 1), ]) + + def interconnect_shared_test(self, master_patterns, slave_decoders, + master_delay=0, slave_ready_latency=0, slave_response_latency=0, + timeout=300, **kwargs): + # number of masters/slaves is defined by the number of patterns/decoders + # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data)) + # slave_decoders: list of address decoders per slave + class DUT(Module): + def __init__(self, n_masters, decoders, **kwargs): + self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)] + self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))] + slaves = list(zip(decoders, self.slaves)) + self.submodules.interconnect = AXILiteInterconnectShared(self.masters, slaves, **kwargs) + + class ReadDataGenerator: + # Generates data based on decoded addresses and data defined in master_patterns + def __init__(self, patterns): + self.mem = {} + for pattern in patterns: + for rw, addr, val in pattern: + if rw == "r": + assert addr not in self.mem + self.mem[addr] = val + + def getter(self, n): + # on miss will give default data depending on slave n + return lambda addr: self.mem.get(addr, 0xbaad0000 + n) + + def new_checker(rdata_generator): + return AXILiteChecker(ready_latency=slave_ready_latency, + response_latency=slave_response_latency, + rdata_generator=rdata_generator) + + # perpare test + dut = DUT(len(master_patterns), slave_decoders, **kwargs) + rdata_generator = ReadDataGenerator(master_patterns) + checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(master_patterns)] + pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay) + for i, pattern in enumerate(master_patterns)] + + # run simulator + generators = [gen.handler() for gen in pattern_generators] + generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)] + generators += [timeout_generator(timeout)] + run_simulation(dut, generators) + + return pattern_generators, checkers + + def test_interconnect_shared_basic(self): + master_patterns = [ + [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)], + [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)], + [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)], + ] + slave_decoders = [self.address_decoder(i) for i in range(3)] + + generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders, + master_delay=1) + + for gen in generators: + self.assertEqual(gen.errors, 0) + + def addr(checker_list): + return [entry[0] for entry in checker_list] + + self.assertEqual(addr(checkers[0].writes), [0x000, 0x010]) + self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112]) + self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222]) + self.assertEqual(addr(checkers[0].reads), []) + self.assertEqual(addr(checkers[1].reads), []) + self.assertEqual(addr(checkers[2].reads), []) + + def interconnect_shared_stress_test(self, timeout=1000, **kwargs): + prng = random.Random(42) + + n_masters = 3 + n_slaves = 3 + pattern_length = 64 + slave_region_size = 0x10000000 + # for testing purpose each master will access only its own region of a slave + master_region_size = 0x1000 + assert n_masters*master_region_size < slave_region_size + + def gen_pattern(n, length): + assert length < master_region_size + for i_access in range(length): + rw = "w" if prng.randint(0, 1) == 0 else "r" + i_slave = prng.randrange(n_slaves) + addr = i_slave*slave_region_size + n*master_region_size + i_access + data = addr + yield rw, addr, data + + master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)] + slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)] + slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True) + for i in range(n_slaves)] + + generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders, + timeout=timeout, **kwargs) + + for gen in generators: + read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors] + msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format( + gen.resp_errors, "\n".join(read_errors)) + self.assertEqual(gen.errors, 0, msg=msg) + + # make sure all the accesses at slave side are in correct address region + for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)): + for addr in (entry[0] for entry in checker.writes + checker.reads): + # compensate for the fact that decoders work on word-aligned addresses + self.assertNotEqual(decoder(addr >> 2), 0) + + def test_interconnect_shared_stress_no_delay(self): + self.interconnect_shared_stress_test(timeout=1000, + master_delay=0, + slave_ready_latency=0, + slave_response_latency=0) + + def test_interconnect_shared_stress_rand_short(self): + prng = random.Random(42) + rand = lambda: prng.randrange(4) + self.interconnect_shared_stress_test(timeout=2000, + master_delay=rand, + slave_ready_latency=rand, + slave_response_latency=rand) + + def test_interconnect_shared_stress_rand_long(self): + prng = random.Random(42) + rand = lambda: prng.randrange(16) + self.interconnect_shared_stress_test(timeout=4000, + master_delay=rand, + slave_ready_latency=rand, + slave_response_latency=rand)