# This file is Copyright (c) 2017-2019 Florent Kermarrec # This file is Copyright (c) 2020 Antmicro # License: BSD import unittest from migen import * from litex.soc.interconnect.stream import * from litedram.common import LiteDRAMNativePort, LiteDRAMNativeWritePort, LiteDRAMNativeReadPort from litedram.frontend.adaptation import LiteDRAMNativePortConverter, LiteDRAMNativePortCDC from test.common import * from litex.gen.sim import * class ConverterDUT(Module): def __init__(self, user_data_width, native_data_width, mem_depth, separate_rw=True, read_latency=0): self.separate_rw = separate_rw if separate_rw: self.write_user_port = LiteDRAMNativeWritePort(address_width=32, data_width=user_data_width) self.write_crossbar_port = LiteDRAMNativeWritePort(address_width=32, data_width=native_data_width) self.read_user_port = LiteDRAMNativeReadPort( address_width=32, data_width=user_data_width) self.read_crossbar_port = LiteDRAMNativeReadPort( address_width=32, data_width=native_data_width) self.write_driver = NativePortDriver(self.write_user_port) self.read_driver = NativePortDriver(self.read_user_port) else: self.write_user_port = LiteDRAMNativePort(mode="both", address_width=32, data_width=user_data_width) self.write_crossbar_port = LiteDRAMNativePort(mode="both", address_width=32, data_width=native_data_width) self.write_driver = NativePortDriver(self.write_user_port) self.read_user_port = self.write_user_port self.read_crossbar_port = self.write_crossbar_port self.read_driver = self.write_driver self.driver_generators = [self.write_driver.write_data_handler(), self.read_driver.read_data_handler(latency=read_latency)] # Memory self.memory = DRAMMemory(native_data_width, mem_depth) def do_finalize(self): if self.separate_rw: self.submodules.write_converter = LiteDRAMNativePortConverter( self.write_user_port, self.write_crossbar_port) self.submodules.read_converter = LiteDRAMNativePortConverter( self.read_user_port, self.read_crossbar_port) else: self.submodules.converter = LiteDRAMNativePortConverter( self.write_user_port, self.write_crossbar_port) def read(self, address, wait_data=True): return (yield from self.read_driver.read(address, wait_data=wait_data)) def write(self, address, data, wait_data=True, we=None): data_with_cmd = False if self.write_user_port.data_width > self.write_crossbar_port.data_width: data_with_cmd = True return (yield from self.write_driver.write(address, data, we, wait_data=wait_data, data_with_cmd=data_with_cmd)) class CDCDUT(ConverterDUT): def do_finalize(self): # Change clock domains self.write_user_port.clock_domain = "user" self.read_user_port.clock_domain = "user" self.write_crossbar_port.clock_domain = "native" self.read_crossbar_port.clock_domain = "native" # Add CDC self.submodules.write_converter = LiteDRAMNativePortCDC( port_from = self.write_user_port, port_to = self.write_crossbar_port) self.submodules.read_converter = LiteDRAMNativePortCDC( port_from = self.read_user_port, port_to = self.read_crossbar_port) class TestAdaptation(MemoryTestDataMixin, unittest.TestCase): def test_converter_down_ratio_must_be_integer(self): with self.assertRaises(ValueError) as cm: dut = ConverterDUT(user_data_width=64, native_data_width=24, mem_depth=128) dut.finalize() self.assertIn("ratio must be an int", str(cm.exception).lower()) def test_converter_up_ratio_must_be_integer(self): with self.assertRaises(ValueError) as cm: dut = ConverterDUT(user_data_width=32, native_data_width=48, mem_depth=128) dut.finalize() self.assertIn("ratio must be an int", str(cm.exception).lower()) def converter_readback_test(self, dut, pattern, mem_expected, main_generator=None): assert len(set(adr for adr, _ in pattern)) == len(pattern), "Pattern has duplicates!" if main_generator is None: def main_generator(dut): for adr, data in pattern: yield from dut.write(adr, data) for adr, _ in pattern: yield from dut.read(adr, wait_data=False) # we need to flush after last command in the up-converter case, if the last # command does not fill whole `sel` yield dut.write_user_port.flush.eq(1) yield yield dut.write_user_port.flush.eq(0) yield from dut.write_driver.wait_all() yield from dut.read_driver.wait_all() generators = [ main_generator(dut), *dut.driver_generators, dut.memory.write_handler(dut.write_crossbar_port), dut.memory.read_handler(dut.read_crossbar_port), timeout_generator(1000), ] run_simulation(dut, generators, vcd_name='sim.vcd') self.assertEqual(dut.memory.mem, mem_expected) self.assertEqual(dut.read_driver.rdata, [data for adr, data in pattern]) def converter_test(self, test_data, user_data_width, native_data_width, **kwargs): # for separate_rw in [True, False]: for separate_rw in [False]: with self.subTest(separate_rw=separate_rw): data = self.pattern_test_data[test_data] dut = ConverterDUT(user_data_width=user_data_width, native_data_width=native_data_width, mem_depth=len(data["expected"]), separate_rw=separate_rw, **kwargs) self.converter_readback_test(dut, data["pattern"], data["expected"]) def test_converter_1to1(self): # Verify 64-bit to 64-bit identify-conversion. self.converter_test(test_data="64bit", user_data_width=64, native_data_width=64) def test_converter_2to1(self): # Verify 64-bit to 32-bit down-conversion. self.converter_test(test_data="64bit_to_32bit", user_data_width=64, native_data_width=32) def test_converter_4to1(self): # Verify 32-bit to 8-bit down-conversion. self.converter_test(test_data="32bit_to_8bit", user_data_width=32, native_data_width=8) def test_converter_8to1(self): # Verify 64-bit to 8-bit down-conversion. self.converter_test(test_data="64bit_to_8bit", user_data_width=64, native_data_width=8) def test_converter_1to2(self): # Verify 8-bit to 16-bit up-conversion. self.converter_test(test_data="8bit_to_16bit", user_data_width=8, native_data_width=16) def test_converter_1to4(self): # Verify 32-bit to 128-bit up-conversion. self.converter_test(test_data="32bit_to_128bit", user_data_width=32, native_data_width=128) def test_converter_1to8(self): # Verify 32-bit to 256-bit up-conversion. self.converter_test(test_data="32bit_to_256bit", user_data_width=32, native_data_width=256) def test_converter_up_read_latencies(self): # Verify that up-conversion works with different port reader latencies cases = { "1to2": dict(test_data="8bit_to_16bit", user_data_width=8, native_data_width=16), "1to4": dict(test_data="32bit_to_128bit", user_data_width=32, native_data_width=128), "1to8": dict(test_data="32bit_to_256bit", user_data_width=32, native_data_width=256), } for latency in [0, 1]: with self.subTest(latency=latency): for conversion, kwargs in cases.items(): with self.subTest(conversion=conversion): self.converter_test(**kwargs, read_latency=latency) def test_converter_down_read_latencies(self): # Verify that down-conversion works with different port reader latencies cases = { "2to1": dict(test_data="64bit_to_32bit", user_data_width=64, native_data_width=32), "4to1": dict(test_data="32bit_to_8bit", user_data_width=32, native_data_width=8), "8to1": dict(test_data="64bit_to_8bit", user_data_width=64, native_data_width=8), } for latency in [0, 1]: with self.subTest(latency=latency): for conversion, kwargs in cases.items(): with self.subTest(conversion=conversion): self.converter_test(**kwargs, read_latency=latency) def test_up_converter_write_complete_sequence(self): # Verify up-conversion when master sends full sequences (of `ratio` length) def main_generator(dut): yield from dut.write(0x00, 0x11) # first yield from dut.write(0x01, 0x22) yield from dut.write(0x02, 0x33) yield from dut.write(0x03, 0x44) yield from dut.write(0x04, 0x55) # second yield from dut.write(0x05, 0x66) yield from dut.write(0x06, 0x77) yield from dut.write(0x07, 0x88) yield from dut.write_driver.wait_all() for _ in range(8): # wait for memory yield mem_expected = [ # data address 0x44332211, # 0x00 0x88776655, # 0x04 0x00000000, # 0x08 0x00000000, # 0x0c ] for separate_rw in [False, True]: with self.subTest(separate_rw=separate_rw): dut = ConverterDUT(user_data_width=8, native_data_width=32, mem_depth=len(mem_expected), separate_rw=separate_rw) self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, main_generator=main_generator) def test_up_converter_write_with_manual_flush(self): # Verify that up-conversion writes incomplete data when flushed def main_generator(dut): yield from dut.write(0x00, 0x11, wait_data=False) yield from dut.write(0x01, 0x22, wait_data=False) yield from dut.write(0x02, 0x33, wait_data=False) yield dut.write_user_port.flush.eq(1) yield yield dut.write_user_port.flush.eq(0) yield from dut.write_driver.wait_all() for _ in range(8): # wait for memory yield mem_expected = [ # data address 0x00332211, # 0x00 0x00000000, # 0x04 0x00000000, # 0x08 0x00000000, # 0x0c ] for separate_rw in [False, True]: with self.subTest(separate_rw=separate_rw): dut = ConverterDUT(user_data_width=8, native_data_width=32, mem_depth=len(mem_expected), separate_rw=separate_rw) self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, main_generator=main_generator) def test_up_converter_auto_flush_on_address_change(self): # Verify that up-conversion automatically flushes the cmd if the (shifted) address changes def main_generator(dut): yield from dut.write(0x00, 0x11, wait_data=False) # -> 0x00 yield from dut.write(0x01, 0x22, wait_data=False) # -> 0x00 yield from dut.write(0x02, 0x33, wait_data=False) # -> 0x00 yield from dut.write(0x04, 0x55, wait_data=False) # -> 0x01 yield from dut.write(0x05, 0x66, wait_data=False) # -> 0x01 yield from dut.write(0x06, 0x77, wait_data=False) # -> 0x01 yield from dut.write(0x07, 0x88, wait_data=False) # -> 0x01 yield from dut.write_driver.wait_all() for _ in range(8): # wait for memory yield mem_expected = [ # data address 0x00332211, # 0x00 0x88776655, # 0x04 0x00000000, # 0x08 0x00000000, # 0x0c ] for separate_rw in [False, True]: with self.subTest(separate_rw=separate_rw): dut = ConverterDUT(user_data_width=8, native_data_width=32, mem_depth=len(mem_expected), separate_rw=separate_rw) self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, main_generator=main_generator) @unittest.skip("Read after write not yet synchronised enough") def test_up_converter_auto_flush_on_cmd_we_change(self): # Verify that up-conversion automatically flushes the cmd when command type (write/read) changes def main_generator(dut): yield from dut.write(0x00, 0x11, wait_data=False) yield from dut.write(0x01, 0x22, wait_data=False) yield from dut.write(0x02, 0x33, wait_data=False) yield from dut.write(0x03, 0x44, wait_data=False) yield from dut.write(0x04, 0x55, wait_data=False) yield from dut.write(0x05, 0x66, wait_data=False) yield from dut.read (0x00) yield from dut.read (0x01) yield from dut.read (0x02) yield from dut.read (0x03) yield from dut.read (0x04) yield from dut.read (0x05) yield from dut.read (0x06) yield from dut.read (0x07) yield from dut.write_driver.wait_all() yield from dut.read_driver.wait_all() for _ in range(8): # wait for memory yield mem_expected = [ # data address 0x44332211, # 0x00 0x00006655, # 0x04 0x00000000, # 0x08 0x00000000, # 0x0c ] pattern = [ (0x00, mem_expected[0]), (0x01, mem_expected[1]), ] for separate_rw in [False, True]: with self.subTest(separate_rw=separate_rw): dut = ConverterDUT(user_data_width=8, native_data_width=32, mem_depth=len(mem_expected), separate_rw=separate_rw) self.converter_readback_test(dut, pattern=pattern, mem_expected=mem_expected, main_generator=main_generator) def test_up_converter_write_with_gap(self): # Verify that the up-converter can mask data properly when sending non-sequential writes def main_generator(dut): yield from dut.write(0x00, 0x11, wait_data=False) yield from dut.write(0x02, 0x22, wait_data=False) yield from dut.write(0x03, 0x33, wait_data=False) yield dut.write_user_port.flush.eq(1) yield yield dut.write_user_port.flush.eq(0) yield from dut.write_driver.wait_all() for _ in range(8): # wait for memory yield mem_expected = [ # data, address 0x33220011, # 0x00 0x00000000, # 0x04 0x00000000, # 0x08 0x00000000, # 0x0c ] for separate_rw in [True, False]: with self.subTest(separate_rw=separate_rw): dut = ConverterDUT(user_data_width=8, native_data_width=32, mem_depth=len(mem_expected), separate_rw=separate_rw) self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, main_generator=main_generator) def test_converter_up_not_aligned(self): data = self.pattern_test_data["8bit_to_32bit_not_aligned"] dut = ConverterDUT(user_data_width=8, native_data_width=32, mem_depth=len(data["expected"]), separate_rw=False) self.converter_readback_test(dut, data["pattern"], data["expected"]) def cdc_readback_test(self, dut, pattern, mem_expected, clocks): assert len(set(adr for adr, _ in pattern)) == len(pattern), "Pattern has duplicates!" read_data = [] @passive def read_handler(read_port): yield read_port.rdata.ready.eq(1) while True: if (yield read_port.rdata.valid): read_data.append((yield read_port.rdata.data)) yield def main_generator(dut, pattern): for adr, data in pattern: yield from dut.write(adr, data) for adr, _ in pattern: yield from dut.read(adr, wait_data=False) yield from dut.write_driver.wait_all() yield from dut.read_driver.wait_all() generators = { "user": [ main_generator(dut, pattern), read_handler(dut.read_user_port), *dut.driver_generators, timeout_generator(5000), ], "native": [ dut.memory.write_handler(dut.write_crossbar_port), dut.memory.read_handler(dut.read_crossbar_port), ], } run_simulation(dut, generators, clocks) self.assertEqual(dut.memory.mem, mem_expected) self.assertEqual(read_data, [data for adr, data in pattern]) def test_port_cdc_same_clocks(self): # Verify CDC with same clocks (frequency and phase). data = self.pattern_test_data["32bit"] dut = CDCDUT(user_data_width=32, native_data_width=32, mem_depth=len(data["expected"])) clocks = { "user": 10, "native": (7, 3), } self.cdc_readback_test(dut, data["pattern"], data["expected"], clocks=clocks) def test_port_cdc_different_period(self): # Verify CDC with different clock frequencies. data = self.pattern_test_data["32bit"] dut = CDCDUT(user_data_width=32, native_data_width=32, mem_depth=len(data["expected"])) clocks = { "user": 10, "native": 7, } self.cdc_readback_test(dut, data["pattern"], data["expected"], clocks=clocks) def test_port_cdc_out_of_phase(self): # Verify CDC with different clock phases. data = self.pattern_test_data["32bit"] dut = CDCDUT(user_data_width=32, native_data_width=32, mem_depth=len(data["expected"])) clocks = { "user": 10, "native": (7, 3), } self.cdc_readback_test(dut, data["pattern"], data["expected"], clocks=clocks)