From 6943a1a4a580e012478338f844e4e30095cad5a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Boczar?= Date: Thu, 5 Nov 2020 12:12:47 +0100 Subject: [PATCH] lpddr4: initial PHY logic and simulation tests --- litedram/phy/lpddr4phy.py | 685 +++++++++++++++++++++++++++++ test/test_lpddr4.py | 887 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 1572 insertions(+) create mode 100644 litedram/phy/lpddr4phy.py create mode 100644 test/test_lpddr4.py diff --git a/litedram/phy/lpddr4phy.py b/litedram/phy/lpddr4phy.py new file mode 100644 index 0000000..9ac1b55 --- /dev/null +++ b/litedram/phy/lpddr4phy.py @@ -0,0 +1,685 @@ +import re +from functools import reduce +from operator import or_ +from collections import defaultdict + +import math + +from migen import * + +from litex.soc.interconnect.csr import * + +from litedram.common import * +from litedram.phy.dfi import * + + +def _chunks(lst, n): + for i in range(0, len(lst), n): + yield lst[i:i + n] + +def bitpattern(s): + if len(s) > 8: + return reduce(or_, [bitpattern(si) << (8*i) for i, si in enumerate(_chunks(s, 8))]) + assert len(s) == 8 + s = s.translate(s.maketrans("_-", "01")) + return int(s[::-1], 2) # LSB first, so reverse the string + +def delayed(mod, sig, cycles=1): + delay = TappedDelayLine(signal=sig, ntaps=cycles) + mod.submodules += delay + return delay.output + +class ConstBitSlip(Module): + def __init__(self, dw, i=None, o=None, slp=None, cycles=1): + self.i = Signal(dw, name='i') if i is None else i + self.o = Signal(dw, name='o') if o is None else o + assert cycles >= 1 + assert 0 <= slp <= cycles*dw-1 + slp = (cycles*dw-1) - slp + + # # # + + self.r = r = Signal((cycles+1)*dw, reset_less=True) + self.sync += r.eq(Cat(r[dw:], self.i)) + cases = {} + for i in range(cycles*dw): + cases[i] = self.o.eq(r[i+1:dw+i+1]) + self.comb += Case(slp, cases) + +# TODO: rewrite DQSPattern in common.py to support different data widths +class DQSPattern(Module): + def __init__(self, preamble=None, 
postamble=None, wlevel_en=0, wlevel_strobe=0, register=False): + self.preamble = Signal() if preamble is None else preamble + self.postamble = Signal() if postamble is None else postamble + self.o = Signal(16) + + # # # + + # DQS Pattern transmitted as LSB-first. + + self.comb += [ + self.o.eq(0b0101010101010101), + If(self.preamble, + self.o.eq(0b0001010101010101) + ), + If(self.postamble, + self.o.eq(0b0101010101010100) + ), + If(wlevel_en, + self.o.eq(0b0000000000000000), + If(wlevel_strobe, + self.o.eq(0b0000000000000001) + ) + ) + ] + if register: + o = Signal.like(self.o) + self.sync += o.eq(self.o) + self.o = o + +# LPDDR4PHY ---------------------------------------------------------------------------------------- + +class LPDDR4PHY(Module, AutoCSR): + def __init__(self, pads, *, + sys_clk_freq, write_ser_latency, read_des_latency, phytype, cmd_delay=None): + self.pads = pads + self.memtype = memtype = "LPDDR4" + self.nranks = nranks = 1 if not hasattr(pads, "cs_n") else len(pads.cs_n) + self.databits = databits = len(pads.dq) + self.addressbits = addressbits = 17 # for activate row address + self.bankbits = bankbits = 3 + self.nphases = nphases = 8 + self.tck = tck = 1 / (nphases*sys_clk_freq) + assert databits % 8 == 0 + + # Parameters ------------------------------------------------------------------------------- + def get_cl_cw(memtype, tck): + # MT53E256M16D1, No DBI, Set A + f_to_cl_cwl = OrderedDict() + f_to_cl_cwl[ 532e6] = ( 6, 4) # FIXME: with that low cwl, wrtap is 0 + f_to_cl_cwl[1066e6] = (10, 6) + f_to_cl_cwl[1600e6] = (14, 8) + f_to_cl_cwl[2132e6] = (20, 10) + f_to_cl_cwl[2666e6] = (24, 12) + f_to_cl_cwl[3200e6] = (28, 14) + f_to_cl_cwl[3732e6] = (32, 16) + f_to_cl_cwl[4266e6] = (36, 18) + for f, (cl, cwl) in f_to_cl_cwl.items(): + if tck >= 2/f: + return cl, cwl + raise ValueError + + # Bitslip introduces latency between from `cycles` up to `cycles + 1` + bitslip_cycles = 1 + # Commands are sent over 4 cycles of DRAM clock (sys8x) + 
cmd_latency = 4 + # Commands read from adapters are delayed on ConstBitSlips + ca_latency = 1 + + cl, cwl = get_cl_cw(memtype, tck) + cl_sys_latency = get_sys_latency(nphases, cl) + cwl_sys_latency = get_sys_latency(nphases, cwl) + rdphase = get_sys_phase(nphases, cl_sys_latency, cl + cmd_latency) + wrphase = get_sys_phase(nphases, cwl_sys_latency, cwl + cmd_latency) + + # When the calculated phase is negative, it means that we need to increase sys latency + def updated_latency(phase): + delay_update = 0 + while phase < 0: + phase += nphases + delay_update += 1 + return phase, delay_update + + wrphase, cwl_sys_delay = updated_latency(wrphase) + rdphase, cl_sys_delay = updated_latency(rdphase) + cwl_sys_latency += cwl_sys_delay + cl_sys_latency += cl_sys_delay + + # Read latency + read_data_delay = ca_latency + write_ser_latency + cl_sys_latency # DFI cmd -> read data on DQ + read_des_delay = read_des_latency + bitslip_cycles # data on DQ -> data on DFI rddata + read_latency = read_data_delay + read_des_delay + + # Write latency + write_latency = cwl_sys_latency + + # FIXME: remove + if __import__("os").environ.get("DEBUG") == '1': + print('cl', end=' = '); __import__('pprint').pprint(cl) + print('cwl', end=' = '); __import__('pprint').pprint(cwl) + print('cl_sys_latency', end=' = '); __import__('pprint').pprint(cl_sys_latency) + print('cwl_sys_latency', end=' = '); __import__('pprint').pprint(cwl_sys_latency) + print('rdphase', end=' = '); __import__('pprint').pprint(rdphase) + print('wrphase', end=' = '); __import__('pprint').pprint(wrphase) + print('read_data_delay', end=' = '); __import__('pprint').pprint(read_data_delay) + print('read_des_delay', end=' = '); __import__('pprint').pprint(read_des_delay) + print('read_latency', end=' = '); __import__('pprint').pprint(read_latency) + print('write_latency', end=' = '); __import__('pprint').pprint(write_latency) + + # Registers -------------------------------------------------------------------------------- + 
self._rst = CSRStorage() + + self._dly_sel = CSRStorage(databits//8) + + self._wlevel_en = CSRStorage() + self._wlevel_strobe = CSR() + + self._dly_sel = CSRStorage(databits//8) + + self._rdly_dq_bitslip_rst = CSR() + self._rdly_dq_bitslip = CSR() + + self._wdly_dq_bitslip_rst = CSR() + self._wdly_dq_bitslip = CSR() + + self._rdphase = CSRStorage(int(math.log2(nphases)), reset=rdphase) + self._wrphase = CSRStorage(int(math.log2(nphases)), reset=wrphase) + + # PHY settings ----------------------------------------------------------------------------- + self.settings = PhySettings( + phytype = phytype, + memtype = memtype, + databits = databits, + dfi_databits = 2*databits, + nranks = nranks, + nphases = nphases, + rdphase = self._rdphase.storage, + wrphase = self._wrphase.storage, + cl = cl, + cwl = cwl, + read_latency = read_latency, + write_latency = write_latency, + cmd_latency = cmd_latency, + cmd_delay = cmd_delay, + ) + + # DFI Interface ---------------------------------------------------------------------------- + # Due to the fact that LPDDR4 has 16n prefetch we use 8 phases to be able to read/write a + # whole burst during a single controller clock cycle. PHY should use sys8x clock. + self.dfi = dfi = Interface(addressbits, bankbits, nranks, 2*databits, nphases=8) + + # # # + + adapters = [DFIPhaseAdapter(phase) for phase in self.dfi.phases] + self.submodules += adapters + + # Now prepare the data by converting the sequences on adapters into sequences on the pads. + # We have to ignore overlapping commands, and module timings have to ensure that there are + # no overlapping commands anyway. 
+ # Pads: reset_n, CS, CKE, CK, CA[5:0], DMI[1:0], DQ[15:0], DQS[1:0], ODT_CA + self.ck_clk = Signal(2*nphases) + self.ck_cke = Signal(nphases) + self.ck_odt = Signal(nphases) + self.ck_reset_n = Signal(nphases) + self.ck_cs = Signal(nphases) + self.ck_ca = [Signal(nphases) for _ in range(6)] + self.ck_dmi_o = [Signal(2*nphases) for _ in range(2)] + self.ck_dmi_i = [Signal(2*nphases) for _ in range(2)] + self.dmi_oe = Signal() + self.ck_dq_o = [Signal(2*nphases) for _ in range(databits)] + self.ck_dq_i = [Signal(2*nphases) for _ in range(databits)] + self.dq_oe = Signal() + self.ck_dqs_o = [Signal(2*nphases) for _ in range(2)] + self.ck_dqs_i = [Signal(2*nphases) for _ in range(2)] + self.dqs_oe = Signal() + + # Clocks ----------------------------------------------------------------------------------- + self.comb += self.ck_clk.eq(bitpattern("-_-_-_-_" * 2)) + + # Simple commands -------------------------------------------------------------------------- + self.comb += [ + self.ck_cke.eq(Cat(delayed(self, phase.cke) for phase in self.dfi.phases)), + self.ck_odt.eq(Cat(delayed(self, phase.odt) for phase in self.dfi.phases)), + self.ck_reset_n.eq(Cat(delayed(self, phase.reset_n) for phase in self.dfi.phases)), + ] + + # LPDDR4 Commands -------------------------------------------------------------------------- + # Each command can span several phases (up to 4), so we must ignore overlapping commands, + # but in general, module timings should be set in a way that overlapping will never happen. + + # Create a history of valid adapters used for masking overlapping ones. 
+ # TODO: make optional, as it takes up resources and the controller should ensure no overlaps + valids = ConstBitSlip(dw=nphases, cycles=1, slp=0) + self.submodules += valids + self.comb += valids.i.eq(Cat(a.valid for a in adapters)) + # valids_hist = valids.r + valids_hist = Signal.like(valids.r) + # TODO: especially make this part optional + for i in range(len(valids_hist)): + was_valid_before = reduce(or_, valids_hist[max(0, i-3):i], 0) + self.comb += valids_hist[i].eq(valids.r[i] & ~was_valid_before) + + cs_per_adapter = [] + ca_per_adapter = defaultdict(list) + for phase, adapter in enumerate(adapters): + # The signals from an adapter can be used if there were no commands on 3 previous cycles + allowed = ~reduce(or_, valids_hist[nphases+phase - 3:nphases+phase]) + + # Use CS and CA of given adapter slipped by `phase` bits + cs_bs = ConstBitSlip(dw=nphases, cycles=1, slp=phase) + self.submodules += cs_bs + self.comb += cs_bs.i.eq(Cat(adapter.cs)), + cs_mask = Replicate(allowed, len(cs_bs.o)) + cs = cs_bs.o & cs_mask + cs_per_adapter.append(cs) + + # For CA we need to do the same for each bit + ca_bits = [] + for bit in range(6): + ca_bs = ConstBitSlip(dw=nphases, cycles=1, slp=phase) + self.submodules += ca_bs + ca_bit_hist = [adapter.ca[i][bit] for i in range(4)] + self.comb += ca_bs.i.eq(Cat(*ca_bit_hist)), + ca_mask = Replicate(allowed, len(ca_bs.o)) + ca = ca_bs.o & ca_mask + ca_per_adapter[bit].append(ca) + + # OR all the masked signals + self.comb += self.ck_cs.eq(reduce(or_, cs_per_adapter)) + for bit in range(6): + self.comb += self.ck_ca[bit].eq(reduce(or_, ca_per_adapter[bit])) + + # DQ --------------------------------------------------------------------------------------- + dq_oe = Signal() + self.comb += self.dq_oe.eq(delayed(self, dq_oe, cycles=1)) + + for bit in range(self.databits): + # output + self.submodules += BitSlip( + dw = 2*nphases, + cycles = bitslip_cycles, + rst = (self._dly_sel.storage[bit//8] & self._wdly_dq_bitslip_rst.re) | 
self._rst.storage, + slp = self._dly_sel.storage[bit//8] & self._wdly_dq_bitslip.re, + i = Cat(*[self.dfi.phases[i//2].wrdata[i%2 * self.databits + bit] for i in range(2*nphases)]), + o = self.ck_dq_o[bit], + ) + + # input + dq_i_bs = Signal(2*nphases) + self.submodules += BitSlip( + dw = 2*nphases, + cycles = bitslip_cycles, + rst = (self._dly_sel.storage[bit//8] & self._rdly_dq_bitslip_rst.re) | self._rst.storage, + slp = self._dly_sel.storage[bit//8] & self._rdly_dq_bitslip.re, + i = self.ck_dq_i[bit], + o = dq_i_bs, + ) + for i in range(2*nphases): + self.comb += self.dfi.phases[i//2].rddata[i%2 * self.databits + bit].eq(dq_i_bs[i]) + + # DQS -------------------------------------------------------------------------------------- + dqs_oe = Signal() + dqs_preamble = Signal() + dqs_postamble = Signal() + dqs_pattern = DQSPattern( + preamble = dqs_preamble, # FIXME: are defined the opposite way (common.py) ??? + postamble = dqs_postamble, + wlevel_en = self._wlevel_en.storage, + wlevel_strobe = self._wlevel_strobe.re) + self.submodules += dqs_pattern + self.comb += [ + self.dqs_oe.eq(delayed(self, dqs_oe, cycles=1)), + ] + + for bit in range(self.databits//8): # NOTE: `bit` is the DQS (byte lane) index here, so it selects `_dly_sel` directly + # output + self.submodules += BitSlip( + dw = 2*nphases, + cycles = bitslip_cycles, + rst = (self._dly_sel.storage[bit] & self._wdly_dq_bitslip_rst.re) | self._rst.storage, + slp = self._dly_sel.storage[bit] & self._wdly_dq_bitslip.re, + i = dqs_pattern.o, + o = self.ck_dqs_o[bit], + ) + + # DMI -------------------------------------------------------------------------------------- + # DMI signal is used for Data Mask or Data Bus Inversion depending on Mode Registers values. + # With DM and DBI disabled, this signal is a Don't Care. + # With DM enabled, masking is performed only when the command used is WRITE-MASKED. + # TODO: use WRITE-MASKED for all write commands, and configure Mode Registers for that + # during DRAM initialization (we don't want to support DBI).
+ for bit in range(self.databits//8): + self.comb += self.ck_dmi_o[bit].eq(0) + + # Read Control Path ------------------------------------------------------------------------ + # Creates a delay line of read commands coming from the DFI interface. The output is used to + # signal a valid read data to the DFI interface. + # + # The read data valid is asserted for 1 sys_clk cycle when the data is available on the DFI + # interface, the latency is the sum of the OSERDESE2, CAS, ISERDESE2 and Bitslip latencies. + rddata_en = TappedDelayLine( + signal = reduce(or_, [dfi.phases[i].rddata_en for i in range(nphases)]), + ntaps = self.settings.read_latency + ) + self.submodules += rddata_en + + self.comb += [phase.rddata_valid.eq(rddata_en.output | self._wlevel_en.storage) for phase in dfi.phases] + + # Write Control Path ----------------------------------------------------------------------- + wrtap = cwl_sys_latency - 1 + assert wrtap >= 1 + + # Create a delay line of write commands coming from the DFI interface. These taps are used to + # control DQ/DQS tristates. + wrdata_en = TappedDelayLine( + signal = reduce(or_, [dfi.phases[i].wrdata_en for i in range(nphases)]), + ntaps = wrtap + 2 + ) + self.submodules += wrdata_en + + self.comb += dq_oe.eq(wrdata_en.taps[wrtap]) + self.comb += If(self._wlevel_en.storage, dqs_oe.eq(1)).Else(dqs_oe.eq(dqs_preamble | dq_oe | dqs_postamble)) + + # Write DQS Postamble/Preamble Control Path ------------------------------------------------ + # Generates DQS Preamble 1 cycle before the first write and Postamble 1 cycle after the last + # write. During writes, DQS tristate is configured as output for at least 3 sys_clk cycles: + # 1 for Preamble, 1 for the Write and 1 for the Postamble.
+ self.comb += dqs_preamble.eq( wrdata_en.taps[wrtap - 1] & ~wrdata_en.taps[wrtap + 0]) + self.comb += dqs_postamble.eq(wrdata_en.taps[wrtap + 1] & ~wrdata_en.taps[wrtap + 0]) + +class DFIPhaseAdapter(Module): + # We must perform mapping of DFI commands to the LPDDR4 commands set on CA bus. + # LPDDR4 "small command" consists of 2 words CA[5:0] sent on the bus in 2 subsequent + # cycles. First cycle is marked with CS high, second with CS low. + # Then most "big commands" consist of 2 "small commands" (e.g. ACTIVATE-1, ACTIVATE-2). + # If a command uses 1 "small command", then it shall go as cmd2 so that all command + # timings can be counted from the same moment (cycle of cmd2 CS low). + def __init__(self, dfi_phase): + # CS/CA values for 4 SDR cycles + self.cs = Signal(4) + self.ca = Array([Signal(6) for _ in range(4)]) + self.valid = Signal() + + # # # + + self.submodules.cmd1 = Command(dfi_phase) + self.submodules.cmd2 = Command(dfi_phase) + self.comb += [ + self.cs[:2].eq(self.cmd1.cs), + self.cs[2:].eq(self.cmd2.cs), + self.ca[0].eq(self.cmd1.ca[0]), + self.ca[1].eq(self.cmd1.ca[1]), + self.ca[2].eq(self.cmd2.ca[0]), + self.ca[3].eq(self.cmd2.ca[1]), + ] + + dfi_cmd = Signal(3) + self.comb += dfi_cmd.eq(Cat(~dfi_phase.we_n, ~dfi_phase.ras_n, ~dfi_phase.cas_n)), + _cmd = { # cas, ras, we + "NOP": 0b000, + "ACT": 0b010, + "RD": 0b100, + "WR": 0b101, + "PRE": 0b011, + "REF": 0b110, + "ZQC": 0b001, + "MRS": 0b111, + } + + def cmds(cmd1, cmd2, valid=1): + return self.cmd1.set(cmd1) + self.cmd2.set(cmd2) + [self.valid.eq(valid)] + + self.comb += If(dfi_phase.cs_n == 0, # require dfi.cs_n + Case(dfi_cmd, { + _cmd["ACT"]: cmds("ACTIVATE-1", "ACTIVATE-2"), + _cmd["RD"]: cmds("READ-1", "CAS-2"), + _cmd["WR"]: cmds("WRITE-1", "CAS-2"), # TODO: masked write + _cmd["PRE"]: cmds("DESELECT", "PRECHARGE"), + _cmd["REF"]: cmds("DESELECT", "REFRESH"), + # TODO: ZQC init/short/long? start/latch? 
+ # _cmd["ZQC"]: [ + # *cmds("DESELECT", "MPC"), + # self.cmd2.mpc.eq(0b1001111), + # ], + _cmd["MRS"]: cmds("MRW-1", "MRW-2"), + "default": cmds("DESELECT", "DESELECT", valid=0), + }) + ) + +class Command(Module): + # String description of 1st and 2nd edge of each command, later parsed to construct + # the value. CS is assumed to be H for 1st edge and L for 2nd edge. + TRUTH_TABLE = { + "MRW-1": ["L H H L L OP7", "MA0 MA1 MA2 MA3 MA4 MA5"], + "MRW-2": ["L H H L H OP6", "OP0 OP1 OP2 OP3 OP4 OP5"], + "MRR-1": ["L H H H L V", "MA0 MA1 MA2 MA3 MA4 MA5"], + "REFRESH": ["L L L H L AB", "BA0 BA1 BA2 V V V"], + "ACTIVATE-1": ["H L R12 R13 R14 R15", "BA0 BA1 BA2 R16 R10 R11"], + "ACTIVATE-2": ["H H R6 R7 R8 R9", "R0 R1 R2 R3 R4 R5"], + "WRITE-1": ["L L H L L BL", "BA0 BA1 BA2 V C9 AP"], + "MASK WRITE-1": ["L L H H L BL", "BA0 BA1 BA2 V C9 AP"], + "READ-1": ["L H L L L BL", "BA0 BA1 BA2 V C9 AP"], + "CAS-2": ["L H L L H C8", "C2 C3 C4 C5 C6 C7"], + "PRECHARGE": ["L L L L H AB", "BA0 BA1 BA2 V V V"], + "MPC": ["L L L L L OP6", "OP0 OP1 OP2 OP3 OP4 OP5"], + "DESELECT": ["X X X X X X", "X X X X X X"], + } + + for cmd, (subcmd1, subcmd2) in TRUTH_TABLE.items(): + assert len(subcmd1.split()) == 6, (cmd, subcmd1) + assert len(subcmd2.split()) == 6, (cmd, subcmd2) + + def __init__(self, dfi_phase): + self.cs = Signal(2) + self.ca = Array([Signal(6), Signal(6)]) # CS high, CS low + self.mpc = Signal(7) # special OP values for multipurpose command + self.dfi = dfi_phase + + def set(self, cmd): + ops = [] + for i, description in enumerate(self.TRUTH_TABLE[cmd]): + for j, bit in enumerate(description.split()): + ops.append(self.ca[i][j].eq(self.parse_bit(bit, is_mpc=cmd == "MPC"))) + if cmd != "DESELECT": + ops.append(self.cs[0].eq(1)) + return ops + + def parse_bit(self, bit, is_mpc=False): + rules = { + "H": lambda: 1, # high + "L": lambda: 0, # low + "V": lambda: 0, # defined logic + "X": lambda: 0, # don't care + "BL": lambda: 0, # on-the-fly burst length, not using + "AP": 
lambda: self.dfi.address[10], # auto precharge + "AB": lambda: self.dfi.address[10], # all banks + r"BA(\d+)": lambda i: self.dfi.bank[i], + r"R(\d+)": lambda i: self.dfi.address[i], # row + r"C(\d+)": lambda i: self.dfi.address[i], # column + r"MA(\d+)": lambda i: self.dfi.address[8+i], # mode register address + # mode register value, or op code for MPC + r"OP(\d+)": lambda i: self.mpc[i] if is_mpc else self.dfi.address[i], + } + for pattern, value in rules.items(): + m = re.fullmatch(pattern, bit) # whole token must match, not just a prefix + if m: + args = [int(g) for g in m.groups()] + return value(*args) + raise ValueError(bit) + +# SimulationPHY ------------------------------------------------------------------------------------ + +class LPDDR4SimulationPads(Module): + def __init__(self, databits=16): + self.clk_p = Signal() + self.clk_n = Signal() + self.cke = Signal() + self.odt = Signal() + self.reset_n = Signal() + self.cs = Signal() + self.ca = Signal(6) + # signals for checking actual tristate lines state (PHY reads these) + self.dq = Signal(databits) + self.dqs = Signal(databits//8) + self.dmi = Signal(databits//8) + # internal tristates i/o that should be driven for simulation + self.dq_o = Signal(databits) # PHY drives these + self.dq_i = Signal(databits) # DRAM chip (simulator) drives these + self.dq_oe = Signal() # PHY drives these + self.dqs_o = Signal(databits//8) + self.dqs_i = Signal(databits//8) + self.dqs_oe = Signal() + self.dmi_o = Signal(databits//8) + self.dmi_i = Signal(databits//8) + self.dmi_oe = Signal() + + self.comb += [ + If(self.dq_oe, self.dq.eq(self.dq_o)).Else(self.dq.eq(self.dq_i)), + If(self.dqs_oe, self.dqs.eq(self.dqs_o)).Else(self.dqs.eq(self.dqs_i)), + If(self.dmi_oe, self.dmi.eq(self.dmi_o)).Else(self.dmi.eq(self.dmi_i)), + ] + + +class SimulationPHY(LPDDR4PHY): + def __init__(self, sys_clk_freq=100e6, aligned_reset_zero=False): + pads = LPDDR4SimulationPads() + self.submodules += pads + super().__init__(pads, + sys_clk_freq = sys_clk_freq, + write_ser_latency =
Serializer.LATENCY, + read_des_latency = Deserializer.LATENCY, + phytype = "SimulationPHY") + + def add_reset_value(phase, kwargs): + if aligned_reset_zero and phase == 0: + kwargs["reset_value"] = 0 + + # Serialization + def serialize(**kwargs): + name = 'ser_' + kwargs.pop('name', '') + ser = Serializer(o_dw=1, name=name.strip('_'), **kwargs) + self.submodules += ser + + def deserialize(**kwargs): + name = 'des_' + kwargs.pop('name', '') + des = Deserializer(i_dw=1, name=name.strip('_'), **kwargs) + self.submodules += des + + def ser_sdr(phase=0, **kwargs): + clkdiv = {0: "sys8x", 90: "sys8x_90"}[phase] + # clk = {0: "sys", 90: "sys_11_25"}[phase] + clk = {0: "sys", 90: "sys"}[phase] + add_reset_value(phase, kwargs) + serialize(clk=clk, clkdiv=clkdiv, i_dw=8, **kwargs) + + def ser_ddr(phase=0, **kwargs): + # for simulation we require sys8x_ddr clock (=sys16x) + clkdiv = {0: "sys8x_ddr", 90: "sys8x_90_ddr"}[phase] + # clk = {0: "sys", 90: "sys_11_25"}[phase] + clk = {0: "sys", 90: "sys"}[phase] + add_reset_value(phase, kwargs) + serialize(clk=clk, clkdiv=clkdiv, i_dw=16, **kwargs) + + def des_ddr(phase=0, **kwargs): + clkdiv = {0: "sys8x_ddr", 90: "sys8x_90_ddr"}[phase] + clk = {0: "sys", 90: "sys_11_25"}[phase] + add_reset_value(phase, kwargs) + deserialize(clk=clk, clkdiv=clkdiv, o_dw=16, **kwargs) + + # Clock is shifted 180 degrees to get rising edge in the middle of SDR signals. + # To achieve that we send negated clock on clk_p and non-negated on clk_n. 
+ ser_ddr(i=~self.ck_clk, o=self.pads.clk_p, name='clk_p') + ser_ddr(i=self.ck_clk, o=self.pads.clk_n, name='clk_n') + + ser_sdr(i=self.ck_cke, o=self.pads.cke, name='cke') + ser_sdr(i=self.ck_odt, o=self.pads.odt, name='odt') + ser_sdr(i=self.ck_reset_n, o=self.pads.reset_n, name='reset_n') + + # Command/address + ser_sdr(i=self.ck_cs, o=self.pads.cs, name='cs') + for i in range(6): + ser_sdr(i=self.ck_ca[i], o=self.pads.ca[i], name=f'ca{i}') + + # Tristate I/O (separate for simulation) + for i in range(self.databits//8): + ser_ddr(i=self.ck_dmi_o[i], o=self.pads.dmi_o[i], name=f'dmi_o{i}') + des_ddr(o=self.ck_dmi_i[i], i=self.pads.dmi[i], name=f'dmi_i{i}') + ser_ddr(i=self.ck_dqs_o[i], o=self.pads.dqs_o[i], name=f'dqs_o{i}', phase=90) + des_ddr(o=self.ck_dqs_i[i], i=self.pads.dqs[i], name=f'dqs_i{i}', phase=90) + for i in range(self.databits): + ser_ddr(i=self.ck_dq_o[i], o=self.pads.dq_o[i], name=f'dq_o{i}') + des_ddr(o=self.ck_dq_i[i], i=self.pads.dq[i], name=f'dq_i{i}') + # Output enable signals + self.comb += self.pads.dmi_oe.eq(delayed(self, self.dmi_oe, cycles=Serializer.LATENCY)) + self.comb += self.pads.dqs_oe.eq(delayed(self, self.dqs_oe, cycles=Serializer.LATENCY)) + self.comb += self.pads.dq_oe.eq(delayed(self, self.dq_oe, cycles=Serializer.LATENCY)) + +class Serializer(Module): + """Serialize given input signal + + It latches the input data on the rising edge of `clk`. Output data counter `cnt` is incremented + on rising edges of `clkdiv` and it determines current slice of `i` that is presented on `o`. + `latency` is specified in `clk` cycles. + + NOTE: both `clk` and `clkdiv` should be phase aligned. 
+ NOTE: `reset_value` is set to `ratio - 1` so that on the first clock edge after reset it is 0 + """ + LATENCY = 1 + + def __init__(self, clk, clkdiv, i_dw, o_dw, i=None, o=None, reset=None, reset_value=-1, name=None): + assert i_dw > o_dw + assert i_dw % o_dw == 0 + ratio = i_dw // o_dw + + sd_clk = getattr(self.sync, clk) + sd_clkdiv = getattr(self.sync, clkdiv) + + if i is None: i = Signal(i_dw) + if o is None: o = Signal(o_dw) + if reset is None: reset = Signal() + + self.i = i + self.o = o + self.reset = reset + + if reset_value < 0: + reset_value = ratio + reset_value + + cnt = Signal(max=ratio, reset=reset_value, name='{}_cnt'.format(name) if name is not None else None) + sd_clkdiv += If(reset | (cnt == ratio - 1), cnt.eq(0)).Else(cnt.eq(cnt + 1)) # parentheses required: `|` binds tighter than `==` + + i_d = Signal.like(self.i) + sd_clk += i_d.eq(self.i) + i_array = Array([i_d[n*o_dw:(n+1)*o_dw] for n in range(ratio)]) + self.comb += self.o.eq(i_array[cnt]) + +class Deserializer(Module): + """Deserialize given input signal + + Latches the input data on the rising edges of `clkdiv` and stores them in the `o_pre` buffer. + Additional latency cycle is used to ensure that the last input bit is deserialized correctly. + + NOTE: both `clk` and `clkdiv` should be phase aligned.
+ NOTE: `reset_value` is set to `ratio - 1` so that on the first clock edge after reset it is 0 + """ + LATENCY = 2 + + def __init__(self, clk, clkdiv, i_dw, o_dw, i=None, o=None, reset=None, reset_value=-1, name=None): + assert i_dw < o_dw + assert o_dw % i_dw == 0 + ratio = o_dw // i_dw + + sd_clk = getattr(self.sync, clk) + sd_clkdiv = getattr(self.sync, clkdiv) + + if i is None: i = Signal(i_dw) + if o is None: o = Signal(o_dw) + if reset is None: reset = Signal() + + self.i = i + self.o = o + self.reset = reset + + if reset_value < 0: + reset_value = ratio + reset_value + + cnt = Signal(max=ratio, reset=reset_value, name='{}_cnt'.format(name) if name is not None else None) + sd_clkdiv += If(reset, cnt.eq(0)).Else(cnt.eq(cnt + 1)) + + o_pre = Signal.like(self.o) + o_array = Array([o_pre[n*i_dw:(n+1)*i_dw] for n in range(ratio)]) + sd_clkdiv += o_array[cnt].eq(self.i) + # we need to ensure that the last bit will be correct if clocks are phase aligned + o_pre_d = Signal.like(self.o) + sd_clk += o_pre_d.eq(o_pre) + sd_clk += self.o.eq(Cat(o_pre_d[:-1], o_pre[-1])) # would work as self.comb (at least in simulation) diff --git a/test/test_lpddr4.py b/test/test_lpddr4.py new file mode 100644 index 0000000..a857fbb --- /dev/null +++ b/test/test_lpddr4.py @@ -0,0 +1,887 @@ +import re +import copy +import pprint +import random +import unittest +import itertools +from collections import defaultdict +from typing import Mapping, Sequence + +from migen import * + +from litedram.phy import dfi +from litedram.phy.lpddr4phy import SimulationPHY, Serializer, Deserializer + +from litex.gen.sim import run_simulation as _run_simulation + + +def bit(n, val): + return (val & (1 << n)) >> n + +def chunks(lst, n): + for i in range(0, len(lst), n): + yield lst[i:i + n] + + +def run_simulation(dut, generators, debug_clocks=False, **kwargs): + # Migen simulator supports reset signals so we could add CRG to start all the signals + # in the same time, however the clock signals will still 
be visible in the VCD dump + # and the generators we assign to them will still work before reset. For this reason we + # use clocks set up in such a way that we have all the phase aligned clocks start in tick + # 1 (not zero), so that we avoid any issues with clock alignment. + # + # NOTE: On hardware proper reset must be ensured! + # + # The simulation should start like this: + # sys |_-------------- + # sys_11_25 |___------------ + # sys8x |_----____----__ + # sys8x_ddr |_--__--__--__-- + # sys8x_90 |___----____---- + # sys8x_90_ddr |-__--__--__--__ + # + # sys8x_90_ddr does not trigger at the simulation start (not an edge), + # BUT a generator starts before first edge, so a `yield` is needed to wait until the first + # rising edge! + clocks = { + "sys": (64, 31), + "sys_11_25": (64, 29), # aligned to sys8x_90 (phase shift of 11.25) + "sys8x": ( 8, 3), + "sys8x_ddr": ( 4, 1), + "sys8x_90": ( 8, 1), + "sys8x_90_ddr": ( 4, 3), + } + + if debug_clocks: + class DUT(Module): + def __init__(self, dut): + self.submodules.dut = dut + for clk in clocks: + setattr(self.clock_domains, "cd_{}".format(clk), ClockDomain(clk)) + cd = getattr(self, 'cd_{}'.format(clk)) + self.comb += cd.rst.eq(0) + + s = Signal(4, name='dbg_{}'.format(clk)) + sd = getattr(self.sync, clk) + sd += s.eq(s + 1) + dut = DUT(dut) + + _run_simulation(dut, generators, clocks, **kwargs) + + +class TestSimSerializers(unittest.TestCase): + @staticmethod + def data_generator(i, datas): + for data in datas: + yield i.eq(data) + yield + yield i.eq(0) + yield + + @staticmethod + def data_checker(o, datas, n, latency, yield1=False): + if yield1: + yield + for _ in range(latency): + yield + for _ in range(n): + datas.append((yield o)) + yield + yield + + def serializer_test(self, *, data_width, datas, clk, clkdiv, latency, clkgen=None, clkcheck=None, **kwargs): + clkgen = clkgen if clkgen is not None else clk + clkcheck = clkcheck if clkcheck is not None else clkdiv + + received = [] + dut = Serializer(clk=clk, 
clkdiv=clkdiv, i_dw=data_width, o_dw=1) + generators = { + clkgen: self.data_generator(dut.i, datas), + clkcheck: self.data_checker(dut.o, received, n=len(datas) * data_width, latency=latency * data_width, yield1=True), + } + run_simulation(dut, generators, **kwargs) + + received = list(chunks(received, data_width)) + datas = [[bit(i, d) for i in range(data_width)] for d in datas] + self.assertEqual(received, datas) + + def deserializer_test(self, *, data_width, datas, clk, clkdiv, latency, clkgen=None, clkcheck=None, **kwargs): + clkgen = clkgen if clkgen is not None else clkdiv + clkcheck = clkcheck if clkcheck is not None else clk + + datas = [[bit(i, d) for i in range(data_width)] for d in datas] + + received = [] + dut = Deserializer(clk=clk, clkdiv=clkdiv, i_dw=1, o_dw=data_width) + generators = { + clkgen: self.data_generator(dut.i, itertools.chain(*datas)), + clkcheck: self.data_checker(dut.o, received, n=len(datas), latency=latency), + } + + run_simulation(dut, generators, **kwargs) + + received = [[bit(i, d) for i in range(data_width)] for d in received] + self.assertEqual(received, datas) + + DATA_8 = [0b11001100, 0b11001100, 0b00110011, 0b00110011, 0b10101010] + DATA_16 = [0b1100110011001100, 0b0011001100110011, 0b0101010101010101] + + ARGS_8 = dict( + data_width = 8, + datas = DATA_8, + clk = "sys", + clkdiv = "sys8x", + latency = Serializer.LATENCY, + ) + + ARGS_16 = dict( + data_width = 16, + datas = DATA_16, + clk = "sys", + clkdiv = "sys8x_ddr", + latency = Serializer.LATENCY, + ) + + def _s(default, **kwargs): + def test(self): + new = default.copy() + new.update(kwargs) + self.serializer_test(**new) + return test + + def _d(default, **kwargs): + def test(self): + new = default.copy() + new["latency"] = Deserializer.LATENCY + new.update(kwargs) + self.deserializer_test(**new) + return test + + test_sim_serializer_8 = _s(ARGS_8) + test_sim_serializer_8_phase90 = _s(ARGS_8, clk="sys_11_25", clkdiv="sys8x_90") + # when clkgen and clk are not phase 
aligned (clk is delayed), there will be lower latency + test_sim_serializer_8_phase90_gen0 = _s(ARGS_8, clk="sys_11_25", clkdiv="sys8x_90", clkgen="sys", latency=Serializer.LATENCY - 1) + test_sim_serializer_8_phase90_check0 = _s(ARGS_8, clk="sys_11_25", clkdiv="sys8x_90", clkcheck="sys8x") + + test_sim_serializer_16 = _s(ARGS_16) + test_sim_serializer_16_phase90 = _s(ARGS_16, clk="sys_11_25", clkdiv="sys8x_90_ddr") + test_sim_serializer_16_phase90_gen0 = _s(ARGS_16, clk="sys_11_25", clkdiv="sys8x_90_ddr", clkgen="sys", latency=Serializer.LATENCY - 1) + test_sim_serializer_16_phase90_check0 = _s(ARGS_16, clk="sys_11_25", clkdiv="sys8x_90_ddr", clkcheck="sys8x_ddr") + + # for phase aligned clocks the latency will be bigger (preferably avoid phase aligned reading?) + test_sim_deserializer_8 = _d(ARGS_8, latency=Deserializer.LATENCY + 1) + test_sim_deserializer_8_check90 = _d(ARGS_8, clkcheck="sys_11_25") + test_sim_deserializer_8_gen90_check90 = _d(ARGS_8, clkcheck="sys_11_25", clkgen="sys8x_90") + test_sim_deserializer_8_phase90 = _d(ARGS_8, clk="sys_11_25", clkdiv="sys8x_90", latency=Deserializer.LATENCY + 1) + test_sim_deserializer_8_phase90_check0 = _d(ARGS_8, clk="sys_11_25", clkdiv="sys8x_90", clkcheck="sys", latency=Deserializer.LATENCY + 1) + + test_sim_deserializer_16 = _d(ARGS_16, latency=Deserializer.LATENCY + 1) + test_sim_deserializer_16_check90 = _d(ARGS_16, clkcheck="sys_11_25") + test_sim_deserializer_16_gen90_check90 = _d(ARGS_16, clkcheck="sys_11_25", clkgen="sys8x_90_ddr") + test_sim_deserializer_16_phase90 = _d(ARGS_16, clk="sys_11_25", clkdiv="sys8x_90_ddr", latency=Deserializer.LATENCY + 1) + test_sim_deserializer_16_phase90_check0 = _d(ARGS_16, clk="sys_11_25", clkdiv="sys8x_90_ddr", clkcheck="sys", latency=Deserializer.LATENCY + 1) + + +BOLD = '\033[1m' +HIGHLIGHT = '\033[91m' +CLEAR = '\033[0m' + +def highlight(s, hl=True): + return BOLD + (HIGHLIGHT if hl else '') + s + CLEAR + + +class PadsHistory(defaultdict): + def __init__(self): + 
super().__init__(str) + + def format(self, hl_cycle=None, hl_signal=None, underline_cycle=False, key_strw=None): + if key_strw is None: + key_strw = max(len(k) for k in self) + lines = [] + for k in self: + vals = list(self[k]) + if hl_cycle is not None and hl_signal is not None: + vals = [highlight(val, hl=hl_signal == k) if i == hl_cycle else val + for i, val in enumerate(vals)] + hist = ' '.join(''.join(chunk) for chunk in chunks(vals, 8)) + line = '{:{n}} {}'.format(k + ':', hist, n=key_strw+1) + lines.append(line) + if underline_cycle: + assert hl_cycle is not None + n = hl_cycle + hl_cycle//8 + line = ' ' * (key_strw+1) + ' ' + ' ' * n + '^' + lines.append(line) + if hl_signal is not None and hl_cycle is None: + keys = list(self.keys()) + sig_i = keys.index(hl_signal) + lines = ['{} {}'.format('>' if i == sig_i else ' ', line) for i, line in enumerate(lines)] + return '\n'.join(lines) + + @staticmethod + def width_for(histories): + keys = itertools.chain.from_iterable(h.keys() for h in histories) + return max(len(k) for k in keys) + +class PadChecker: + def __init__(self, pads, signals: Mapping[str, str]): + # signals: {sig: values}, values: a string of '0'/'1'/'x' + self.pads = pads + self.signals = signals + self.history = PadsHistory() # registered values + self.ref_history = PadsHistory() # expected values + + assert all(v in '01x' for values in signals.values() for v in values) + + lengths = [len(vals) for vals in signals.values()] + assert all(l == lengths[0] for l in lengths) + + @property + def length(self): + values = list(self.signals.values()) + return len(values[0]) if values else 1 + + def run(self): + for i in range(self.length): + for sig, vals in self.signals.items(): + # transform numbered signal names to pad indices (e.g. 
dq1 -> dq[1]) + m = re.match(r'([a-zA-Z_]+)(\d+)', sig) + pad = getattr(self.pads, m.group(1))[int(m.group(2))] if m else getattr(self.pads, sig) + + # save the value at current cycle + val = vals[i] + self.history[sig] += str((yield pad)) + self.ref_history[sig] += val + yield + + def find_error(self, start=0): + for i in range(start, self.length): + for sig in self.history: + val = self.history[sig][i] + ref = self.ref_history[sig][i] + if ref != 'x' and val != ref: + return (i, sig, val, ref) + return None + + def summary(self, **kwargs): + error = self.find_error() + cycle, sig = None, None + if error is not None: + cycle, sig, val, ref = error + lines = [] + lines.append(self.history.format(hl_cycle=cycle, hl_signal=sig, **kwargs)) + lines.append('vs ref:') + lines.append(self.ref_history.format(hl_cycle=cycle, hl_signal=sig, **kwargs)) + return '\n'.join(lines) + + @staticmethod + def assert_ok(test_case, clock_checkers): + # clock_checkers: {clock: PadChecker(...), ...} + errors = list(filter(None, [c.find_error() for c in clock_checkers.values()])) + if errors: + all_histories = [c.history for c in clock_checkers.values()] + all_histories += [c.ref_history for c in clock_checkers.values()] + key_strw = PadsHistory.width_for(all_histories) + summaries = ['{}\n{}'.format(highlight(clock, hl=False), checker.summary(key_strw=key_strw)) + for clock, checker in clock_checkers.items()] + first_error = min(errors, key=lambda e: e[0]) # first error + i, sig, val, ref = first_error + msg = f'Cycle {i} Signal `{sig}`: {val} vs {ref}\n' + test_case.assertEqual(val, ref, msg=msg + '\n'.join(summaries)) + +def dfi_names(cmd=True, wrdata=True, rddata=True): + names = [] + if cmd: names += [name for name, _, _ in dfi.phase_cmd_description(1, 1, 1)] + if wrdata: names += [name for name, _, _ in dfi.phase_wrdata_description(16)] + if rddata: names += [name for name, _, _ in dfi.phase_rddata_description(16)] + return names + + +class DFIPhaseValues(dict): + """Dictionary 
{dfi_signal_name: value}""" + def __init__(self, **kwargs): + # widths are not important + names = dfi_names() + for sig in kwargs: + assert sig in names + super().__init__(**kwargs) + + +class DFISequencer: + Cycle = int + DFIPhase = int + DFISequence = Sequence[Mapping[DFIPhase, DFIPhaseValues]] + + def __init__(self, sequence: DFISequence = []): + # sequence: [{phase: {sig: value}}] + self.sequence = [] # generated on DFI + self.read_sequence = [] # read from DFI + self.expected_sequence = [] # expected to read from DFI + + # split sequence into read/write + for cycle in sequence: + read = {} + write = {} + for p, phase in cycle.items(): + read[p] = DFIPhaseValues() + write[p] = DFIPhaseValues() + for sig, val in phase.items(): + is_write = sig in dfi_names(rddata=False) + ["rddata_en"] + target = write[p] if is_write else read[p] + target[sig] = val + self.sequence.append(write) + self.expected_sequence.append(read) + + def add(self, dfi_cycle: Mapping[DFIPhase, DFIPhaseValues]): + self.sequence.append(dfi_cycle) + + def _dfi_reset_values(self): + return {sig: 1 if sig.endswith("_n") else 0 for sig in dfi_names()} + + def _reset(self, dfi): + for phase in dfi.phases: + for sig, val in self._dfi_reset_values().items(): + yield getattr(phase, sig).eq(val) + + def assert_ok(self, test_case): + # expected: should contain only input signals + names = ["rddata", "rddata_valid"] + for cyc, (read, expected) in enumerate(zip(self.read_sequence, self.expected_sequence)): + for p in expected: + for sig in expected[p]: + assert sig in names, f"`{sig}` is not DFI input signal" + val = read[p][sig] + ref = expected[p][sig] + if sig in ["wrdata", "rddata"]: + err = f"Cycle {cyc} signal `{sig}`: 0x{val:08x} vs 0x{ref:08x}" + else: + err = f"Cycle {cyc} signal `{sig}`: {val:} vs {ref}" + err += "\nread: \n{}".format(pprint.pformat(self.read_sequence)) + err += "\nexpected: \n{}".format(pprint.pformat(self.expected_sequence)) + test_case.assertEqual(val, ref, msg=err) + + def 
generator(self, dfi): + names = dfi_names(cmd=True, wrdata=True, rddata=False) + ["rddata_en"] + for per_phase in self.sequence: + # reset in case of any previous changes + (yield from self._reset(dfi)) + # set values + for phase, values in per_phase.items(): + for sig, val in values.items(): + assert sig in names, f"`{sig}` is not DFI output signal" + yield getattr(dfi.phases[phase], sig).eq(val) + yield + (yield from self._reset(dfi)) + yield + + def reader(self, dfi): + yield # do not include data read on start (a.k.a. cycle=-1) + for _ in range(len(self.expected_sequence)): + phases = {} + for i, p in enumerate(dfi.phases): + values = DFIPhaseValues(rddata_en=(yield p.rddata_en), rddata=(yield p.rddata), + rddata_valid=(yield p.rddata_valid)) + phases[i] = values + self.read_sequence.append(phases) + yield + + +def dfi_data_to_dq(dq_i, dfi_phases, dfi_name, nphases=8): + # data on DQ should go in a pattern: + # dq0: p0.wrdata[0], p0.wrdata[16], p1.wrdata[0], p1.wrdata[16], ... + # dq1: p0.wrdata[1], p0.wrdata[17], p1.wrdata[1], p1.wrdata[17], ... 
+ for p in range(nphases): + data = dfi_phases[p][dfi_name] + yield bit(0 + dq_i, data) + yield bit(16 + dq_i, data) + +def dq_pattern(i, dfi_data, dfi_name): + return ''.join(str(v) for v in dfi_data_to_dq(i, dfi_data, dfi_name)) + + +class TestLPDDR4(unittest.TestCase): + CMD_LATENCY = 2 + + def run_test(self, dut, dfi_sequence, pad_checkers: Mapping[str, Mapping[str, str]], pad_generators=None, **kwargs): + # pad_checkers: {clock: {sig: values}} + dfi = DFISequencer(dfi_sequence) + checkers = {clk: PadChecker(dut.pads, pad_signals) for clk, pad_signals in pad_checkers.items()} + generators = defaultdict(list) + generators["sys"].append(dfi.generator(dut.dfi)) + generators["sys"].append(dfi.reader(dut.dfi)) + for clock, checker in checkers.items(): + generators[clock].append(checker.run()) + pad_generators = pad_generators or {} + for clock, gens in pad_generators.items(): + gens = gens if isinstance(gens, list) else [gens] + for gen in gens: + generators[clock].append(gen(dut.pads)) + run_simulation(dut, generators, **kwargs) + PadChecker.assert_ok(self, checkers) + dfi.assert_ok(self) + + def test_lpddr4_cs_phase_0(self): + # Test that CS is serialized correctly when sending command on phase 0 + latency = '00000000' * self.CMD_LATENCY + self.run_test(SimulationPHY(), + dfi_sequence = [ + {0: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1)}, # p0: READ + ], + pad_checkers = {"sys8x_90": { + 'cs': latency + '10100000', + }}, + ) + + def test_lpddr4_clk(self): + # Test clock serialization, first few cycles are undefined so ignore them + latency = 'xxxxxxxx' * self.CMD_LATENCY + self.run_test(SimulationPHY(), + dfi_sequence = [ + {3: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1)}, + ], + pad_checkers = {"sys8x_90_ddr": { + 'clk_p': latency + '01010101' * 3, + }}, + ) + + def test_lpddr4_cs_multiple_phases(self): + # Test that CS is serialized on different phases and that overlapping commands are handled + latency = '00000000' * self.CMD_LATENCY + self.run_test(SimulationPHY(), + 
dfi_sequence = [ + {0: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1)}, + {3: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1)}, + { + 1: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1), + 4: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1), # should be ignored + }, + { + 1: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1), + 5: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1), # should NOT be ignored + }, + {6: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1)}, # crosses cycle boundaries + {0: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1)}, # should be ignored + {2: dict(cs_n=1, cas_n=0, ras_n=1, we_n=1)}, # ignored due to cs_n=1 + ], + pad_checkers = {"sys8x_90": { + 'cs': latency + ''.join([ + '10100000', # p0 + '00010100', # p3 + '01010000', # p1, p4 ignored + '01010101', # p1, p5 + '00000010', # p6 (cyc 0) + '10000000', # p6 (cyc 1), p0 ignored + '00000000', # p2 ignored + ]) + }}, + ) + + def test_lpddr4_ca_sequencing(self): + # Test proper serialization of commands to CA pads and that overlapping commands are handled + latency = '00000000' * self.CMD_LATENCY + read = dict(cs_n=0, cas_n=0, ras_n=1, we_n=1) + self.run_test(SimulationPHY(), + dfi_sequence = [ + {0: read, 3: read}, # p3 should be ignored + {0: read, 4: read}, + {6: read}, + {0: read}, # ignored + ], + pad_checkers = {"sys8x_90": { + 'cs': latency + '10100000' + '10101010' + '00000010' + '10000000', + 'ca0': latency + '00000000' + '00000000' + '00000000' + '00000000', + 'ca1': latency + '10100000' + '10101010' + '00000010' + '10000000', + 'ca2': latency + '00000000' + '00000000' + '00000000' + '00000000', + 'ca3': latency + '0x000000' + '0x000x00' + '0000000x' + '00000000', + 'ca4': latency + '00100000' + '00100010' + '00000000' + '10000000', + 'ca5': latency + '00000000' + '00000000' + '00000000' + '00000000', + }}, + ) + + def test_lpddr4_ca_addressing(self): + # Test that bank/address for different commands are correctly serialized to CA pads + latency = '00000000' * self.CMD_LATENCY + read = dict(cs_n=0, cas_n=0, ras_n=1, we_n=1, bank=0b101, address=0b1100110011) # 
actually invalid because CA[1:0] should always be 0 + write_ap = dict(cs_n=0, cas_n=0, ras_n=1, we_n=0, bank=0b111, address=0b10000000000) + activate = dict(cs_n=0, cas_n=1, ras_n=0, we_n=1, bank=0b010, address=0b11110000111100001) + refresh_ab = dict(cs_n=0, cas_n=0, ras_n=0, we_n=1, bank=0b100, address=0b10000000000) + precharge = dict(cs_n=0, cas_n=1, ras_n=0, we_n=0, bank=0b011, address=0) + mrw = dict(cs_n=0, cas_n=0, ras_n=0, we_n=0, bank=0, address=(0b110011 << 8) | 0b10101010) # 6-bit address | 8-bit op code + self.run_test(SimulationPHY(), + dfi_sequence = [ + {0: read, 4: write_ap}, + {0: activate, 4: refresh_ab}, + {0: precharge, 4: mrw}, + ], + pad_checkers = {"sys8x_90": { + # note that refresh and precharge have a single command so these go as cmd2 + # rd wr act ref pre mrw + 'cs': latency + '1010'+'1010' + '1010'+'0010' + '0010'+'1010', + 'ca0': latency + '0100'+'0100' + '1011'+'0000' + '0001'+'0100', + 'ca1': latency + '1010'+'0110' + '0110'+'0000' + '0001'+'1111', + 'ca2': latency + '0101'+'1100' + '0010'+'0001' + '0000'+'1010', + 'ca3': latency + '0x01'+'0x00' + '1110'+'001x' + '000x'+'0001', + 'ca4': latency + '0110'+'0010' + '1010'+'000x' + '001x'+'0110', + 'ca5': latency + '0010'+'0100' + '1001'+'001x' + '000x'+'1101', + }}, + ) + + def test_lpddr4_command_pads(self): + # Test serialization of DFI command pins (cs/cke/odt/reset_n) + latency = '00000000' * self.CMD_LATENCY + read = dict(cs_n=0, cas_n=0, ras_n=1, we_n=1) + self.run_test(SimulationPHY(), + dfi_sequence = [ + { + 0: dict(cke=1, odt=1, reset_n=1, **read), + 2: dict(cke=0, odt=1, reset_n=0, **read), + 3: dict(cke=1, odt=0, reset_n=0, **read), + 5: dict(cke=0, odt=1, reset_n=1, **read), + 7: dict(cke=0, odt=0, reset_n=0, **read), + }, + ], + pad_checkers = {"sys8x_90": { + 'cs': latency + '10100101', # p2, p3, p7 ignored + 'cke': latency + '10010000', + 'odt': latency + '10100100', + 'reset_n': latency + '11001110', + }}, + ) + + def test_lpddr4_dq_out(self): + # Test serialization of 
dfi wrdata to DQ pads + dut = SimulationPHY() + zero = '00000000' * 2 # zero for 1 sysclk clock in sys8x_ddr clock domain + + dfi_data = { + 0: dict(wrdata=0x11112222), + 1: dict(wrdata=0x33334444), + 2: dict(wrdata=0x55556666), + 3: dict(wrdata=0x77778888), + 4: dict(wrdata=0x9999aaaa), + 5: dict(wrdata=0xbbbbcccc), + 6: dict(wrdata=0xddddeeee), + 7: dict(wrdata=0xffff0000), + } + dfi_wrdata_en = {0: dict(wrdata_en=1)} # wrdata_en=1 required on any single phase + + self.run_test(dut, + dfi_sequence = [dfi_wrdata_en, {}, dfi_data], + pad_checkers = {"sys8x_90_ddr": { + f'dq{i}': (self.CMD_LATENCY+1)*zero + zero + dq_pattern(i, dfi_data, "wrdata") + zero for i in range(16) + }}, + ) + + def test_lpddr4_dq_only_1cycle(self): + # Test that DQ data is sent to pads only during expected cycle, on other cycles there is no data + dut = SimulationPHY() + zero = '00000000' * 2 + + dfi_data = { + 0: dict(wrdata=0x11112222), + 1: dict(wrdata=0x33334444), + 2: dict(wrdata=0x55556666), + 3: dict(wrdata=0x77778888), + 4: dict(wrdata=0x9999aaaa), + 5: dict(wrdata=0xbbbbcccc), + 6: dict(wrdata=0xddddeeee), + 7: dict(wrdata=0xffff0000), + } + dfi_wrdata_en = copy.deepcopy(dfi_data) + dfi_wrdata_en[0].update(dict(wrdata_en=1)) + + self.run_test(dut, + dfi_sequence = [dfi_wrdata_en, dfi_data, dfi_data], + pad_checkers = {"sys8x_90_ddr": { + f'dq{i}': (self.CMD_LATENCY+1)*zero + zero + dq_pattern(i, dfi_data, "wrdata") + zero for i in range(16) + }}, + ) + + def test_lpddr4_dqs(self): + # Test serialization of DQS pattern in relation to DQ data, with proper preamble and postamble + zero = '00000000' * 2 + + self.run_test(SimulationPHY(), + dfi_sequence = [ + {0: dict(wrdata_en=1)}, + {}, + { # to get 10101010... 
pattern on dq0 and only 1s on others + 0: dict(wrdata=0xfffeffff), + 1: dict(wrdata=0xfffeffff), + 2: dict(wrdata=0xfffeffff), + 3: dict(wrdata=0xfffeffff), + 4: dict(wrdata=0xfffeffff), + 5: dict(wrdata=0xfffeffff), + 6: dict(wrdata=0xfffeffff), + 7: dict(wrdata=0xfffeffff), + }, + ], + pad_checkers = { + "sys8x_90_ddr": { + 'dq0': (self.CMD_LATENCY+1)*zero + '00000000'+'00000000' + '10101010'+'10101010' + '00000000'+'00000000' + zero, + 'dq1': (self.CMD_LATENCY+1)*zero + '00000000'+'00000000' + '11111111'+'11111111' + '00000000'+'00000000' + zero, + }, + "sys8x_ddr": { # preamble, pattern, postamble + 'dqs0': (self.CMD_LATENCY+1)*zero + '01010101'+'01010100' + '01010101'+'01010101' + '00010101'+'01010101' + zero, + 'dqs1': (self.CMD_LATENCY+1)*zero + '01010101'+'01010100' + '01010101'+'01010101' + '00010101'+'01010101' + zero, + } + }, + ) + + def test_lpddr4_dmi_no_mask(self): + # Test proper output on DMI pads. We don't implement masking now, so nothing should be sent to DMI pads + zero = '00000000' * 2 + + self.run_test(SimulationPHY(), + dfi_sequence = [ + {0: dict(wrdata_en=1)}, + {}, + { + 0: dict(wrdata=0xffffffff), + 1: dict(wrdata=0xffffffff), + 2: dict(wrdata=0xffffffff), + 3: dict(wrdata=0xffffffff), + 4: dict(wrdata=0xffffffff), + 5: dict(wrdata=0xffffffff), + 6: dict(wrdata=0xffffffff), + 7: dict(wrdata=0xffffffff), + }, + ], + pad_checkers = { + "sys8x_90_ddr": { + 'dq0': (self.CMD_LATENCY+1)*zero + zero + '11111111'+'11111111' + 2*zero, + }, + "sys8x_ddr": { + 'dmi0': (self.CMD_LATENCY+1)*zero + (3 + 1)*zero, + 'dmi1': (self.CMD_LATENCY+1)*zero + (3 + 1)*zero, + } + }, + ) + + def test_lpddr4_dq_in_rddata_valid(self): + # Test that rddata_valid is set with correct delay + read_latency = 8 # settings.read_latency + dfi_sequence = [ + {0: dict(rddata_en=1)}, # command is issued by MC (appears on next cycle) + *[{p: dict(rddata_valid=0) for p in range(8)} for _ in range(read_latency - 1)], # nothing is sent during read latency + {p: 
dict(rddata_valid=1) for p in range(8)}, + {}, + ] + + self.run_test(SimulationPHY(), + dfi_sequence = dfi_sequence, + pad_checkers = {}, + pad_generators = {}, + ) + + def test_lpddr4_dq_in_rddata(self): + # Test that data on DQ pads is deserialized correctly to DFI rddata. + # We assume that when there are no commands, PHY will still deserialize the data, + # which is generally true (tristate oe is 0 whenever we are not writing). + dfi_data = { + 0: dict(rddata=0x11112222), + 1: dict(rddata=0x33334444), + 2: dict(rddata=0x55556666), + 3: dict(rddata=0x77778888), + 4: dict(rddata=0x9999aaaa), + 5: dict(rddata=0xbbbbcccc), + 6: dict(rddata=0xddddeeee), + 7: dict(rddata=0xffff0000), + } + + def sim_dq(pads): + for _ in range(16 * 1): # wait 1 sysclk cycle + yield + for cyc in range(16): # send a burst of data on pads + for bit in range(16): + yield pads.dq_i[bit].eq(int(dq_pattern(bit, dfi_data, "rddata")[cyc])) + yield + for bit in range(16): + yield pads.dq_i[bit].eq(0) + yield + + read_des_delay = 3 # phy.read_des_delay + dfi_sequence = [ + {}, # wait 1 sysclk cycle + *[{} for _ in range(read_des_delay)], + dfi_data, + {}, + ] + + self.run_test(SimulationPHY(), + dfi_sequence = dfi_sequence, + pad_checkers = {}, + pad_generators = { + "sys8x_90_ddr": sim_dq, + }, + ) + + def test_lpddr4_cmd_write(self): + # Test whole WRITE command sequence verifying data on pads and write_latency from MC perspective + phy = SimulationPHY() + zero = '00000000' * 2 + write_latency = phy.settings.write_latency + wrphase = phy.settings.wrphase.reset.value + + dfi_data = { + 0: dict(wrdata=0x11112222), + 1: dict(wrdata=0x33334444), + 2: dict(wrdata=0x55556666), + 3: dict(wrdata=0x77778888), + 4: dict(wrdata=0x9999aaaa), + 5: dict(wrdata=0xbbbbcccc), + 6: dict(wrdata=0xddddeeee), + 7: dict(wrdata=0xffff0000), + } + dfi_sequence = [ + {wrphase: dict(cs_n=0, cas_n=0, ras_n=1, we_n=0, wrdata_en=1)}, + *[{} for _ in range(write_latency - 1)], + dfi_data, + {}, + {}, + {}, + {}, + 
{}, + ] + + self.run_test(phy, + dfi_sequence = dfi_sequence, + pad_checkers = { + "sys8x_90": { + "cs": "00000000"*2 + "00001010" + "00000000"*2, + "ca0": "00000000"*2 + "00000000" + "00000000"*2, + "ca1": "00000000"*2 + "00000010" + "00000000"*2, + "ca2": "00000000"*2 + "00001000" + "00000000"*2, + "ca3": "00000000"*2 + "00000000" + "00000000"*2, + "ca4": "00000000"*2 + "00000010" + "00000000"*2, + "ca5": "00000000"*2 + "00000000" + "00000000"*2, + }, + "sys8x_90_ddr": { + f'dq{i}': (self.CMD_LATENCY+1)*zero + zero + dq_pattern(i, dfi_data, "wrdata") + zero + for i in range(16) + }, + "sys8x_ddr": { + "dqs0": (self.CMD_LATENCY+1)*zero + '01010101'+'01010100' + '01010101'+'01010101' + '00010101'+'01010101' + zero, + }, + }, + ) + + def test_lpddr4_cmd_read(self): + # Test whole READ command sequence simulating DRAM response and verifying read_latency from MC perspective + phy = SimulationPHY() + zero = '00000000' * 2 + read_latency = phy.settings.read_latency + rdphase = phy.settings.rdphase.reset.value + + dfi_data = { + 0: dict(rddata=0x11112222, rddata_valid=1), + 1: dict(rddata=0x33334444, rddata_valid=1), + 2: dict(rddata=0x55556666, rddata_valid=1), + 3: dict(rddata=0x77778888, rddata_valid=1), + 4: dict(rddata=0x9999aaaa, rddata_valid=1), + 5: dict(rddata=0xbbbbcccc, rddata_valid=1), + 6: dict(rddata=0xddddeeee, rddata_valid=1), + 7: dict(rddata=0xffff0000, rddata_valid=1), + } + dfi_sequence = [ + {rdphase: dict(cs_n=0, cas_n=0, ras_n=1, we_n=1, rddata_en=1)}, + *[{} for _ in range(read_latency - 1)], + dfi_data, + {}, + {}, + {}, + {}, + {}, + ] + + class Simulator: + def __init__(self, dfi_data, test_case, cl): + self.dfi_data = dfi_data + self.read_cmd = False + self.test_case = test_case + self.cl = cl + + @passive + def cmd_checker(self, pads): + # Monitors CA/CS for a READ command + read = [ + 0b000010, # READ-1 (1) BL=0 + 0b000000, # READ-1 (2) BA=0, C9=0, AP=0 + 0b010010, # CAS-2 (1) C8=0 + 0b000000, # CAS-2 (2) C=0 + ] + + def check_ca(i): + err = 
"{}: CA = 0b{:06b}, expected = 0b{:06b}".format(i, (yield pads.ca), read[i]) + self.test_case.assertEqual((yield pads.ca), read[i], msg=err) + + while True: + while not (yield pads.cs): + yield + yield from check_ca(0) + yield + yield from check_ca(1) + yield + self.test_case.assertEqual((yield pads.cs), 1, msg="Found CS on 1st cycle but not on 3rd cycle") + yield from check_ca(2) + yield + yield from check_ca(3) + self.read_cmd = True + + @passive + def dq_generator(self, pads): + # After a READ command is received, wait CL and send data + while True: + while not self.read_cmd: + yield + dfi_data = self.dfi_data.pop(0) + for _ in range(2*self.cl + 1): + yield + self.read_cmd = False + for cyc in range(16): + for bit in range(16): + yield pads.dq_i[bit].eq(int(dq_pattern(bit, dfi_data, "rddata")[cyc])) + yield + for bit in range(16): + yield pads.dq_i[bit].eq(0) + + @passive + def dqs_generator(self, pads): + # After a READ command is received, wait CL and send data strobe + while True: + while not self.read_cmd: + yield + for _ in range(2*self.cl - 1): # DQS to transmit DQS preamble + yield + for cyc in range(16 + 1): # send a burst of data on pads + for bit in range(2): + yield pads.dqs_i[bit].eq(int((cyc + 1) % 2)) + yield + for bit in range(2): + yield pads.dqs_i[bit].eq(0) + + sim = Simulator([dfi_data], self, cl=14) + self.run_test(phy, + dfi_sequence = dfi_sequence, + pad_checkers = { + "sys8x_90": { + "cs": "00000000"*2 + rdphase*"0" + "1010" + "00000000"*2, + "ca0": "00000000"*2 + rdphase*"0" + "0000" + "00000000"*2, + "ca1": "00000000"*2 + rdphase*"0" + "1010" + "00000000"*2, + "ca2": "00000000"*2 + rdphase*"0" + "0000" + "00000000"*2, + "ca3": "00000000"*2 + rdphase*"0" + "0000" + "00000000"*2, + "ca4": "00000000"*2 + rdphase*"0" + "0010" + "00000000"*2, + "ca5": "00000000"*2 + rdphase*"0" + "0000" + "00000000"*2, + }, + "sys8x_90_ddr": { #? 
+ f'dq{i}': (self.CMD_LATENCY+2)*zero + zero + dq_pattern(i, dfi_data, "rddata") + zero + for i in range(16) + }, + "sys8x_ddr": { + "dqs0": (self.CMD_LATENCY+2)*zero + '00000000'+'00000001' + '01010101'+'01010101' + zero, + }, + }, + pad_generators = { + "sys8x_ddr": [sim.dq_generator, sim.dqs_generator], + "sys8x_90": sim.cmd_checker, + }, + )