From 2f15f3748e8dd17d1b0a36598fbbe8dd9b0e38aa Mon Sep 17 00:00:00 2001 From: Florent Kermarrec Date: Mon, 21 Mar 2016 19:30:47 +0100 Subject: [PATCH] test: use new simulator (still etherbone_tb and mac_wishbone_tb not working due to use of FullMemoryWE) --- test/arp_tb.py | 52 ++++++------- test/etherbone_tb.py | 163 +++++++++++++++++++-------------------- test/icmp_tb.py | 49 ++++++------ test/ip_tb.py | 55 ++++++-------- test/mac_core_tb.py | 66 ++++++++-------- test/mac_wishbone_tb.py | 164 ++++++++++++++++++++-------------------- test/model/phy.py | 2 +- test/udp_tb.py | 59 +++++++-------- 8 files changed, 292 insertions(+), 318 deletions(-) diff --git a/test/arp_tb.py b/test/arp_tb.py index 5e47da8..dd27dc3 100644 --- a/test/arp_tb.py +++ b/test/arp_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -23,36 +22,31 @@ class TB(Module): self.submodules.mac = LiteEthMAC(self.phy_model, dw=8, with_preamble_crc=True) self.submodules.arp = LiteEthARP(self.mac, mac_address, ip_address, 100000) - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() - self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), - ] - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 +def main_generator(dut): + while (yield dut.arp.table.request.ready) != 1: + yield dut.arp.table.request.valid.eq(1) + yield dut.arp.table.request.ip_address.eq(0x12345678) yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 - - for i in range(100): - yield - - while selfp.arp.table.request.ready != 1: - selfp.arp.table.request.valid = 1 - selfp.arp.table.request.ip_address = 0x12345678 - yield - selfp.arp.table.request.valid = 0 - while selfp.arp.table.response.valid != 1: - selfp.arp.table.response.ready = 1 - yield - print("Received MAC : 0x{:12x}".format(selfp.arp.table.response.mac_address)) + yield dut.arp.table.request.valid.eq(0) + while (yield dut.arp.table.response.valid) != 1: + yield dut.arp.table.response.ready.eq(1) + yield + print("Received MAC : 0x{:12x}".format((yield dut.arp.table.response.mac_address))) + # XXX: find a way to exit properly + import sys + sys.exit() if __name__ == "__main__": - run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True) + tb = TB() + generators = { + "sys" : [main_generator(tb)], + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd") diff --git a/test/etherbone_tb.py b/test/etherbone_tb.py index 4e0d81f..01c6b86 100644 --- a/test/etherbone_tb.py +++ b/test/etherbone_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -16,12 +15,12 @@ mac_address = 0x12345678abcd class TB(Module): def __init__(self): - self.submodules.phy_model = phy.PHY(8, debug=False) - self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=False) - self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=False) - self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=False, loopback=False) - self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=False, loopback=False) - self.submodules.etherbone_model = etherbone.Etherbone(self.udp_model, debug=False) + self.submodules.phy_model = phy.PHY(8, debug=True) + self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=False) + self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=True) + self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=True, loopback=False) + self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=True, loopback=False) + self.submodules.etherbone_model = etherbone.Etherbone(self.udp_model, debug=True) self.submodules.core = LiteEthUDPIPCore(self.phy_model, mac_address, ip_address, 100000) self.submodules.etherbone = LiteEthEtherbone(self.core.udp, 20000) @@ -29,91 +28,87 @@ class TB(Module): self.submodules.sram = wishbone.SRAM(1024) self.submodules.interconnect = wishbone.InterconnectPointToPoint(self.etherbone.master.bus, self.sram.bus) - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() - self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), - ] - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 - yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 +def main_generator(dut): + test_probe = True + test_writes = False + test_reads = False - for i in range(100): - yield + # test probe + if test_probe: + packet = etherbone.EtherbonePacket() + packet.pf = 1 + dut.etherbone_model.send(packet) + yield from self.etherbone_model.receive() + print("probe: " + str(bool(self.etherbone_model.rx_packet.pr))) - test_probe = True - test_writes = True - test_reads = True + for i in range(8): + # test writes + if test_writes: + writes_datas = [j for j in range(16)] + writes = etherbone.EtherboneWrites(base_addr=0x1000, + datas=writes_datas) + record = etherbone.EtherboneRecord() + record.writes = writes + record.reads = None + record.bca = 0 + record.rca = 0 + record.rff = 0 + record.cyc = 0 + record.wca = 0 + record.wff = 0 + record.byte_enable = 0xf + record.wcount = len(writes_datas) + record.rcount = 0 - # test probe - if test_probe: packet = etherbone.EtherbonePacket() - packet.pf = 1 + packet.records = [record] + self.etherbone_model.send(packet) + for i in range(256): + yield + + # test reads + if test_reads: + reads_addrs = [0x1000 + 4*j for j in range(16)] + reads = etherbone.EtherboneReads(base_ret_addr=0x1000, + addrs=reads_addrs) + record = etherbone.EtherboneRecord() + record.writes = None + record.reads = reads + record.bca = 0 + record.rca = 0 + record.rff = 0 + record.cyc = 0 + record.wca = 0 + record.wff = 0 + record.byte_enable = 0xf + record.wcount = 0 + record.rcount = len(reads_addrs) + + packet = etherbone.EtherbonePacket() + packet.records = [record] self.etherbone_model.send(packet) yield from self.etherbone_model.receive() - print("probe: " + str(bool(self.etherbone_model.rx_packet.pr))) + loopback_writes_datas = [] + loopback_writes_datas = self.etherbone_model.rx_packet.records.pop().writes.get_datas() - for i in range(8): - # test writes - if test_writes: - writes_datas = [j for j in range(16)] - writes = etherbone.EtherboneWrites(base_addr=0x1000, - datas=writes_datas) - record = etherbone.EtherboneRecord() - record.writes = writes - record.reads = None - record.bca = 0 - record.rca = 0 - record.rff = 0 - record.cyc = 0 - record.wca = 0 - record.wff = 0 - record.byte_enable = 0xf - record.wcount = len(writes_datas) - record.rcount = 0 + # check resultss + s, l, e = check(writes_datas, loopback_writes_datas) + print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) - packet = etherbone.EtherbonePacket() - packet.records = [record] - self.etherbone_model.send(packet) - for i in range(256): - yield - - # test reads - if test_reads: - reads_addrs = [0x1000 + 4*j for j in range(16)] - reads = etherbone.EtherboneReads(base_ret_addr=0x1000, - addrs=reads_addrs) - record = etherbone.EtherboneRecord() - record.writes = None - record.reads = reads - record.bca = 0 - record.rca = 0 - record.rff = 0 - record.cyc = 0 - record.wca = 0 - record.wff = 0 - record.byte_enable = 0xf - record.wcount = 0 - record.rcount = len(reads_addrs) - - packet = etherbone.EtherbonePacket() - packet.records = [record] - self.etherbone_model.send(packet) - yield from self.etherbone_model.receive() - loopback_writes_datas = [] - loopback_writes_datas = self.etherbone_model.rx_packet.records.pop().writes.get_datas() - - # check results - s, l, e = check(writes_datas, loopback_writes_datas) - print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) + # XXX: find a way to exit properly + import sys + sys.exit() if __name__ == "__main__": - run_simulation(TB(), ncycles=4096, vcd_name="my.vcd", keep_files=True) + tb = TB() + generators = { + "sys" : [main_generator(tb)], + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd") diff --git a/test/icmp_tb.py b/test/icmp_tb.py index c008e81..97b9061 100644 --- a/test/icmp_tb.py +++ b/test/icmp_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -27,33 +26,31 @@ class TB(Module): self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000) - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() - self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), - ] +def main_generator(dut): + packet = MACPacket(ping_request) + packet.decode_remove_header() + packet = IPPacket(packet) + packet.decode() + packet = ICMPPacket(packet) + packet.decode() + dut.icmp_model.send(packet) - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 + for i in range(256): yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 - for i in range(100): - yield - - packet = MACPacket(ping_request) - packet.decode_remove_header() - packet = IPPacket(packet) - packet.decode() - packet = ICMPPacket(packet) - packet.decode() - self.icmp_model.send(packet) + # XXX: find a way to exit properly + import sys + sys.exit() if __name__ == "__main__": - run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True) + tb = TB() + generators = { + "sys" : [main_generator(tb)], + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd") diff --git a/test/ip_tb.py b/test/ip_tb.py index a7c5ac9..3669e17 100644 --- a/test/ip_tb.py +++ b/test/ip_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -23,37 +22,31 @@ class TB(Module): self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000) self.ip_port = self.ip.ip.crossbar.get_port(udp_protocol) - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() - self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), - ] +def main_generator(dut): + while True: + yield dut.ip_port.sink.valid.eq(1) + yield dut.ip_port.sink.last.eq(1) + yield dut.ip_port.sink.ip_address.eq(0x12345678) + yield dut.ip_port.sink.protocol.eq(udp_protocol) + + yield dut.ip_port.source.ready.eq(1) + if (yield dut.ip_port.source.valid) == 1 and (yield dut.ip_port.source.last) == 1: + print("packet from IP 0x{:08x}".format((yield dut.ip_port.sink.ip_address))) + # XXX: find a way to exit properly + import sys + sys.exit() - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 - - for i in range(100): - yield - - while True: - selfp.ip_port.sink.valid = 1 - selfp.ip_port.sink.last = 1 - selfp.ip_port.sink.ip_address = 0x12345678 - selfp.ip_port.sink.protocol = udp_protocol - - selfp.ip_port.source.ready = 1 - if selfp.ip_port.source.valid == 1 and selfp.ip_port.source.last == 1: - print("packet from IP 0x{:08x}".format(selfp.ip_port.sink.ip_address)) - - yield if __name__ == "__main__": - run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True) + tb = TB() + generators = { + "sys" : [main_generator(tb)], + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd") diff --git a/test/mac_core_tb.py b/test/mac_core_tb.py index 9acde2e..c120734 100644 --- a/test/mac_core_tb.py +++ b/test/mac_core_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -22,42 +21,45 @@ class TB(Module): self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(8), level=50) self.submodules.logger = PacketLogger(eth_phy_description(8)) - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), + Record.connect(self.streamer.source, self.streamer_randomizer.sink), + Record.connect(self.streamer_randomizer.source, self.core.sink), + Record.connect(self.core.source, self.logger_randomizer.sink), + Record.connect(self.logger_randomizer.source, self.logger.sink) ] - self.comb += [ - self.streamer.source.connect(self.streamer_randomizer.sink), - self.streamer_randomizer.source.connect(self.core.sink), - self.core.source.connect(self.logger_randomizer.sink), - self.logger_randomizer.source.connect(self.logger.sink) - ] - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 - yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 +def main_generator(dut): + for i in range(2): + packet = mac.MACPacket([i for i in range(64)]) + packet.target_mac = 0x010203040506 + packet.sender_mac = 0x090A0B0C0C0D + packet.ethernet_type = 0x0800 + packet.encode_header() + dut.streamer.send(packet) + yield from dut.logger.receive() - for i in range(8): - packet = mac.MACPacket([i for i in range(64)]) - packet.target_mac = 0x010203040506 - packet.sender_mac = 0x090A0B0C0C0D - packet.ethernet_type = 0x0800 - packet.encode_header() - yield from self.streamer.send(packet) - yield from self.logger.receive() + # check results + s, l, e = check(packet, dut.logger.packet) + print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) - # check results - s, l, e = check(packet, self.logger.packet) - print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) + # XXX: find a way to exit properly + import sys + sys.exit() if __name__ == "__main__": - run_simulation(TB(), ncycles=4000, vcd_name="my.vcd", keep_files=True) + tb = TB() + generators = { + "sys" : [main_generator(tb), + tb.streamer.generator(), + tb.streamer_randomizer.generator(), + tb.logger_randomizer.generator(), + tb.logger.generator()], + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd") diff --git a/test/mac_wishbone_tb.py b/test/mac_wishbone_tb.py index 261f1ab..5faf706 100644 --- a/test/mac_wishbone_tb.py +++ b/test/mac_wishbone_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -16,30 +15,30 @@ class WishboneMaster: self.dat = 0 def write(self, adr, dat): - self.obj.cyc = 1 - self.obj.stb = 1 - self.obj.adr = adr - self.obj.we = 1 - self.obj.sel = 0xF - self.obj.dat_w = dat - while self.obj.ack == 0: + yield self.obj.cyc.eq(1) + yield self.obj.stb.eq(1) + yield self.obj.adr.eq(adr) + yield self.obj.we.eq(1) + yield self.obj.sel.eq(0xf) + yield self.obj.dat_w.eq(dat) + while (yield self.obj.ack) == 0: yield - self.obj.cyc = 0 - self.obj.stb = 0 + yield self.obj.cyc.eq(0) + yield self.obj.stb.eq(0) yield def read(self, adr): - self.obj.cyc = 1 - self.obj.stb = 1 - self.obj.adr = adr - self.obj.we = 0 - self.obj.sel = 0xF - self.obj.dat_w = 0 - while self.obj.ack == 0: + yield self.obj.cyc.eq(1) + yield self.obj.stb.eq(1) + yield self.obj.adr.eq(adr) + yield self.obj.we.eq(0) + yield self.obj.sel.eq(0xf) + yield self.obj.dat_w.eq(0) + while (yield self.obj.ack) == 0: yield - self.dat = self.obj.dat_r - self.obj.cyc = 0 - self.obj.stb = 0 + yield self.dat.eq(self.obj.dat_r) + yield self.obj.cyc.eq(0) + yield self.obj.stb.eq(0) yield @@ -48,21 +47,21 @@ class SRAMReaderDriver: self.obj = obj def start(self, slot, length): - self.obj._slot.storage = slot - self.obj._length.storage = length - self.obj._start.re = 1 + yield self.obj._slot.storage.eq(slot) + yield self.obj._length.storage.eq(length) + yield self.obj._start.re.eq(1) yield - self.obj._start.re = 0 + yield self.obj._start.re.eq(0) yield def wait_done(self): - while self.obj.ev.done.pending == 0: + while (yield self.obj.ev.done.pending) == 0: yield def clear_done(self): - self.obj.ev.done.clear = 1 + yield self.obj.ev.done.clear.eq(1) yield - self.obj.ev.done.clear = 0 + yield self.obj.ev.done.clear.eq(0) yield @@ -71,79 +70,80 @@ class SRAMWriterDriver: self.obj = obj def wait_available(self): - while self.obj.ev.available.pending == 0: + while (yield self.obj.ev.available.pending) == 0: yield def clear_available(self): - self.obj.ev.available.clear = 1 + yield self.obj.ev.available.clear.eq(1) yield - self.obj.ev.available.clear = 0 + yield self.obj.ev.available.clear.eq(0) yield class TB(Module): def __init__(self): - self.submodules.phy_model = phy.PHY(8, debug=False) - self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=True) + self.submodules.phy_model = phy.PHY(8, debug=True) + self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=True) self.submodules.ethmac = LiteEthMAC(phy=self.phy_model, dw=32, interface="wishbone", with_preamble_crc=True) - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() - self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), - ] - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 - yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 +def main_generator(dut): + wishbone_master = WishboneMaster(dut.ethmac.bus) + sram_reader_driver = SRAMReaderDriver(dut.ethmac.interface.sram.reader) + sram_writer_driver = SRAMWriterDriver(dut.ethmac.interface.sram.writer) - wishbone_master = WishboneMaster(selfp.ethmac.bus) - sram_reader_driver = SRAMReaderDriver(selfp.ethmac.interface.sram.reader) - sram_writer_driver = SRAMWriterDriver(selfp.ethmac.interface.sram.writer) + sram_writer_slots_offset = [0x000, 0x200] + sram_reader_slots_offset = [0x400, 0x600] - sram_writer_slots_offset = [0x000, 0x200] - sram_reader_slots_offset = [0x400, 0x600] + length = 150+2 - length = 150+2 + tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0] - tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0] + errors = 0 - errors = 0 + while True: + for i in range(20): + yield + for slot in range(2): + print("slot {}: ".format(slot), end="") + # fill tx memory + for i in range(length//4+1): + dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big") + yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat) - while True: - for slot in range(2): - print("slot {}: ".format(slot), end="") - # fill tx memory - for i in range(length//4+1): - dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big") - yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat) + # XXX: find a way to exit properly + import sys + sys.exit() - # send tx payload & wait - yield from sram_reader_driver.start(slot, length) - yield from sram_reader_driver.wait_done() - yield from sram_reader_driver.clear_done() - - # wait rx - yield from sram_writer_driver.wait_available() - yield from sram_writer_driver.clear_available() - - # get rx payload (loopback on PHY Model) - rx_payload = [] - for i in range(length//4+1): - yield from wishbone_master.read(sram_writer_slots_offset[slot]+i) - dat = wishbone_master.dat - rx_payload += list(dat.to_bytes(4, byteorder='big')) - - # check results - s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))]) - print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) +# # send tx payload & wait +# yield from sram_reader_driver.start(slot, length) +# yield from sram_reader_driver.wait_done() +# yield from sram_reader_driver.clear_done() +# +# # wait rx +# yield from sram_writer_driver.wait_available() +# yield from sram_writer_driver.clear_available() +# +# # get rx payload (loopback on PHY Model) +# rx_payload = [] +# for i in range(length//4+1): +# yield from wishbone_master.read(sram_writer_slots_offset[slot]+i) +# dat = wishbone_master.dat +# rx_payload += list(dat.to_bytes(4, byteorder='big')) +# +# # check results +# s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))]) +# print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) if __name__ == "__main__": - run_simulation(TB(), ncycles=3000, vcd_name="my.vcd", keep_files=True) + tb = TB() + generators = { + "sys" : main_generator(tb), + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd") diff --git a/test/model/phy.py b/test/model/phy.py index 613cefc..7e78888 100644 --- a/test/model/phy.py +++ b/test/model/phy.py @@ -54,7 +54,7 @@ class PHY(Module): print_phy(r) self.packet = self.phy_sink.packet - def gen_simulation(self, selfp): + def generator(self): while True: yield from self.receive() if self.mac_callback is not None: diff --git a/test/udp_tb.py b/test/udp_tb.py index c71d963..9c06934 100644 --- a/test/udp_tb.py +++ b/test/udp_tb.py @@ -1,5 +1,4 @@ from litex.gen import * -from litex.gen.sim.generic import run_simulation from litex.soc.interconnect import wishbone from litex.soc.interconnect.stream_sim import * @@ -7,6 +6,7 @@ from litex.soc.interconnect.stream_sim import * from liteeth.common import * from liteeth.core import LiteEthUDPIPCore + from test.model import phy, mac, arp, ip, udp ip_address = 0x12345678 @@ -27,45 +27,38 @@ class TB(Module): self.submodules.streamer = PacketStreamer(eth_udp_user_description(dw)) self.submodules.logger = PacketLogger(eth_udp_user_description(dw)) self.comb += [ - self.streamer.source.connect(udp_port.sink), + Record.connect(self.streamer.source, udp_port.sink), udp_port.sink.ip_address.eq(0x12345678), udp_port.sink.src_port.eq(0x1234), udp_port.sink.dst_port.eq(0x5678), udp_port.sink.length.eq(64//(dw//8)), - udp_port.source.connect(self.logger.sink) + Record.connect(udp_port.source, self.logger.sink) ] - # use sys_clk for each clock_domain - self.clock_domains.cd_eth_rx = ClockDomain() - self.clock_domains.cd_eth_tx = ClockDomain() - self.comb += [ - self.cd_eth_rx.clk.eq(ClockSignal()), - self.cd_eth_rx.rst.eq(ResetSignal()), - self.cd_eth_tx.clk.eq(ClockSignal()), - self.cd_eth_tx.rst.eq(ResetSignal()), - ] +def main_generator(dut): + packet = Packet([i for i in range(64//(dut.dw//8))]) + dut.streamer.send(packet) + yield from dut.logger.receive() - def gen_simulation(self, selfp): - selfp.cd_eth_rx.rst = 1 - selfp.cd_eth_tx.rst = 1 - yield - selfp.cd_eth_rx.rst = 0 - selfp.cd_eth_tx.rst = 0 - - for i in range(100): - yield - - while True: - packet = Packet([i for i in range(64//(self.dw//8))]) - yield from self.streamer.send(packet) - yield from self.logger.receive() - - # check results - s, l, e = check(packet, self.logger.packet) - print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) + # check results + s, l, e = check(packet, dut.logger.packet) + print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e)) + # XXX: find a way to exit properly + import sys + sys.exit() if __name__ == "__main__": - run_simulation(TB(8), ncycles=2048, vcd_name="my.vcd", keep_files=True) - run_simulation(TB(16), ncycles=2048, vcd_name="my.vcd", keep_files=True) - run_simulation(TB(32), ncycles=2048, vcd_name="my.vcd", keep_files=True) + tb = TB(8) + generators = { + "sys" : [main_generator(tb), + tb.streamer.generator(), + tb.logger.generator()], + "eth_tx": [tb.phy_model.phy_sink.generator(), + tb.phy_model.generator()], + "eth_rx": tb.phy_model.phy_source.generator() + } + clocks = {"sys": 10, + "eth_rx": 10, + "eth_tx": 10} + run_simulation(tb, generators, clocks, vcd_name="sim.vcd")