diff --git a/litedram/core/crossbar.py b/litedram/core/crossbar.py index f75ab96..582e003 100644 --- a/litedram/core/crossbar.py +++ b/litedram/core/crossbar.py @@ -15,7 +15,7 @@ from litex.soc.interconnect import stream from litedram.common import * from litedram.core.controller import * -from litedram.frontend.adaptation import * +from litedram.frontend.adapter import * # LiteDRAMCrossbar --------------------------------------------------------------------------------- diff --git a/litedram/frontend/adaptation.py b/litedram/frontend/adapter.py similarity index 98% rename from litedram/frontend/adaptation.py rename to litedram/frontend/adapter.py index eec154a..1c3fa5f 100644 --- a/litedram/frontend/adaptation.py +++ b/litedram/frontend/adapter.py @@ -1,4 +1,5 @@ -# This file is Copyright (c) 2016-2019 Florent Kermarrec +# This file is Copyright (c) 2016-2020 Florent Kermarrec +# This file is Copyright (c) 2020 Antmicro # License: BSD from migen import * @@ -26,8 +27,7 @@ class LiteDRAMNativePortCDC(Module): # # # - cmd_fifo = stream.AsyncFIFO( - [("we", 1), ("addr", address_width)], cmd_depth) + cmd_fifo = stream.AsyncFIFO([("we", 1), ("addr", address_width)], cmd_depth) cmd_fifo = ClockDomainsRenamer( {"write": clock_domain_from, "read": clock_domain_to})(cmd_fifo) diff --git a/litedram/frontend/wishbone.py b/litedram/frontend/wishbone.py index 911a56d..6a96aec 100644 --- a/litedram/frontend/wishbone.py +++ b/litedram/frontend/wishbone.py @@ -9,7 +9,7 @@ from migen import * from litex.soc.interconnect import stream from litedram.common import LiteDRAMNativePort -from litedram.frontend.adaptation import LiteDRAMNativePortConverter +from litedram.frontend.adapter import LiteDRAMNativePortConverter # LiteDRAMWishbone2Native -------------------------------------------------------------------------- diff --git a/test/test_adaptation.py b/test/test_adaptation.py index 455b781..7c2722b 100644 --- a/test/test_adaptation.py +++ b/test/test_adaptation.py @@ -9,7 +9,7 @@ from migen import * from litex.soc.interconnect.stream import * from litedram.common import LiteDRAMNativePort, LiteDRAMNativeWritePort, LiteDRAMNativeReadPort -from litedram.frontend.adaptation import LiteDRAMNativePortConverter, LiteDRAMNativePortCDC +from litedram.frontend.adapter import LiteDRAMNativePortConverter, LiteDRAMNativePortCDC from test.common import * @@ -76,7 +76,7 @@ class CDCDUT(ConverterDUT): port_to = self.read_crossbar_port) -class TestAdaptation(MemoryTestDataMixin, unittest.TestCase): +class TestAdapter(MemoryTestDataMixin, unittest.TestCase): def test_down_converter_ratio_must_be_integer(self): with self.assertRaises(ValueError) as cm: dut = ConverterDUT(user_data_width=64, native_data_width=24, mem_depth=128) diff --git a/test/test_adapter.py b/test/test_adapter.py new file mode 100644 index 0000000..7c2722b --- /dev/null +++ b/test/test_adapter.py @@ -0,0 +1,405 @@ +# This file is Copyright (c) 2017-2019 Florent Kermarrec +# This file is Copyright (c) 2020 Antmicro +# License: BSD + +import unittest + +from migen import * + +from litex.soc.interconnect.stream import * + +from litedram.common import LiteDRAMNativePort, LiteDRAMNativeWritePort, LiteDRAMNativeReadPort +from litedram.frontend.adapter import LiteDRAMNativePortConverter, LiteDRAMNativePortCDC + +from test.common import * + +from litex.gen.sim import * + + +class ConverterDUT(Module): + def __init__(self, user_data_width, native_data_width, mem_depth, separate_rw=True, read_latency=0): + self.separate_rw = separate_rw + if separate_rw: + self.write_user_port = LiteDRAMNativeWritePort(address_width=32, data_width=user_data_width) + self.write_crossbar_port = LiteDRAMNativeWritePort(address_width=32, data_width=native_data_width) + self.read_user_port = LiteDRAMNativeReadPort( address_width=32, data_width=user_data_width) + self.read_crossbar_port = LiteDRAMNativeReadPort( address_width=32, data_width=native_data_width) + self.write_driver = NativePortDriver(self.write_user_port) + self.read_driver = NativePortDriver(self.read_user_port) + else: + self.write_user_port = LiteDRAMNativePort(mode="both", address_width=32, data_width=user_data_width) + self.write_crossbar_port = LiteDRAMNativePort(mode="both", address_width=32, data_width=native_data_width) + self.write_driver = NativePortDriver(self.write_user_port) + self.read_user_port = self.write_user_port + self.read_crossbar_port = self.write_crossbar_port + self.read_driver = self.write_driver + + self.driver_generators = [self.write_driver.write_data_handler(), + self.read_driver.read_data_handler(latency=read_latency)] + + # Memory + self.memory = DRAMMemory(native_data_width, mem_depth) + + def do_finalize(self): + if self.separate_rw: + self.submodules.write_converter = LiteDRAMNativePortConverter( + self.write_user_port, self.write_crossbar_port) + self.submodules.read_converter = LiteDRAMNativePortConverter( + self.read_user_port, self.read_crossbar_port) + else: + self.submodules.converter = LiteDRAMNativePortConverter( + self.write_user_port, self.write_crossbar_port) + + def read(self, address, **kwargs): + return (yield from self.read_driver.read(address, **kwargs)) + + def write(self, address, data, **kwargs): + if self.write_user_port.data_width > self.write_crossbar_port.data_width: + kwargs["data_with_cmd"] = True + return (yield from self.write_driver.write(address, data, **kwargs)) + + +class CDCDUT(ConverterDUT): + def do_finalize(self): + # Change clock domains + self.write_user_port.clock_domain = "user" + self.read_user_port.clock_domain = "user" + self.write_crossbar_port.clock_domain = "native" + self.read_crossbar_port.clock_domain = "native" + + # Add CDC + self.submodules.write_converter = LiteDRAMNativePortCDC( + port_from = self.write_user_port, + port_to = self.write_crossbar_port) + self.submodules.read_converter = LiteDRAMNativePortCDC( + port_from = self.read_user_port, + port_to = self.read_crossbar_port) + + +class TestAdapter(MemoryTestDataMixin, unittest.TestCase): + def test_down_converter_ratio_must_be_integer(self): + with self.assertRaises(ValueError) as cm: + dut = ConverterDUT(user_data_width=64, native_data_width=24, mem_depth=128) + dut.finalize() + self.assertIn("ratio must be an int", str(cm.exception).lower()) + + def test_up_converter_ratio_must_be_integer(self): + with self.assertRaises(ValueError) as cm: + dut = ConverterDUT(user_data_width=32, native_data_width=48, mem_depth=128) + dut.finalize() + self.assertIn("ratio must be an int", str(cm.exception).lower()) + + def converter_readback_test(self, dut, pattern, mem_expected, main_generator=None): + assert len(set(adr for adr, _ in pattern)) == len(pattern), "Pattern has duplicates!" + + if main_generator is None: + def main_generator(dut): + for adr, data in pattern: + yield from dut.write(adr, data) + + for adr, _ in pattern[:-1]: + yield from dut.read(adr, wait_data=False) + # use cmd.last to indicate last command in the sequence + # this is needed for the cases in up-converter when it cannot be deduced + # that port_to.cmd should be sent + adr, _ = pattern[-1] + yield from dut.read(adr, wait_data=False, last=1) + + yield from dut.write_driver.wait_all() + yield from dut.read_driver.wait_all() + + generators = [ + main_generator(dut), + *dut.driver_generators, + dut.memory.write_handler(dut.write_crossbar_port), + dut.memory.read_handler(dut.read_crossbar_port), + timeout_generator(1000), + ] + run_simulation(dut, generators, vcd_name='sim.vcd') + self.assertEqual(dut.memory.mem, mem_expected) + self.assertEqual(dut.read_driver.rdata, [data for adr, data in pattern]) + + def converter_test(self, test_data, user_data_width, native_data_width, **kwargs): + for separate_rw in [True, False]: + with self.subTest(separate_rw=separate_rw): + data = self.pattern_test_data[test_data] + dut = ConverterDUT(user_data_width=user_data_width, native_data_width=native_data_width, + mem_depth=len(data["expected"]), separate_rw=separate_rw, **kwargs) + self.converter_readback_test(dut, data["pattern"], data["expected"]) + + def test_converter_1to1(self): + # Verify 64-bit to 64-bit identify-conversion. + self.converter_test(test_data="64bit", user_data_width=64, native_data_width=64) + + def test_converter_2to1(self): + # Verify 64-bit to 32-bit down-conversion. + self.converter_test(test_data="64bit_to_32bit", user_data_width=64, native_data_width=32) + + def test_converter_4to1(self): + # Verify 32-bit to 8-bit down-conversion. + self.converter_test(test_data="32bit_to_8bit", user_data_width=32, native_data_width=8) + + def test_converter_8to1(self): + # Verify 64-bit to 8-bit down-conversion. + self.converter_test(test_data="64bit_to_8bit", user_data_width=64, native_data_width=8) + + def test_converter_1to2(self): + # Verify 8-bit to 16-bit up-conversion. + self.converter_test(test_data="8bit_to_16bit", user_data_width=8, native_data_width=16) + + def test_converter_1to4(self): + # Verify 32-bit to 128-bit up-conversion. + self.converter_test(test_data="32bit_to_128bit", user_data_width=32, native_data_width=128) + + def test_converter_1to8(self): + # Verify 32-bit to 256-bit up-conversion. + self.converter_test(test_data="32bit_to_256bit", user_data_width=32, native_data_width=256) + + def test_up_converter_read_latencies(self): + # Verify that up-conversion works with different port reader latencies + cases = { + "1to2": dict(test_data="8bit_to_16bit", user_data_width=8, native_data_width=16), + "1to4": dict(test_data="32bit_to_128bit", user_data_width=32, native_data_width=128), + "1to8": dict(test_data="32bit_to_256bit", user_data_width=32, native_data_width=256), + } + for latency in [0, 1]: + with self.subTest(latency=latency): + for conversion, kwargs in cases.items(): + with self.subTest(conversion=conversion): + self.converter_test(**kwargs, read_latency=latency) + + def test_down_converter_read_latencies(self): + # Verify that down-conversion works with different port reader latencies + cases = { + "2to1": dict(test_data="64bit_to_32bit", user_data_width=64, native_data_width=32), + "4to1": dict(test_data="32bit_to_8bit", user_data_width=32, native_data_width=8), + "8to1": dict(test_data="64bit_to_8bit", user_data_width=64, native_data_width=8), + } + for latency in [0, 1]: + with self.subTest(latency=latency): + for conversion, kwargs in cases.items(): + with self.subTest(conversion=conversion): + self.converter_test(**kwargs, read_latency=latency) + + def test_up_converter_write_complete_sequence(self): + # Verify up-conversion when master sends full sequences (of `ratio` length) + def main_generator(dut): + yield from dut.write(0x00, 0x11) # first + yield from dut.write(0x01, 0x22) + yield from dut.write(0x02, 0x33) + yield from dut.write(0x03, 0x44) + yield from dut.write(0x04, 0x55) # second + yield from dut.write(0x05, 0x66) + yield from dut.write(0x06, 0x77) + yield from dut.write(0x07, 0x88) + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x44332211, # 0x00 + 0x88776655, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + for separate_rw in [True, False]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_write_with_manual_flush(self): + # Verify that up-conversion writes incomplete data when it receives cmd.last + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) + yield from dut.write(0x01, 0x22, wait_data=False) + yield from dut.write(0x02, 0x33, wait_data=False, last=1) + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x00332211, # 0x00 + 0x00000000, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + for separate_rw in [True, False]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_auto_flush_on_address_change(self): + # Verify that up-conversion automatically flushes the cmd if the (shifted) address changes + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) # -> 0x00 + yield from dut.write(0x01, 0x22, wait_data=False) # -> 0x00 + yield from dut.write(0x02, 0x33, wait_data=False) # -> 0x00 + yield from dut.write(0x04, 0x55, wait_data=False) # -> 0x01 + yield from dut.write(0x05, 0x66, wait_data=False) # -> 0x01 + yield from dut.write(0x06, 0x77, wait_data=False) # -> 0x01 + yield from dut.write(0x07, 0x88, wait_data=False) # -> 0x01 + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x00332211, # 0x00 + 0x88776655, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + + for separate_rw in [True, False]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_auto_flush_on_cmd_we_change(self): + # Verify that up-conversion automatically flushes the cmd when command type (write/read) changes + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) + yield from dut.write(0x01, 0x22, wait_data=False) + yield from dut.read(0x00, wait_data=False) + yield from dut.read(0x01, wait_data=False) + yield from dut.read(0x02, wait_data=False) + yield from dut.read(0x03, wait_data=False) + + yield from dut.write_driver.wait_all() + yield from dut.read_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x00002211, # 0x00 + 0x00000000, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + pattern = [ + (0x00, 0x11), + (0x01, 0x22), + (0x02, 0x00), + (0x03, 0x00), + ] + + # with separate_rw=True we will fail because read will happen before write completes + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=False) + self.converter_readback_test(dut, pattern=pattern, mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_write_with_gap(self): + # Verify that the up-converter can mask data properly when sending non-sequential writes + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) + yield from dut.write(0x02, 0x22, wait_data=False) + yield from dut.write(0x03, 0x33, wait_data=False, last=1) + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data, address + 0x33220011, # 0x00 + 0x00000000, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + for separate_rw in [True, False]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_not_aligned(self): + data = self.pattern_test_data["8bit_to_32bit_not_aligned"] + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(data["expected"]), separate_rw=False) + self.converter_readback_test(dut, data["pattern"], data["expected"]) + + def cdc_readback_test(self, dut, pattern, mem_expected, clocks): + assert len(set(adr for adr, _ in pattern)) == len(pattern), "Pattern has duplicates!" + read_data = [] + + @passive + def read_handler(read_port): + yield read_port.rdata.ready.eq(1) + while True: + if (yield read_port.rdata.valid): + read_data.append((yield read_port.rdata.data)) + yield + + def main_generator(dut, pattern): + for adr, data in pattern: + yield from dut.write(adr, data) + + for adr, _ in pattern: + yield from dut.read(adr, wait_data=False) + + yield from dut.write_driver.wait_all() + yield from dut.read_driver.wait_all() + + generators = { + "user": [ + main_generator(dut, pattern), + read_handler(dut.read_user_port), + *dut.driver_generators, + timeout_generator(5000), + ], + "native": [ + dut.memory.write_handler(dut.write_crossbar_port), + dut.memory.read_handler(dut.read_crossbar_port), + ], + } + run_simulation(dut, generators, clocks) + self.assertEqual(dut.memory.mem, mem_expected) + self.assertEqual(read_data, [data for adr, data in pattern]) + + def test_port_cdc_same_clocks(self): + # Verify CDC with same clocks (frequency and phase). + data = self.pattern_test_data["32bit"] + dut = CDCDUT(user_data_width=32, native_data_width=32, mem_depth=len(data["expected"])) + clocks = { + "user": 10, + "native": (7, 3), + } + self.cdc_readback_test(dut, data["pattern"], data["expected"], clocks=clocks) + + def test_port_cdc_different_period(self): + # Verify CDC with different clock frequencies. + data = self.pattern_test_data["32bit"] + dut = CDCDUT(user_data_width=32, native_data_width=32, mem_depth=len(data["expected"])) + clocks = { + "user": 10, + "native": 7, + } + self.cdc_readback_test(dut, data["pattern"], data["expected"], clocks=clocks) + + def test_port_cdc_out_of_phase(self): + # Verify CDC with different clock phases. + data = self.pattern_test_data["32bit"] + dut = CDCDUT(user_data_width=32, native_data_width=32, mem_depth=len(data["expected"])) + clocks = { + "user": 10, + "native": (7, 3), + } + self.cdc_readback_test(dut, data["pattern"], data["expected"], clocks=clocks)