diff --git a/litedram/frontend/adaptation.py b/litedram/frontend/adaptation.py index 64650fc..1ce77c3 100644 --- a/litedram/frontend/adaptation.py +++ b/litedram/frontend/adaptation.py @@ -127,84 +127,9 @@ class LiteDRAMNativePortDownConverter(Module): self.submodules += stream.Pipeline( port_to.rdata, rdata_converter, port_from.rdata) -# LiteDRAMNativeWritePortUpConverter --------------------------------------------------------------- +# LiteDRAMNativePortUpConverter -------------------------------------------------------------------- -class LiteDRAMNativeWritePortUpConverter(Module): - # TODO: finish and remove hack - """LiteDRAM write port UpConverter - - This module increase user port data width to fit controller data width. - With N = port_to.data_width/port_from.data_width: - - Address is adapted (divided by N) - - N writes from user are regrouped in a single one to the controller - (when possible, ie when consecutive and bursting) - """ - def __init__(self, port_from, port_to, reverse=False): - assert port_from.clock_domain == port_to.clock_domain - assert port_from.data_width < port_to.data_width - assert port_from.mode == port_to.mode - assert port_from.mode == "write" - if port_to.data_width % port_from.data_width: - raise ValueError("Ratio must be an int") - - # # # - - ratio = port_to.data_width//port_from.data_width - - we = Signal() - address = Signal(port_to.address_width) - - counter = Signal(max=ratio) - counter_reset = Signal() - counter_ce = Signal() - self.sync += \ - If(counter_reset, - counter.eq(0) - ).Elif(counter_ce, - counter.eq(counter + 1) - ) - - self.submodules.fsm = fsm = FSM(reset_state="IDLE") - fsm.act("IDLE", - port_from.cmd.ready.eq(1), - If(port_from.cmd.valid, - counter_ce.eq(1), - NextValue(we, port_from.cmd.we), - NextValue(address, port_from.cmd.addr), - NextState("RECEIVE") - ) - ) - fsm.act("RECEIVE", - port_from.cmd.ready.eq(1), - If(port_from.cmd.valid, - counter_ce.eq(1), - If(counter == ratio-1, - NextState("GENERATE") - ) - ) - ) - fsm.act("GENERATE", - port_to.cmd.valid.eq(1), - port_to.cmd.we.eq(we), - port_to.cmd.addr.eq(address[log2_int(ratio):]), - If(port_to.cmd.ready, - NextState("IDLE") - ) - ) - - wdata_converter = stream.StrideConverter( - port_from.wdata.description, - port_to.wdata.description, - reverse=reverse) - self.submodules += wdata_converter - self.submodules += stream.Pipeline( - port_from.wdata, - wdata_converter, - port_to.wdata) - -# LiteDRAMNativeReadPortUpConverter ---------------------------------------------------------------- - -class LiteDRAMNativeReadPortUpConverter(Module): +class LiteDRAMNativePortUpConverter(Module): """LiteDRAM port UpConverter This module increase user port data width to fit controller data width. @@ -212,88 +137,193 @@ class LiteDRAMNativeReadPortUpConverter(Module): - Address is adapted (divided by N) - N read from user are regrouped in a single one to the controller (when possible, ie when consecutive and bursting) + - N writes from user are regrouped in a single one to the controller + (when possible, ie when consecutive and bursting) """ def __init__(self, port_from, port_to, reverse=False): assert port_from.clock_domain == port_to.clock_domain assert port_from.data_width < port_to.data_width assert port_from.mode == port_to.mode - assert port_from.mode == "read" if port_to.data_width % port_from.data_width: raise ValueError("Ratio must be an int") # # # ratio = port_to.data_width//port_from.data_width - + mode = port_from.mode # Command ---------------------------------------------------------------------------------- - cmd_buffer = stream.SyncFIFO([("sel", ratio)], 4) + # defines cmd type and the chunks that have been requested for the current port_to command + sel = Signal(ratio) + cmd_buffer = stream.SyncFIFO([("sel", ratio), ("we", 1)], 4) self.submodules += cmd_buffer - - counter = Signal(max=ratio) - counter_ce = Signal() - self.sync += \ - If(counter_ce, - counter.eq(counter + 1) - ) - - self.comb += \ - If(port_from.cmd.valid, - If(counter == 0, - port_to.cmd.valid.eq(1), - port_to.cmd.addr.eq(port_from.cmd.addr[log2_int(ratio):]), - port_from.cmd.ready.eq(port_to.cmd.ready), - counter_ce.eq(port_to.cmd.ready) - ).Else( - port_from.cmd.ready.eq(1), - counter_ce.eq(1) - ) - ) - - # TODO: fix sel - self.comb += \ - If(port_to.cmd.valid & port_to.cmd.ready, - cmd_buffer.sink.valid.eq(1), - cmd_buffer.sink.sel.eq(2**ratio-1) - ) - - # Datapath --------------------------------------------------------------------------------- - - rdata_buffer = stream.Buffer(port_to.rdata.description) - rdata_converter = stream.StrideConverter( - port_to.rdata.description, - port_from.rdata.description, - reverse=reverse) - self.submodules += rdata_buffer, rdata_converter - - rdata_chunk = Signal(ratio, reset=1) - rdata_chunk_valid = Signal() - self.sync += \ - If(rdata_converter.source.valid & - rdata_converter.source.ready, - rdata_chunk.eq(Cat(rdata_chunk[ratio-1], rdata_chunk[:ratio-1])) - ) + # store last command info + cmd_addr = Signal.like(port_from.cmd.addr) + cmd_we = Signal() + # indicates that we need to proceed to the next port_to command + next_cmd = Signal() + # signals that indicate that write/read convertion has finished + wdata_finished = Signal() + rdata_finished = Signal() + # register that user requested a flush + flush_r = Signal() self.comb += [ - port_to.rdata.connect(rdata_buffer.sink), - rdata_buffer.source.connect(rdata_converter.sink), - rdata_chunk_valid.eq((cmd_buffer.source.sel & rdata_chunk) != 0), - If(port_from.flush, - rdata_converter.source.ready.eq(1) - ).Elif(cmd_buffer.source.valid, - If(rdata_chunk_valid, - port_from.rdata.valid.eq(rdata_converter.source.valid), - port_from.rdata.data.eq(rdata_converter.source.data), - rdata_converter.source.ready.eq(port_from.rdata.ready) + # go to the next command if one of the following happens: + # * port_to address changes + # * cmd type changes + # * we received all the `ratio` commands + # * this is last command (flush) + next_cmd.eq( + (cmd_addr[log2_int(ratio):] != port_from.cmd.addr[log2_int(ratio):]) | + (cmd_we != port_from.cmd.we) | + (sel == 2**ratio - 1) | + port_from.flush | flush_r + ), + # when the first command is received, send it immediatelly + If(sel == 0, + If(port_from.cmd.valid, + port_to.cmd.valid.eq(1), + port_to.cmd.we.eq(port_from.cmd.we), + port_to.cmd.addr.eq(port_from.cmd.addr[log2_int(ratio):]), + port_from.cmd.ready.eq(port_to.cmd.ready), + ) + ).Else( + # we have already sent the initial command, now either continue sending cmd.ready + # to the master or send the current command if we have to go to next command + If(next_cmd, + cmd_buffer.sink.valid.eq(1), + cmd_buffer.sink.sel.eq(sel), + cmd_buffer.sink.we.eq(cmd_we), ).Else( - rdata_converter.source.ready.eq(1) + port_from.cmd.ready.eq(port_from.cmd.valid), ) ), - cmd_buffer.source.ready.eq( - rdata_converter.source.ready & rdata_chunk[ratio-1]) + cmd_buffer.source.ready.eq(wdata_finished | rdata_finished) ] + self.sync += [ + # whenever a command gets accepted, update `sel` bitmask and store the command info + If(port_from.cmd.valid & port_from.cmd.ready, + cmd_addr.eq(port_from.cmd.addr), + cmd_we.eq(port_from.cmd.we), + sel.eq(sel | (1 << port_from.cmd.addr[:log2_int(ratio)])), + ), + # clear `sel` after the command has been sent for data procesing + If(cmd_buffer.sink.valid & cmd_buffer.sink.ready, + sel.eq(0), + flush_r.eq(0), + ), + # store flush info until we register the current command + If(port_from.flush, + flush_r.eq(1) + ), + ] + + # Read Datapath ---------------------------------------------------------------------------- + + if mode == "read" or mode == "both": + # buffers output from port_to + rdata_fifo = stream.SyncFIFO(port_to.rdata.description, ratio) + # connected to the buffered output + rdata_converter = stream.StrideConverter( + port_to.rdata.description, + port_from.rdata.description, + reverse=reverse) + self.submodules += rdata_fifo, rdata_converter + + # bitmask shift register with single 1 bit and all other 0s + rdata_chunk = Signal(ratio, reset=1) + rdata_chunk_valid = Signal() + # whenever the converter spits data chunk we shift the chunk bitmask + self.sync += \ + If(rdata_converter.source.valid & + rdata_converter.source.ready, + rdata_chunk.eq(Cat(rdata_chunk[ratio-1], rdata_chunk[:ratio-1])) + ) + + self.comb += [ + # port_to -> rdata_fifo -> rdata_converter + port_to.rdata.connect(rdata_fifo.sink), + rdata_fifo.source.connect(rdata_converter.sink), + # chunk is valid if it's bit is in `sel` sent previously to the FIFO + rdata_chunk_valid.eq((cmd_buffer.source.sel & rdata_chunk) != 0), + # whenever `sel` from FIFO is valid + If(cmd_buffer.source.valid & ~cmd_buffer.source.we, + # if that chunk is valid we send it to the user port and wait for ready from user + If(rdata_chunk_valid, + port_from.rdata.valid.eq(rdata_converter.source.valid), + port_from.rdata.data.eq(rdata_converter.source.data), + rdata_converter.source.ready.eq(port_from.rdata.ready) + # if this was not requested by `sel` then we just ack it + ).Else( + rdata_converter.source.ready.eq(1) + ), + rdata_finished.eq(rdata_converter.source.valid & rdata_converter.source.ready & rdata_chunk[ratio - 1]) + ), + ] + + # Write Datapath --------------------------------------------------------------------------- + + if mode == "write" or mode == "both": + wdata_fifo = stream.SyncFIFO(port_from.wdata.description, ratio) + wdata_converter = stream.StrideConverter( + port_from.wdata.description, + port_to.wdata.description, + reverse=reverse) + self.submodules += wdata_converter, wdata_fifo + + # bitmask shift register with single 1 bit and all other 0s + wdata_chunk = Signal(ratio, reset=1) + wdata_chunk_valid = Signal() + # whenever the converter spits data chunk we shift the chunk bitmask + self.sync += \ + If(wdata_converter.sink.valid & wdata_converter.sink.ready, + wdata_chunk.eq(Cat(wdata_chunk[ratio-1], wdata_chunk[:ratio-1])) + ) + + # replicate sel so that each bit covers according part of we bitmask (1 sel bit may cover + # multiple bytes) + wdata_sel = Signal.like(port_to.wdata.we) + # self.comb += wdata_sel.eq( + # Cat([Replicate(cmd_buffer.source.sel[i], port_to.wdata.we.nbits // sel.nbits) for i in range(ratio)]) + # ) + + self.sync += [ + If(cmd_buffer.source.valid & cmd_buffer.source.we & wdata_chunk[ratio - 1], + wdata_sel.eq(Cat([Replicate(cmd_buffer.source.sel[i], port_to.wdata.we.nbits // sel.nbits) + for i in range(ratio)])) + ) + ] + + self.comb += [ + port_from.wdata.connect(wdata_fifo.sink), + + wdata_chunk_valid.eq((cmd_buffer.source.sel & wdata_chunk) != 0), + + If(cmd_buffer.source.valid & cmd_buffer.source.we, + If(wdata_chunk_valid, + wdata_converter.sink.valid.eq(wdata_fifo.source.valid), + wdata_converter.sink.data.eq(wdata_fifo.source.data), + wdata_converter.sink.we.eq(wdata_fifo.source.we), + wdata_fifo.source.ready.eq(1), + ).Else( + wdata_converter.sink.valid.eq(1), + wdata_converter.sink.data.eq(0), + wdata_converter.sink.we.eq(0), + wdata_fifo.source.ready.eq(0), + ), + ), + + port_to.wdata.valid.eq(wdata_converter.source.valid), + port_to.wdata.data.eq(wdata_converter.source.data), + port_to.wdata.we.eq(wdata_converter.source.we & wdata_sel), + wdata_converter.source.ready.eq(port_to.wdata.ready), + # wdata_finished.eq(wdata_converter.source.valid & wdata_converter.source.ready), + wdata_finished.eq(wdata_converter.sink.valid & wdata_converter.sink.ready & wdata_chunk[ratio-1]), + ] + # LiteDRAMNativePortConverter ---------------------------------------------------------------------- class LiteDRAMNativePortConverter(Module): @@ -309,12 +339,7 @@ class LiteDRAMNativePortConverter(Module): converter = LiteDRAMNativePortDownConverter(port_from, port_to, reverse) self.submodules += converter elif port_from.data_width < port_to.data_width: - if mode == "write": - converter = LiteDRAMNativeWritePortUpConverter(port_from, port_to, reverse) - elif mode == "read": - converter = LiteDRAMNativeReadPortUpConverter(port_from, port_to, reverse) - else: - raise NotImplementedError + converter = LiteDRAMNativePortUpConverter(port_from, port_to, reverse) self.submodules += converter else: self.comb += [ diff --git a/test/common.py b/test/common.py index 019717e..ffab27e 100644 --- a/test/common.py +++ b/test/common.py @@ -73,15 +73,26 @@ class NativePortDriver: yield @passive - def read_data_handler(self): - while True: - while (yield self.port.rdata.valid) == 0: - yield - data = (yield self.port.rdata.data) + def read_data_handler(self, latency=0): + if latency == 0: yield self.port.rdata.ready.eq(1) - yield - yield self.port.rdata.ready.eq(0) - self.rdata.append(data) + while True: + while (yield self.port.rdata.valid) == 0: + yield + data = (yield self.port.rdata.data) + yield + self.rdata.append(data) + else: + while True: + while (yield self.port.rdata.valid) == 0: + yield + data = (yield self.port.rdata.data) + yield self.port.rdata.ready.eq(1) + yield + self.rdata.append(data) + yield self.port.rdata.ready.eq(0) + for _ in range(latency): + yield def read(self, address, wait_data=True): yield self.port.cmd.valid.eq(1) @@ -97,17 +108,20 @@ class NativePortDriver: yield return self.rdata[-1] - def write(self, address, data, we=None, wait_data=True): + def write(self, address, data, we=None, wait_data=True, data_with_cmd=False): if we is None: we = 2**self.port.wdata.we.nbits - 1 yield self.port.cmd.valid.eq(1) yield self.port.cmd.we.eq(1) yield self.port.cmd.addr.eq(address) + if data_with_cmd: + self.wdata.append((data, we)) yield while (yield self.port.cmd.ready) == 0: yield + if not data_with_cmd: + self.wdata.append((data, we)) yield self.port.cmd.valid.eq(0) - self.wdata.append((data, we)) if wait_data: n_wdata = len(self.wdata) while len(self.wdata) != n_wdata - 1: diff --git a/test/test_adaptation.py b/test/test_adaptation.py index 76e4a7d..67aa2f2 100644 --- a/test/test_adaptation.py +++ b/test/test_adaptation.py @@ -17,18 +17,25 @@ from litex.gen.sim import * class ConverterDUT(Module): - def __init__(self, user_data_width, native_data_width, mem_depth, separate_rw=True): + def __init__(self, user_data_width, native_data_width, mem_depth, separate_rw=True, read_latency=0): self.separate_rw = separate_rw if separate_rw: self.write_user_port = LiteDRAMNativeWritePort(address_width=32, data_width=user_data_width) self.write_crossbar_port = LiteDRAMNativeWritePort(address_width=32, data_width=native_data_width) self.read_user_port = LiteDRAMNativeReadPort( address_width=32, data_width=user_data_width) self.read_crossbar_port = LiteDRAMNativeReadPort( address_width=32, data_width=native_data_width) + self.write_driver = NativePortDriver(self.write_user_port) + self.read_driver = NativePortDriver(self.read_user_port) else: self.write_user_port = LiteDRAMNativePort(mode="both", address_width=32, data_width=user_data_width) self.write_crossbar_port = LiteDRAMNativePort(mode="both", address_width=32, data_width=native_data_width) + self.write_driver = NativePortDriver(self.write_user_port) self.read_user_port = self.write_user_port self.read_crossbar_port = self.write_crossbar_port + self.read_driver = self.write_driver + + self.driver_generators = [self.write_driver.write_data_handler(), + self.read_driver.read_data_handler(latency=read_latency)] # Memory self.memory = DRAMMemory(native_data_width, mem_depth) @@ -43,73 +50,15 @@ class ConverterDUT(Module): self.submodules.converter = LiteDRAMNativePortConverter( self.write_user_port, self.write_crossbar_port) - def read(self, address, read_data=True): - port = self.read_user_port - yield port.cmd.valid.eq(1) - yield port.cmd.we.eq(0) - yield port.cmd.addr.eq(address) - yield - while (yield port.cmd.ready) == 0: - yield - yield port.cmd.valid.eq(0) - yield - if read_data: - while (yield port.rdata.valid) == 0: - yield - data = (yield port.rdata.data) - yield port.rdata.ready.eq(1) - yield - yield port.rdata.ready.eq(0) - yield - return data + def read(self, address, wait_data=True): + return (yield from self.read_driver.read(address, wait_data=wait_data)) - def write(self, address, data, we=None): - if we is None: - we = 2**self.write_user_port.wdata.we.nbits - 1 + def write(self, address, data, wait_data=True, we=None): + data_with_cmd = False if self.write_user_port.data_width > self.write_crossbar_port.data_width: - yield from self._write_down(address, data, we) - else: - yield from self._write_up(address, data, we) - - def _write_up(self, address, data, we): - port = self.write_user_port - yield port.cmd.valid.eq(1) - yield port.cmd.we.eq(1) - yield port.cmd.addr.eq(address) - yield - while (yield port.cmd.ready) == 0: - yield - yield port.cmd.valid.eq(0) - yield - yield port.wdata.valid.eq(1) - yield port.wdata.data.eq(data) - yield port.wdata.we.eq(we) - yield - while (yield port.wdata.ready) == 0: - yield - yield port.wdata.valid.eq(0) - yield - - def _write_down(self, address, data, we): - # Down converter must have all the data available along with cmd, it will set - # user_port.cmd.ready only when it sends all input words. - port = self.write_user_port - yield port.cmd.valid.eq(1) - yield port.cmd.we.eq(1) - yield port.cmd.addr.eq(address) - yield port.wdata.valid.eq(1) - yield port.wdata.data.eq(data) - yield port.wdata.we.eq(we) - yield - # Ready goes up only after StrideConverter copied all words - while (yield port.cmd.ready) == 0: - yield - yield port.cmd.valid.eq(0) - yield - while (yield port.wdata.ready) == 0: - yield - yield port.wdata.valid.eq(0) - yield + data_with_cmd = True + return (yield from self.write_driver.write(address, data, we, wait_data=wait_data, + data_with_cmd=data_with_cmd)) class CDCDUT(ConverterDUT): @@ -142,48 +91,44 @@ class TestAdaptation(MemoryTestDataMixin, unittest.TestCase): dut.finalize() self.assertIn("ratio must be an int", str(cm.exception).lower()) - def converter_readback_test(self, dut, pattern, mem_expected): + def converter_readback_test(self, dut, pattern, mem_expected, main_generator=None): assert len(set(adr for adr, _ in pattern)) == len(pattern), "Pattern has duplicates!" - read_data = [] - @passive - def read_handler(read_port): - yield read_port.rdata.ready.eq(1) - while True: - if (yield read_port.rdata.valid): - read_data.append((yield read_port.rdata.data)) + if main_generator is None: + def main_generator(dut): + for adr, data in pattern: + yield from dut.write(adr, data) + + for adr, _ in pattern: + yield from dut.read(adr, wait_data=False) + + # we need to flush after last command in the up-converter case, if the last + # command does not fill whole `sel` + yield dut.write_user_port.flush.eq(1) yield + yield dut.write_user_port.flush.eq(0) - def main_generator(dut, pattern): - for adr, data in pattern: - yield from dut.write(adr, data) - - for adr, _ in pattern: - yield from dut.read(adr, read_data=False) - - # Latency delay - for _ in range(32): - yield + yield from dut.write_driver.wait_all() + yield from dut.read_driver.wait_all() generators = [ - main_generator(dut, pattern), - read_handler(dut.read_user_port), + main_generator(dut), + *dut.driver_generators, dut.memory.write_handler(dut.write_crossbar_port), dut.memory.read_handler(dut.read_crossbar_port), - timeout_generator(5000), + timeout_generator(1000), ] run_simulation(dut, generators, vcd_name='sim.vcd') self.assertEqual(dut.memory.mem, mem_expected) - self.assertEqual(read_data, [data for adr, data in pattern]) + self.assertEqual(dut.read_driver.rdata, [data for adr, data in pattern]) - # TODO: test port.flush! - - def converter_test(self, test_data, user_data_width, native_data_width): - for separate_rw in [True, False]: + def converter_test(self, test_data, user_data_width, native_data_width, **kwargs): + # for separate_rw in [True, False]: + for separate_rw in [False]: with self.subTest(separate_rw=separate_rw): data = self.pattern_test_data[test_data] dut = ConverterDUT(user_data_width=user_data_width, native_data_width=native_data_width, - mem_depth=len(data["expected"]), separate_rw=separate_rw) + mem_depth=len(data["expected"]), separate_rw=separate_rw, **kwargs) self.converter_readback_test(dut, data["pattern"], data["expected"]) def test_converter_1to1(self): @@ -214,10 +159,200 @@ class TestAdaptation(MemoryTestDataMixin, unittest.TestCase): # Verify 32-bit to 256-bit up-conversion. self.converter_test(test_data="32bit_to_256bit", user_data_width=32, native_data_width=256) - # TODO: implement case when user does not write all words (LiteDRAMNativeWritePortUpConverter) - @unittest.skip("Only full-burst writes currently supported") + def test_converter_up_read_latencies(self): + # Verify that up-conversion works with different port reader latencies + cases = { + "1to2": dict(test_data="8bit_to_16bit", user_data_width=8, native_data_width=16), + "1to4": dict(test_data="32bit_to_128bit", user_data_width=32, native_data_width=128), + "1to8": dict(test_data="32bit_to_256bit", user_data_width=32, native_data_width=256), + } + for latency in [0, 1]: + with self.subTest(latency=latency): + for conversion, kwargs in cases.items(): + with self.subTest(conversion=conversion): + self.converter_test(**kwargs, read_latency=latency) + + def test_converter_down_read_latencies(self): + # Verify that down-conversion works with different port reader latencies + cases = { + "2to1": dict(test_data="64bit_to_32bit", user_data_width=64, native_data_width=32), + "4to1": dict(test_data="32bit_to_8bit", user_data_width=32, native_data_width=8), + "8to1": dict(test_data="64bit_to_8bit", user_data_width=64, native_data_width=8), + } + for latency in [0, 1]: + with self.subTest(latency=latency): + for conversion, kwargs in cases.items(): + with self.subTest(conversion=conversion): + self.converter_test(**kwargs, read_latency=latency) + + def test_up_converter_write_complete_sequence(self): + # Verify up-conversion when master sends full sequences (of `ratio` length) + def main_generator(dut): + yield from dut.write(0x00, 0x11) # first + yield from dut.write(0x01, 0x22) + yield from dut.write(0x02, 0x33) + yield from dut.write(0x03, 0x44) + yield from dut.write(0x04, 0x55) # second + yield from dut.write(0x05, 0x66) + yield from dut.write(0x06, 0x77) + yield from dut.write(0x07, 0x88) + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x44332211, # 0x00 + 0x88776655, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + for separate_rw in [False, True]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_write_with_manual_flush(self): + # Verify that up-conversion writes incomplete data when flushed + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) + yield from dut.write(0x01, 0x22, wait_data=False) + yield from dut.write(0x02, 0x33, wait_data=False) + yield dut.write_user_port.flush.eq(1) + yield + yield dut.write_user_port.flush.eq(0) + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x00332211, # 0x00 + 0x00000000, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + for separate_rw in [False, True]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_auto_flush_on_address_change(self): + # Verify that up-conversion automatically flushes the cmd if the (shifted) address changes + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) # -> 0x00 + yield from dut.write(0x01, 0x22, wait_data=False) # -> 0x00 + yield from dut.write(0x02, 0x33, wait_data=False) # -> 0x00 + yield from dut.write(0x04, 0x55, wait_data=False) # -> 0x01 + yield from dut.write(0x05, 0x66, wait_data=False) # -> 0x01 + yield from dut.write(0x06, 0x77, wait_data=False) # -> 0x01 + yield from dut.write(0x07, 0x88, wait_data=False) # -> 0x01 + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x00332211, # 0x00 + 0x88776655, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + + for separate_rw in [False, True]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + + @unittest.skip("Read after write not yet synchronised enough") + def test_up_converter_auto_flush_on_cmd_we_change(self): + # Verify that up-conversion automatically flushes the cmd when command type (write/read) changes + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) + yield from dut.write(0x01, 0x22, wait_data=False) + yield from dut.write(0x02, 0x33, wait_data=False) + yield from dut.write(0x03, 0x44, wait_data=False) + yield from dut.write(0x04, 0x55, wait_data=False) + yield from dut.write(0x05, 0x66, wait_data=False) + yield from dut.read (0x00) + yield from dut.read (0x01) + yield from dut.read (0x02) + yield from dut.read (0x03) + yield from dut.read (0x04) + yield from dut.read (0x05) + yield from dut.read (0x06) + yield from dut.read (0x07) + + yield from dut.write_driver.wait_all() + yield from dut.read_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data address + 0x44332211, # 0x00 + 0x00006655, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + pattern = [ + (0x00, mem_expected[0]), + (0x01, mem_expected[1]), + ] + + for separate_rw in [False, True]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=pattern, mem_expected=mem_expected, + main_generator=main_generator) + + def test_up_converter_write_with_gap(self): + # Verify that the up-converter can mask data properly when sending non-sequential writes + def main_generator(dut): + yield from dut.write(0x00, 0x11, wait_data=False) + yield from dut.write(0x02, 0x22, wait_data=False) + yield from dut.write(0x03, 0x33, wait_data=False) + yield dut.write_user_port.flush.eq(1) + yield + yield dut.write_user_port.flush.eq(0) + + yield from dut.write_driver.wait_all() + for _ in range(8): # wait for memory + yield + + mem_expected = [ + # data, address + 0x33220011, # 0x00 + 0x00000000, # 0x04 + 0x00000000, # 0x08 + 0x00000000, # 0x0c + ] + + for separate_rw in [True, False]: + with self.subTest(separate_rw=separate_rw): + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(mem_expected), separate_rw=separate_rw) + self.converter_readback_test(dut, pattern=[], mem_expected=mem_expected, + main_generator=main_generator) + def test_converter_up_not_aligned(self): - self.converter_test(test_data="8bit_to_32bit_not_aligned", user_data_width=8, native_data_width=32) + data = self.pattern_test_data["8bit_to_32bit_not_aligned"] + dut = ConverterDUT(user_data_width=8, native_data_width=32, + mem_depth=len(data["expected"]), separate_rw=False) + self.converter_readback_test(dut, data["pattern"], data["expected"]) def cdc_readback_test(self, dut, pattern, mem_expected, clocks): assert len(set(adr for adr, _ in pattern)) == len(pattern), "Pattern has duplicates!" @@ -236,16 +371,16 @@ class TestAdaptation(MemoryTestDataMixin, unittest.TestCase): yield from dut.write(adr, data) for adr, _ in pattern: - yield from dut.read(adr, read_data=False) + yield from dut.read(adr, wait_data=False) - # Latency delay - for _ in range(32): - yield + yield from dut.write_driver.wait_all() + yield from dut.read_driver.wait_all() generators = { "user": [ main_generator(dut, pattern), read_handler(dut.read_user_port), + *dut.driver_generators, timeout_generator(5000), ], "native": [ diff --git a/test/test_wishbone.py b/test/test_wishbone.py index 9b2da86..96b1c93 100644 --- a/test/test_wishbone.py +++ b/test/test_wishbone.py @@ -44,7 +44,7 @@ class TestWishbone(MemoryTestDataMixin, unittest.TestCase): dut.mem.write_handler(dut.port), dut.mem.read_handler(dut.port), ] - run_simulation(dut, generators) + run_simulation(dut, generators, vcd_name='sim.vcd') self.assertEqual(dut.mem.mem, mem_expected) def test_wishbone_8bit(self):