From 7b8b68afcb48c8e14e12ba97f41dec628b734714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Boczar?= Date: Mon, 6 Apr 2020 11:31:52 +0200 Subject: [PATCH] test: add core.bankmachine tests --- test/test_bankmachine.py | 422 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 422 insertions(+) create mode 100644 test/test_bankmachine.py diff --git a/test/test_bankmachine.py b/test/test_bankmachine.py new file mode 100644 index 0000000..1d692cb --- /dev/null +++ b/test/test_bankmachine.py @@ -0,0 +1,422 @@ +# This file is Copyright (c) 2020 Antmicro +# License: BSD + +import math +import unittest + +from migen import * + +# from litex.soc.interconnect import stream + +from litedram.common import * +from litedram.core.bankmachine import BankMachine + +from test.common import timeout_generator + + +class BankMachineDUT(Module): + # fill only settings needed by BankMachine + default_controller_settings = dict( + cmd_buffer_depth=8, + cmd_buffer_buffered=False, + with_auto_precharge=True, + ) + default_phy_settings = dict( + cwl=2, + nphases=2, + nranks=1, + # indirectyl + memtype="DDR2", + dfi_databits=2*16, + ) + default_geom_settings = dict( + bankbits=3, + rowbits=13, + colbits=10, + ) + default_timing_settings = dict( + tRAS=None, + tRC=None, + tCCD=1, + tRCD=2, + tRP=2, + tWR=2, + ) + + def __init__(self, n, controller_settings=None, phy_settings=None, geom_settings=None, + timing_settings=None): + # update settings if provided + def updated(settings, update): + copy = settings.copy() + copy.update(update or {}) + return copy + + controller_settings = updated(self.default_controller_settings, controller_settings) + phy_settings = updated(self.default_phy_settings, phy_settings) + geom_settings = updated(self.default_geom_settings, geom_settings) + timing_settings = updated(self.default_timing_settings, timing_settings) + + class SimpleSettings(Settings): + def __init__(self, **kwargs): + self.set_attributes(kwargs) + + settings = SimpleSettings(**controller_settings) + settings.phy = SimpleSettings(**phy_settings) + settings.geom = SimpleSettings(**geom_settings) + settings.timing = SimpleSettings(**timing_settings) + settings.geom.addressbits = max(settings.geom.rowbits, settings.geom.colbits) + self.settings = settings + + self.address_align = log2_int(burst_lengths[settings.phy.memtype]) + self.address_width = LiteDRAMInterface(self.address_align, settings).address_width + + bankmachine = BankMachine(n=n, address_width=self.address_width, + address_align=self.address_align, + nranks=settings.phy.nranks, settings=settings) + self.submodules.bankmachine = bankmachine + + def get_cmd(self): + # cmd_request_rw_layout -> name + layout = [name for name, _ in cmd_request_rw_layout(a=self.settings.geom.addressbits, + ba=self.settings.geom.bankbits)] + request = {} + for name in layout + ["valid", "ready", "first", "last"]: + request[name] = (yield getattr(self.bankmachine.cmd, name)) + request["type"] = { + (0, 0, 0): "nop", + (1, 0, 1): "write", + (1, 0, 0): "read", + (0, 1, 0): "activate", + (0, 1, 1): "precharge", + (1, 1, 0): "refresh", + }[(request["cas"], request["ras"], request["we"])] + return request + + def req_address(self, row, col): + col = col & (2**self.settings.geom.colbits - 1) + row = row & (2**self.settings.geom.rowbits - 1) + split = self.settings.geom.colbits - self.address_align + return (row << split) | col + + +class TestBankMachine(unittest.TestCase): + def test_init(self): + BankMachineDUT(1) + + def bankmachine_commands_test(self, dut, requests, generators=None): + # perform a test by simulating requests producer and return registered commands + commands = [] + + def producer(dut): + for req in requests: + yield dut.bankmachine.req.addr.eq(req["addr"]) + yield dut.bankmachine.req.we.eq(req["we"]) + yield dut.bankmachine.req.valid.eq(1) + yield + while not (yield dut.bankmachine.req.ready): + yield + yield dut.bankmachine.req.valid.eq(0) + for _ in range(req.get("delay", 0)): + yield + + def req_consumer(dut): + for req in requests: + if req["we"]: + signal = dut.bankmachine.req.wdata_ready + else: + signal = dut.bankmachine.req.rdata_valid + while not (yield signal): + yield + yield + + @passive + def cmd_consumer(dut): + while True: + while not (yield dut.bankmachine.cmd.valid): + yield + yield dut.bankmachine.cmd.ready.eq(1) + yield + commands.append((yield from dut.get_cmd())) + yield dut.bankmachine.cmd.ready.eq(0) + yield + + all_generators = [ + producer(dut), + req_consumer(dut), + cmd_consumer(dut), + timeout_generator(50 * len(requests)), + ] + if generators is not None: + all_generators += [g(dut) for g in generators] + run_simulation(dut, all_generators) + return commands + + def test_opens_correct_row(self): + # Verify that the correct row is activated before read/write commands + dut = BankMachineDUT(3) + requests = [ + dict(addr=dut.req_address(row=0xf0, col=0x0d), we=0), + dict(addr=dut.req_address(row=0xd0, col=0x0d), we=1), + ] + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + # commands: activate, read (auto-precharge), activate, write + self.assertEqual(commands[0]["type"], "activate") + self.assertEqual(commands[0]["a"], 0xf0) + self.assertEqual(commands[2]["type"], "activate") + self.assertEqual(commands[2]["a"], 0xd0) + + def test_correct_bank_address(self): + # Verify that `ba` always corresponds to the BankMachine number + for bn in [0, 2, 7]: + with self.subTest(bn=bn): + dut = BankMachineDUT(bn, geom_settings=dict(bankbits=3)) + requests = [dict(addr=0, we=0)] + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + for cmd in commands: + self.assertEqual(cmd["ba"], bn) + + def test_read_write_same_row(self): + # Verify that there is only one activate when working on single row + dut = BankMachineDUT(1) + requests = [ + dict(addr=dut.req_address(row=0xba, col=0xad), we=0), + dict(addr=dut.req_address(row=0xba, col=0xad), we=1), + dict(addr=dut.req_address(row=0xba, col=0xbe), we=0), + dict(addr=dut.req_address(row=0xba, col=0xbe), we=1), + ] + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + commands = [(cmd["type"], cmd["a"]) for cmd in commands] + expected = [ + ("activate", 0xba), + ("read", 0xad << dut.address_align), + ("write", 0xad << dut.address_align), + ("read", 0xbe << dut.address_align), + ("write", 0xbe << dut.address_align), + ] + self.assertEqual(commands, expected) + + def test_write_different_rows_with_delay(self): + # Verify that precharge is used when changing row with a delay + # this is independent form auto-precharge + for auto_precharge in [False, True]: + with self.subTest(auto_precharge=auto_precharge): + settings = dict(with_auto_precharge=auto_precharge) + dut = BankMachineDUT(1, controller_settings=settings) + requests = [ + dict(addr=dut.req_address(row=0xba, col=0xad), we=1, delay=8), + dict(addr=dut.req_address(row=0xda, col=0xad), we=1), + ] + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + commands = [(cmd["type"], cmd["a"]) for cmd in commands] + expected = [ + ("activate", 0xba), + ("write", 0xad << dut.address_align), + ("precharge", 0xad << dut.address_align), + ("activate", 0xda), + ("write", 0xad << dut.address_align), + ] + self.assertEqual(commands, expected) + + def test_write_different_rows_with_auto_precharge(self): + # Verify that auto-precharge is used when changing row without delay + settings = dict(with_auto_precharge=True) + dut = BankMachineDUT(1, controller_settings=settings) + requests = [ + dict(addr=dut.req_address(row=0xba, col=0xad), we=1), + dict(addr=dut.req_address(row=0xda, col=0xad), we=1), + ] + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + commands = [(cmd["type"], cmd["a"]) for cmd in commands] + expected = [ + ("activate", 0xba), + ("write", (0xad << dut.address_align) | (1 << 10)), + ("activate", 0xda), + ("write", 0xad << dut.address_align), + ] + self.assertEqual(commands, expected) + + def test_write_different_rows_without_auto_precharge(self): + # Verify that auto-precharge is used when changing row without delay + settings = dict(with_auto_precharge=False) + dut = BankMachineDUT(1, controller_settings=settings) + requests = [ + dict(addr=dut.req_address(row=0xba, col=0xad), we=1), + dict(addr=dut.req_address(row=0xda, col=0xad), we=1), + ] + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + commands = [(cmd["type"], cmd["a"]) for cmd in commands] + expected = [ + ("activate", 0xba), + ("write", 0xad << dut.address_align), + ("precharge", 0xad << dut.address_align), + ("activate", 0xda), + ("write", 0xad << dut.address_align), + ] + self.assertEqual(commands, expected) + + def test_burst_no_request_lost(self): + # Verify that no request is lost in fast bursts of requests regardless of cmd_buffer_depth + for cmd_buffer_depth in [8]: + settings = dict(cmd_buffer_depth=cmd_buffer_depth) + with self.subTest(**settings): + dut = BankMachineDUT(1, controller_settings=settings) + # long sequence of writes to the same row + requests = [dict(addr=dut.req_address(row=0xba, col=i), we=1) for i in range(32)] + expected = ([("activate", 0xba)] + + [("write", i << dut.address_align) for i in range(32)]) + commands = self.bankmachine_commands_test(dut=dut, requests=requests) + commands = [(cmd["type"], cmd["a"]) for cmd in commands] + self.assertEqual(commands, expected) + + def test_lock_until_requests_finished(self): + # Verify that lock is being held until all requests in FIFO are processed + @passive + def lock_checker(dut): + req = dut.bankmachine.req + self.assertEqual((yield req.lock), 0) + + # wait until first request becomes locked + while not (yield req.valid): + yield + + # wait until lock should be released (all requests in queue gets processed) + # here it happens when the final wdata_ready ends + for _ in range(3): + while not (yield req.wdata_ready): + yield + self.assertEqual((yield req.lock), 1) + yield + + yield + self.assertEqual((yield req.lock), 0) + + dut = BankMachineDUT(1) + # simple sequence with row change + requests = [ + dict(addr=dut.req_address(row=0x1a, col=0x01), we=1), + dict(addr=dut.req_address(row=0x1b, col=0x02), we=1), + dict(addr=dut.req_address(row=0x1c, col=0x04), we=1), + ] + self.bankmachine_commands_test(dut=dut, requests=requests, generators=[lock_checker]) + + def timing_test(self, from_cmd, to_cmd, time_expected, **dut_kwargs): + @passive + def timing_checker(dut): + def is_cmd(cmd_type, test_ready): + cmd = (yield from dut.get_cmd()) + ready = cmd["ready"] if test_ready else True + return cmd["valid"] and ready and cmd["type"] == cmd_type + + # time between WRITE ends (ready and valid) and PRECHARGE becomes valid + while not (yield from is_cmd(from_cmd, test_ready=True)): + yield + yield # wait until cmd deactivates in case the second cmd is the same as first + time = 1 + while not (yield from is_cmd(to_cmd, test_ready=False)): + yield + time += 1 + + self.assertEqual(time, time_expected) + + dut = BankMachineDUT(1, **dut_kwargs) + # simple sequence with row change + requests = [ + dict(addr=dut.req_address(row=0xba, col=0xad), we=1), + dict(addr=dut.req_address(row=0xda, col=0xad), we=1), + ] + self.bankmachine_commands_test(dut=dut, requests=requests, generators=[timing_checker]) + + def test_timing_write_to_precharge(self): + controller_settings = dict(with_auto_precharge=False) + timing_settings = dict(tWR=6, tCCD=4) + phy_settings = dict(cwl=2, nphases=2) + write_latency = math.ceil(phy_settings["cwl"] / phy_settings["nphases"]) + precharge_time = write_latency + timing_settings["tWR"] + timing_settings["tCCD"] + self.timing_test("write", "precharge", precharge_time, + controller_settings=controller_settings, + phy_settings=phy_settings, + timing_settings=timing_settings) + + def test_timing_activate_to_activate(self): + timing_settings = dict(tRC=16) + self.timing_test("activate", "activate", time_expected=16, + timing_settings=timing_settings) + + def test_timing_activate_to_precharge(self): + timing_settings = dict(tRAS=32) + self.timing_test("activate", "precharge", time_expected=32, + timing_settings=timing_settings) + + def test_refresh(self): + # Verify that no commands are issued during refresh and after it the row is re-activated + @passive + def refresh_generator(dut): + # wait some time for the bankmachine to start + for _ in range(16): + yield + + # request a refresh + yield dut.bankmachine.refresh_req.eq(1) + while not (yield dut.bankmachine.refresh_gnt): + yield + + # wait when refresh is being performed + # make sure no command is issued during refresh + for _ in range(32): + self.assertEqual((yield dut.bankmachine.cmd.valid), 0) + yield + + # signalize refresh is ready + yield dut.bankmachine.refresh_req.eq(0) + + dut = BankMachineDUT(1) + requests = [dict(addr=dut.req_address(row=0xba, col=i), we=1) for i in range(16)] + commands = self.bankmachine_commands_test(dut=dut, requests=requests, + generators=[refresh_generator]) + commands = [(cmd["type"], cmd["a"]) for cmd in commands] + # refresh will close row, so bankmachine should re-activate it after refresh + self.assertEqual(commands.count(("activate", 0xba)), 2) + # verify that the write commands are correct + write_commands = [cmd for cmd in commands if cmd[0] == "write"] + expected_writes = [("write", i << dut.address_align) for i in range(16)] + self.assertEqual(write_commands, expected_writes) + + def test_output_annotations(self): + # Verify that all commands are annotated correctly using is_* signals + checked = set() + + @passive + def cmd_checker(dut): + while True: + cmd = (yield from dut.get_cmd()) + if cmd["valid"]: + if cmd["type"] in ["activate", "precharge"]: + self.assertEqual(cmd["is_cmd"], 1) + self.assertEqual(cmd["is_write"], 0) + self.assertEqual(cmd["is_read"], 0) + elif cmd["type"] in ["write"]: + self.assertEqual(cmd["is_cmd"], 0) + self.assertEqual(cmd["is_write"], 1) + self.assertEqual(cmd["is_read"], 0) + elif cmd["type"] in ["read"]: + self.assertEqual(cmd["is_cmd"], 0) + self.assertEqual(cmd["is_write"], 0) + self.assertEqual(cmd["is_read"], 1) + else: + raise ValueError(cmd["type"]) + checked.add(cmd["type"]) + yield + + dut = BankMachineDUT(1) + requests = [ + dict(addr=dut.req_address(row=0xba, col=0xad), we=0), + dict(addr=dut.req_address(row=0xba, col=0xad), we=1), + dict(addr=dut.req_address(row=0xda, col=0xad), we=0), + # wait enough time for regular (not auto) precharge to be used + dict(addr=dut.req_address(row=0xda, col=0xad), we=1, delay=32), + dict(addr=dut.req_address(row=0xba, col=0xad), we=0), + dict(addr=dut.req_address(row=0xba, col=0xad), we=1), + ] + self.bankmachine_commands_test(dut=dut, requests=requests, generators=[cmd_checker]) + # bankmachine does not produce refresh commands + self.assertEqual(checked, {"activate", "precharge", "write", "read"})