litedram/test/test_bankmachine.py
2020-04-07 11:58:56 +02:00

422 lines
18 KiB
Python

# This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
# License: BSD
import math
import unittest
from migen import *
# from litex.soc.interconnect import stream
from litedram.common import *
from litedram.core.bankmachine import BankMachine
from test.common import timeout_generator
class BankMachineDUT(Module):
# fill only settings needed by BankMachine
default_controller_settings = dict(
cmd_buffer_depth=8,
cmd_buffer_buffered=False,
with_auto_precharge=True,
)
default_phy_settings = dict(
cwl=2,
nphases=2,
nranks=1,
# indirectyl
memtype="DDR2",
dfi_databits=2*16,
)
default_geom_settings = dict(
bankbits=3,
rowbits=13,
colbits=10,
)
default_timing_settings = dict(
tRAS=None,
tRC=None,
tCCD=1,
tRCD=2,
tRP=2,
tWR=2,
)
def __init__(self, n, controller_settings=None, phy_settings=None, geom_settings=None,
timing_settings=None):
# update settings if provided
def updated(settings, update):
copy = settings.copy()
copy.update(update or {})
return copy
controller_settings = updated(self.default_controller_settings, controller_settings)
phy_settings = updated(self.default_phy_settings, phy_settings)
geom_settings = updated(self.default_geom_settings, geom_settings)
timing_settings = updated(self.default_timing_settings, timing_settings)
class SimpleSettings(Settings):
def __init__(self, **kwargs):
self.set_attributes(kwargs)
settings = SimpleSettings(**controller_settings)
settings.phy = SimpleSettings(**phy_settings)
settings.geom = SimpleSettings(**geom_settings)
settings.timing = SimpleSettings(**timing_settings)
settings.geom.addressbits = max(settings.geom.rowbits, settings.geom.colbits)
self.settings = settings
self.address_align = log2_int(burst_lengths[settings.phy.memtype])
self.address_width = LiteDRAMInterface(self.address_align, settings).address_width
bankmachine = BankMachine(n=n, address_width=self.address_width,
address_align=self.address_align,
nranks=settings.phy.nranks, settings=settings)
self.submodules.bankmachine = bankmachine
def get_cmd(self):
# cmd_request_rw_layout -> name
layout = [name for name, _ in cmd_request_rw_layout(a=self.settings.geom.addressbits,
ba=self.settings.geom.bankbits)]
request = {}
for name in layout + ["valid", "ready", "first", "last"]:
request[name] = (yield getattr(self.bankmachine.cmd, name))
request["type"] = {
(0, 0, 0): "nop",
(1, 0, 1): "write",
(1, 0, 0): "read",
(0, 1, 0): "activate",
(0, 1, 1): "precharge",
(1, 1, 0): "refresh",
}[(request["cas"], request["ras"], request["we"])]
return request
def req_address(self, row, col):
col = col & (2**self.settings.geom.colbits - 1)
row = row & (2**self.settings.geom.rowbits - 1)
split = self.settings.geom.colbits - self.address_align
return (row << split) | col
class TestBankMachine(unittest.TestCase):
def test_init(self):
BankMachineDUT(1)
def bankmachine_commands_test(self, dut, requests, generators=None):
# perform a test by simulating requests producer and return registered commands
commands = []
def producer(dut):
for req in requests:
yield dut.bankmachine.req.addr.eq(req["addr"])
yield dut.bankmachine.req.we.eq(req["we"])
yield dut.bankmachine.req.valid.eq(1)
yield
while not (yield dut.bankmachine.req.ready):
yield
yield dut.bankmachine.req.valid.eq(0)
for _ in range(req.get("delay", 0)):
yield
def req_consumer(dut):
for req in requests:
if req["we"]:
signal = dut.bankmachine.req.wdata_ready
else:
signal = dut.bankmachine.req.rdata_valid
while not (yield signal):
yield
yield
@passive
def cmd_consumer(dut):
while True:
while not (yield dut.bankmachine.cmd.valid):
yield
yield dut.bankmachine.cmd.ready.eq(1)
yield
commands.append((yield from dut.get_cmd()))
yield dut.bankmachine.cmd.ready.eq(0)
yield
all_generators = [
producer(dut),
req_consumer(dut),
cmd_consumer(dut),
timeout_generator(50 * len(requests)),
]
if generators is not None:
all_generators += [g(dut) for g in generators]
run_simulation(dut, all_generators)
return commands
def test_opens_correct_row(self):
# Verify that the correct row is activated before read/write commands
dut = BankMachineDUT(3)
requests = [
dict(addr=dut.req_address(row=0xf0, col=0x0d), we=0),
dict(addr=dut.req_address(row=0xd0, col=0x0d), we=1),
]
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
# commands: activate, read (auto-precharge), activate, write
self.assertEqual(commands[0]["type"], "activate")
self.assertEqual(commands[0]["a"], 0xf0)
self.assertEqual(commands[2]["type"], "activate")
self.assertEqual(commands[2]["a"], 0xd0)
def test_correct_bank_address(self):
# Verify that `ba` always corresponds to the BankMachine number
for bn in [0, 2, 7]:
with self.subTest(bn=bn):
dut = BankMachineDUT(bn, geom_settings=dict(bankbits=3))
requests = [dict(addr=0, we=0)]
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
for cmd in commands:
self.assertEqual(cmd["ba"], bn)
def test_read_write_same_row(self):
# Verify that there is only one activate when working on single row
dut = BankMachineDUT(1)
requests = [
dict(addr=dut.req_address(row=0xba, col=0xad), we=0),
dict(addr=dut.req_address(row=0xba, col=0xad), we=1),
dict(addr=dut.req_address(row=0xba, col=0xbe), we=0),
dict(addr=dut.req_address(row=0xba, col=0xbe), we=1),
]
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
commands = [(cmd["type"], cmd["a"]) for cmd in commands]
expected = [
("activate", 0xba),
("read", 0xad << dut.address_align),
("write", 0xad << dut.address_align),
("read", 0xbe << dut.address_align),
("write", 0xbe << dut.address_align),
]
self.assertEqual(commands, expected)
def test_write_different_rows_with_delay(self):
# Verify that precharge is used when changing row with a delay
# this is independent form auto-precharge
for auto_precharge in [False, True]:
with self.subTest(auto_precharge=auto_precharge):
settings = dict(with_auto_precharge=auto_precharge)
dut = BankMachineDUT(1, controller_settings=settings)
requests = [
dict(addr=dut.req_address(row=0xba, col=0xad), we=1, delay=8),
dict(addr=dut.req_address(row=0xda, col=0xad), we=1),
]
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
commands = [(cmd["type"], cmd["a"]) for cmd in commands]
expected = [
("activate", 0xba),
("write", 0xad << dut.address_align),
("precharge", 0xad << dut.address_align),
("activate", 0xda),
("write", 0xad << dut.address_align),
]
self.assertEqual(commands, expected)
def test_write_different_rows_with_auto_precharge(self):
# Verify that auto-precharge is used when changing row without delay
settings = dict(with_auto_precharge=True)
dut = BankMachineDUT(1, controller_settings=settings)
requests = [
dict(addr=dut.req_address(row=0xba, col=0xad), we=1),
dict(addr=dut.req_address(row=0xda, col=0xad), we=1),
]
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
commands = [(cmd["type"], cmd["a"]) for cmd in commands]
expected = [
("activate", 0xba),
("write", (0xad << dut.address_align) | (1 << 10)),
("activate", 0xda),
("write", 0xad << dut.address_align),
]
self.assertEqual(commands, expected)
def test_write_different_rows_without_auto_precharge(self):
# Verify that auto-precharge is used when changing row without delay
settings = dict(with_auto_precharge=False)
dut = BankMachineDUT(1, controller_settings=settings)
requests = [
dict(addr=dut.req_address(row=0xba, col=0xad), we=1),
dict(addr=dut.req_address(row=0xda, col=0xad), we=1),
]
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
commands = [(cmd["type"], cmd["a"]) for cmd in commands]
expected = [
("activate", 0xba),
("write", 0xad << dut.address_align),
("precharge", 0xad << dut.address_align),
("activate", 0xda),
("write", 0xad << dut.address_align),
]
self.assertEqual(commands, expected)
def test_burst_no_request_lost(self):
# Verify that no request is lost in fast bursts of requests regardless of cmd_buffer_depth
for cmd_buffer_depth in [8]:
settings = dict(cmd_buffer_depth=cmd_buffer_depth)
with self.subTest(**settings):
dut = BankMachineDUT(1, controller_settings=settings)
# long sequence of writes to the same row
requests = [dict(addr=dut.req_address(row=0xba, col=i), we=1) for i in range(32)]
expected = ([("activate", 0xba)] +
[("write", i << dut.address_align) for i in range(32)])
commands = self.bankmachine_commands_test(dut=dut, requests=requests)
commands = [(cmd["type"], cmd["a"]) for cmd in commands]
self.assertEqual(commands, expected)
def test_lock_until_requests_finished(self):
# Verify that lock is being held until all requests in FIFO are processed
@passive
def lock_checker(dut):
req = dut.bankmachine.req
self.assertEqual((yield req.lock), 0)
# wait until first request becomes locked
while not (yield req.valid):
yield
# wait until lock should be released (all requests in queue gets processed)
# here it happens when the final wdata_ready ends
for _ in range(3):
while not (yield req.wdata_ready):
yield
self.assertEqual((yield req.lock), 1)
yield
yield
self.assertEqual((yield req.lock), 0)
dut = BankMachineDUT(1)
# simple sequence with row change
requests = [
dict(addr=dut.req_address(row=0x1a, col=0x01), we=1),
dict(addr=dut.req_address(row=0x1b, col=0x02), we=1),
dict(addr=dut.req_address(row=0x1c, col=0x04), we=1),
]
self.bankmachine_commands_test(dut=dut, requests=requests, generators=[lock_checker])
def timing_test(self, from_cmd, to_cmd, time_expected, **dut_kwargs):
@passive
def timing_checker(dut):
def is_cmd(cmd_type, test_ready):
cmd = (yield from dut.get_cmd())
ready = cmd["ready"] if test_ready else True
return cmd["valid"] and ready and cmd["type"] == cmd_type
# time between WRITE ends (ready and valid) and PRECHARGE becomes valid
while not (yield from is_cmd(from_cmd, test_ready=True)):
yield
yield # wait until cmd deactivates in case the second cmd is the same as first
time = 1
while not (yield from is_cmd(to_cmd, test_ready=False)):
yield
time += 1
self.assertEqual(time, time_expected)
dut = BankMachineDUT(1, **dut_kwargs)
# simple sequence with row change
requests = [
dict(addr=dut.req_address(row=0xba, col=0xad), we=1),
dict(addr=dut.req_address(row=0xda, col=0xad), we=1),
]
self.bankmachine_commands_test(dut=dut, requests=requests, generators=[timing_checker])
def test_timing_write_to_precharge(self):
controller_settings = dict(with_auto_precharge=False)
timing_settings = dict(tWR=6, tCCD=4)
phy_settings = dict(cwl=2, nphases=2)
write_latency = math.ceil(phy_settings["cwl"] / phy_settings["nphases"])
precharge_time = write_latency + timing_settings["tWR"] + timing_settings["tCCD"]
self.timing_test("write", "precharge", precharge_time,
controller_settings=controller_settings,
phy_settings=phy_settings,
timing_settings=timing_settings)
def test_timing_activate_to_activate(self):
timing_settings = dict(tRC=16)
self.timing_test("activate", "activate", time_expected=16,
timing_settings=timing_settings)
def test_timing_activate_to_precharge(self):
timing_settings = dict(tRAS=32)
self.timing_test("activate", "precharge", time_expected=32,
timing_settings=timing_settings)
def test_refresh(self):
# Verify that no commands are issued during refresh and after it the row is re-activated
@passive
def refresh_generator(dut):
# wait some time for the bankmachine to start
for _ in range(16):
yield
# request a refresh
yield dut.bankmachine.refresh_req.eq(1)
while not (yield dut.bankmachine.refresh_gnt):
yield
# wait when refresh is being performed
# make sure no command is issued during refresh
for _ in range(32):
self.assertEqual((yield dut.bankmachine.cmd.valid), 0)
yield
# signalize refresh is ready
yield dut.bankmachine.refresh_req.eq(0)
dut = BankMachineDUT(1)
requests = [dict(addr=dut.req_address(row=0xba, col=i), we=1) for i in range(16)]
commands = self.bankmachine_commands_test(dut=dut, requests=requests,
generators=[refresh_generator])
commands = [(cmd["type"], cmd["a"]) for cmd in commands]
# refresh will close row, so bankmachine should re-activate it after refresh
self.assertEqual(commands.count(("activate", 0xba)), 2)
# verify that the write commands are correct
write_commands = [cmd for cmd in commands if cmd[0] == "write"]
expected_writes = [("write", i << dut.address_align) for i in range(16)]
self.assertEqual(write_commands, expected_writes)
def test_output_annotations(self):
# Verify that all commands are annotated correctly using is_* signals
checked = set()
@passive
def cmd_checker(dut):
while True:
cmd = (yield from dut.get_cmd())
if cmd["valid"]:
if cmd["type"] in ["activate", "precharge"]:
self.assertEqual(cmd["is_cmd"], 1)
self.assertEqual(cmd["is_write"], 0)
self.assertEqual(cmd["is_read"], 0)
elif cmd["type"] in ["write"]:
self.assertEqual(cmd["is_cmd"], 0)
self.assertEqual(cmd["is_write"], 1)
self.assertEqual(cmd["is_read"], 0)
elif cmd["type"] in ["read"]:
self.assertEqual(cmd["is_cmd"], 0)
self.assertEqual(cmd["is_write"], 0)
self.assertEqual(cmd["is_read"], 1)
else:
raise ValueError(cmd["type"])
checked.add(cmd["type"])
yield
dut = BankMachineDUT(1)
requests = [
dict(addr=dut.req_address(row=0xba, col=0xad), we=0),
dict(addr=dut.req_address(row=0xba, col=0xad), we=1),
dict(addr=dut.req_address(row=0xda, col=0xad), we=0),
# wait enough time for regular (not auto) precharge to be used
dict(addr=dut.req_address(row=0xda, col=0xad), we=1, delay=32),
dict(addr=dut.req_address(row=0xba, col=0xad), we=0),
dict(addr=dut.req_address(row=0xba, col=0xad), we=1),
]
self.bankmachine_commands_test(dut=dut, requests=requests, generators=[cmd_checker])
# bankmachine does not produce refresh commands
self.assertEqual(checked, {"activate", "precharge", "write", "read"})