test: use new simulator (still etherbone_tb and mac_wishbone_tb not working due to use of FullMemoryWE)

This commit is contained in:
Florent Kermarrec 2016-03-21 19:30:47 +01:00
parent 657ba4cb16
commit 2f15f3748e
8 changed files with 292 additions and 318 deletions

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -23,36 +22,31 @@ class TB(Module):
self.submodules.mac = LiteEthMAC(self.phy_model, dw=8, with_preamble_crc=True)
self.submodules.arp = LiteEthARP(self.mac, mac_address, ip_address, 100000)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
def main_generator(dut):
while (yield dut.arp.table.request.ready) != 1:
yield dut.arp.table.request.valid.eq(1)
yield dut.arp.table.request.ip_address.eq(0x12345678)
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
while selfp.arp.table.request.ready != 1:
selfp.arp.table.request.valid = 1
selfp.arp.table.request.ip_address = 0x12345678
yield
selfp.arp.table.request.valid = 0
while selfp.arp.table.response.valid != 1:
selfp.arp.table.response.ready = 1
yield
print("Received MAC : 0x{:12x}".format(selfp.arp.table.response.mac_address))
yield dut.arp.table.request.valid.eq(0)
while (yield dut.arp.table.response.valid) != 1:
yield dut.arp.table.response.ready.eq(1)
yield
print("Received MAC : 0x{:12x}".format((yield dut.arp.table.response.mac_address)))
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -16,12 +15,12 @@ mac_address = 0x12345678abcd
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=False)
self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=False)
self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=False, loopback=False)
self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=False, loopback=False)
self.submodules.etherbone_model = etherbone.Etherbone(self.udp_model, debug=False)
self.submodules.phy_model = phy.PHY(8, debug=True)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=False)
self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=True)
self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=True, loopback=False)
self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=True, loopback=False)
self.submodules.etherbone_model = etherbone.Etherbone(self.udp_model, debug=True)
self.submodules.core = LiteEthUDPIPCore(self.phy_model, mac_address, ip_address, 100000)
self.submodules.etherbone = LiteEthEtherbone(self.core.udp, 20000)
@ -29,91 +28,87 @@ class TB(Module):
self.submodules.sram = wishbone.SRAM(1024)
self.submodules.interconnect = wishbone.InterconnectPointToPoint(self.etherbone.master.bus, self.sram.bus)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
def main_generator(dut):
test_probe = True
test_writes = False
test_reads = False
for i in range(100):
yield
# test probe
if test_probe:
packet = etherbone.EtherbonePacket()
packet.pf = 1
dut.etherbone_model.send(packet)
yield from self.etherbone_model.receive()
print("probe: " + str(bool(self.etherbone_model.rx_packet.pr)))
test_probe = True
test_writes = True
test_reads = True
for i in range(8):
# test writes
if test_writes:
writes_datas = [j for j in range(16)]
writes = etherbone.EtherboneWrites(base_addr=0x1000,
datas=writes_datas)
record = etherbone.EtherboneRecord()
record.writes = writes
record.reads = None
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = len(writes_datas)
record.rcount = 0
# test probe
if test_probe:
packet = etherbone.EtherbonePacket()
packet.pf = 1
packet.records = [record]
self.etherbone_model.send(packet)
for i in range(256):
yield
# test reads
if test_reads:
reads_addrs = [0x1000 + 4*j for j in range(16)]
reads = etherbone.EtherboneReads(base_ret_addr=0x1000,
addrs=reads_addrs)
record = etherbone.EtherboneRecord()
record.writes = None
record.reads = reads
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = 0
record.rcount = len(reads_addrs)
packet = etherbone.EtherbonePacket()
packet.records = [record]
self.etherbone_model.send(packet)
yield from self.etherbone_model.receive()
print("probe: " + str(bool(self.etherbone_model.rx_packet.pr)))
loopback_writes_datas = []
loopback_writes_datas = self.etherbone_model.rx_packet.records.pop().writes.get_datas()
for i in range(8):
# test writes
if test_writes:
writes_datas = [j for j in range(16)]
writes = etherbone.EtherboneWrites(base_addr=0x1000,
datas=writes_datas)
record = etherbone.EtherboneRecord()
record.writes = writes
record.reads = None
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = len(writes_datas)
record.rcount = 0
# check resultss
s, l, e = check(writes_datas, loopback_writes_datas)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
packet = etherbone.EtherbonePacket()
packet.records = [record]
self.etherbone_model.send(packet)
for i in range(256):
yield
# test reads
if test_reads:
reads_addrs = [0x1000 + 4*j for j in range(16)]
reads = etherbone.EtherboneReads(base_ret_addr=0x1000,
addrs=reads_addrs)
record = etherbone.EtherboneRecord()
record.writes = None
record.reads = reads
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = 0
record.rcount = len(reads_addrs)
packet = etherbone.EtherbonePacket()
packet.records = [record]
self.etherbone_model.send(packet)
yield from self.etherbone_model.receive()
loopback_writes_datas = []
loopback_writes_datas = self.etherbone_model.rx_packet.records.pop().writes.get_datas()
# check results
s, l, e = check(writes_datas, loopback_writes_datas)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
run_simulation(TB(), ncycles=4096, vcd_name="my.vcd", keep_files=True)
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -27,33 +26,31 @@ class TB(Module):
self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def main_generator(dut):
packet = MACPacket(ping_request)
packet.decode_remove_header()
packet = IPPacket(packet)
packet.decode()
packet = ICMPPacket(packet)
packet.decode()
dut.icmp_model.send(packet)
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
for i in range(256):
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
packet = MACPacket(ping_request)
packet.decode_remove_header()
packet = IPPacket(packet)
packet.decode()
packet = ICMPPacket(packet)
packet.decode()
self.icmp_model.send(packet)
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -23,37 +22,31 @@ class TB(Module):
self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000)
self.ip_port = self.ip.ip.crossbar.get_port(udp_protocol)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def main_generator(dut):
while True:
yield dut.ip_port.sink.valid.eq(1)
yield dut.ip_port.sink.last.eq(1)
yield dut.ip_port.sink.ip_address.eq(0x12345678)
yield dut.ip_port.sink.protocol.eq(udp_protocol)
yield dut.ip_port.source.ready.eq(1)
if (yield dut.ip_port.source.valid) == 1 and (yield dut.ip_port.source.last) == 1:
print("packet from IP 0x{:08x}".format((yield dut.ip_port.sink.ip_address)))
# XXX: find a way to exit properly
import sys
sys.exit()
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
while True:
selfp.ip_port.sink.valid = 1
selfp.ip_port.sink.last = 1
selfp.ip_port.sink.ip_address = 0x12345678
selfp.ip_port.sink.protocol = udp_protocol
selfp.ip_port.source.ready = 1
if selfp.ip_port.source.valid == 1 and selfp.ip_port.source.last == 1:
print("packet from IP 0x{:08x}".format(selfp.ip_port.sink.ip_address))
yield
if __name__ == "__main__":
run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -22,42 +21,45 @@ class TB(Module):
self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(8), level=50)
self.submodules.logger = PacketLogger(eth_phy_description(8))
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
Record.connect(self.streamer.source, self.streamer_randomizer.sink),
Record.connect(self.streamer_randomizer.source, self.core.sink),
Record.connect(self.core.source, self.logger_randomizer.sink),
Record.connect(self.logger_randomizer.source, self.logger.sink)
]
self.comb += [
self.streamer.source.connect(self.streamer_randomizer.sink),
self.streamer_randomizer.source.connect(self.core.sink),
self.core.source.connect(self.logger_randomizer.sink),
self.logger_randomizer.source.connect(self.logger.sink)
]
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
def main_generator(dut):
for i in range(2):
packet = mac.MACPacket([i for i in range(64)])
packet.target_mac = 0x010203040506
packet.sender_mac = 0x090A0B0C0C0D
packet.ethernet_type = 0x0800
packet.encode_header()
dut.streamer.send(packet)
yield from dut.logger.receive()
for i in range(8):
packet = mac.MACPacket([i for i in range(64)])
packet.target_mac = 0x010203040506
packet.sender_mac = 0x090A0B0C0C0D
packet.ethernet_type = 0x0800
packet.encode_header()
yield from self.streamer.send(packet)
yield from self.logger.receive()
# check results
s, l, e = check(packet, dut.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# check results
s, l, e = check(packet, self.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
run_simulation(TB(), ncycles=4000, vcd_name="my.vcd", keep_files=True)
tb = TB()
generators = {
"sys" : [main_generator(tb),
tb.streamer.generator(),
tb.streamer_randomizer.generator(),
tb.logger_randomizer.generator(),
tb.logger.generator()],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -16,30 +15,30 @@ class WishboneMaster:
self.dat = 0
def write(self, adr, dat):
self.obj.cyc = 1
self.obj.stb = 1
self.obj.adr = adr
self.obj.we = 1
self.obj.sel = 0xF
self.obj.dat_w = dat
while self.obj.ack == 0:
yield self.obj.cyc.eq(1)
yield self.obj.stb.eq(1)
yield self.obj.adr.eq(adr)
yield self.obj.we.eq(1)
yield self.obj.sel.eq(0xf)
yield self.obj.dat_w.eq(dat)
while (yield self.obj.ack) == 0:
yield
self.obj.cyc = 0
self.obj.stb = 0
yield self.obj.cyc.eq(0)
yield self.obj.stb.eq(0)
yield
def read(self, adr):
self.obj.cyc = 1
self.obj.stb = 1
self.obj.adr = adr
self.obj.we = 0
self.obj.sel = 0xF
self.obj.dat_w = 0
while self.obj.ack == 0:
yield self.obj.cyc.eq(1)
yield self.obj.stb.eq(1)
yield self.obj.adr.eq(adr)
yield self.obj.we.eq(0)
yield self.obj.sel.eq(0xf)
yield self.obj.dat_w.eq(0)
while (yield self.obj.ack) == 0:
yield
self.dat = self.obj.dat_r
self.obj.cyc = 0
self.obj.stb = 0
yield self.dat.eq(self.obj.dat_r)
yield self.obj.cyc.eq(0)
yield self.obj.stb.eq(0)
yield
@ -48,21 +47,21 @@ class SRAMReaderDriver:
self.obj = obj
def start(self, slot, length):
self.obj._slot.storage = slot
self.obj._length.storage = length
self.obj._start.re = 1
yield self.obj._slot.storage.eq(slot)
yield self.obj._length.storage.eq(length)
yield self.obj._start.re.eq(1)
yield
self.obj._start.re = 0
yield self.obj._start.re.eq(0)
yield
def wait_done(self):
while self.obj.ev.done.pending == 0:
while (yield self.obj.ev.done.pending) == 0:
yield
def clear_done(self):
self.obj.ev.done.clear = 1
yield self.obj.ev.done.clear.eq(1)
yield
self.obj.ev.done.clear = 0
yield self.obj.ev.done.clear.eq(0)
yield
@ -71,79 +70,80 @@ class SRAMWriterDriver:
self.obj = obj
def wait_available(self):
while self.obj.ev.available.pending == 0:
while (yield self.obj.ev.available.pending) == 0:
yield
def clear_available(self):
self.obj.ev.available.clear = 1
yield self.obj.ev.available.clear.eq(1)
yield
self.obj.ev.available.clear = 0
yield self.obj.ev.available.clear.eq(0)
yield
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=True)
self.submodules.phy_model = phy.PHY(8, debug=True)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=True)
self.submodules.ethmac = LiteEthMAC(phy=self.phy_model, dw=32, interface="wishbone", with_preamble_crc=True)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
def main_generator(dut):
wishbone_master = WishboneMaster(dut.ethmac.bus)
sram_reader_driver = SRAMReaderDriver(dut.ethmac.interface.sram.reader)
sram_writer_driver = SRAMWriterDriver(dut.ethmac.interface.sram.writer)
wishbone_master = WishboneMaster(selfp.ethmac.bus)
sram_reader_driver = SRAMReaderDriver(selfp.ethmac.interface.sram.reader)
sram_writer_driver = SRAMWriterDriver(selfp.ethmac.interface.sram.writer)
sram_writer_slots_offset = [0x000, 0x200]
sram_reader_slots_offset = [0x400, 0x600]
sram_writer_slots_offset = [0x000, 0x200]
sram_reader_slots_offset = [0x400, 0x600]
length = 150+2
length = 150+2
tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0]
tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0]
errors = 0
errors = 0
while True:
for i in range(20):
yield
for slot in range(2):
print("slot {}: ".format(slot), end="")
# fill tx memory
for i in range(length//4+1):
dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big")
yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat)
while True:
for slot in range(2):
print("slot {}: ".format(slot), end="")
# fill tx memory
for i in range(length//4+1):
dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big")
yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat)
# XXX: find a way to exit properly
import sys
sys.exit()
# send tx payload & wait
yield from sram_reader_driver.start(slot, length)
yield from sram_reader_driver.wait_done()
yield from sram_reader_driver.clear_done()
# wait rx
yield from sram_writer_driver.wait_available()
yield from sram_writer_driver.clear_available()
# get rx payload (loopback on PHY Model)
rx_payload = []
for i in range(length//4+1):
yield from wishbone_master.read(sram_writer_slots_offset[slot]+i)
dat = wishbone_master.dat
rx_payload += list(dat.to_bytes(4, byteorder='big'))
# check results
s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))])
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# # send tx payload & wait
# yield from sram_reader_driver.start(slot, length)
# yield from sram_reader_driver.wait_done()
# yield from sram_reader_driver.clear_done()
#
# # wait rx
# yield from sram_writer_driver.wait_available()
# yield from sram_writer_driver.clear_available()
#
# # get rx payload (loopback on PHY Model)
# rx_payload = []
# for i in range(length//4+1):
# yield from wishbone_master.read(sram_writer_slots_offset[slot]+i)
# dat = wishbone_master.dat
# rx_payload += list(dat.to_bytes(4, byteorder='big'))
#
# # check results
# s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))])
# print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
run_simulation(TB(), ncycles=3000, vcd_name="my.vcd", keep_files=True)
tb = TB()
generators = {
"sys" : main_generator(tb),
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")

View File

@ -54,7 +54,7 @@ class PHY(Module):
print_phy(r)
self.packet = self.phy_sink.packet
def gen_simulation(self, selfp):
def generator(self):
while True:
yield from self.receive()
if self.mac_callback is not None:

View File

@ -1,5 +1,4 @@
from litex.gen import *
from litex.gen.sim.generic import run_simulation
from litex.soc.interconnect import wishbone
from litex.soc.interconnect.stream_sim import *
@ -7,6 +6,7 @@ from litex.soc.interconnect.stream_sim import *
from liteeth.common import *
from liteeth.core import LiteEthUDPIPCore
from test.model import phy, mac, arp, ip, udp
ip_address = 0x12345678
@ -27,45 +27,38 @@ class TB(Module):
self.submodules.streamer = PacketStreamer(eth_udp_user_description(dw))
self.submodules.logger = PacketLogger(eth_udp_user_description(dw))
self.comb += [
self.streamer.source.connect(udp_port.sink),
Record.connect(self.streamer.source, udp_port.sink),
udp_port.sink.ip_address.eq(0x12345678),
udp_port.sink.src_port.eq(0x1234),
udp_port.sink.dst_port.eq(0x5678),
udp_port.sink.length.eq(64//(dw//8)),
udp_port.source.connect(self.logger.sink)
Record.connect(udp_port.source, self.logger.sink)
]
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def main_generator(dut):
packet = Packet([i for i in range(64//(dut.dw//8))])
dut.streamer.send(packet)
yield from dut.logger.receive()
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
while True:
packet = Packet([i for i in range(64//(self.dw//8))])
yield from self.streamer.send(packet)
yield from self.logger.receive()
# check results
s, l, e = check(packet, self.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# check results
s, l, e = check(packet, dut.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
run_simulation(TB(8), ncycles=2048, vcd_name="my.vcd", keep_files=True)
run_simulation(TB(16), ncycles=2048, vcd_name="my.vcd", keep_files=True)
run_simulation(TB(32), ncycles=2048, vcd_name="my.vcd", keep_files=True)
tb = TB(8)
generators = {
"sys" : [main_generator(tb),
tb.streamer.generator(),
tb.logger.generator()],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")