test: for now revert all simulation (we'll switch when missing feature of new simulator will be implemted)

This commit is contained in:
Florent Kermarrec 2015-11-13 14:47:57 +01:00
parent 57b70c640c
commit 886108eee9
9 changed files with 386 additions and 365 deletions

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core.mac import LiteEthMAC
@ -22,31 +23,36 @@ class TB(Module):
self.submodules.mac = LiteEthMAC(self.phy_model, dw=8, with_preamble_crc=True)
self.submodules.arp = LiteEthARP(self.mac, mac_address, ip_address, 100000)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def main_generator(dut):
while (yield dut.arp.table.request.ack) != 1:
yield dut.arp.table.request.stb.eq(1)
yield dut.arp.table.request.ip_address.eq(0x12345678)
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
yield dut.arp.table.request.stb.eq(0)
while (yield dut.arp.table.response.stb) != 1:
yield dut.arp.table.response.ack.eq(1)
yield
print("Received MAC : 0x{:12x}".format((yield dut.arp.table.response.mac_address)))
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
while selfp.arp.table.request.ack != 1:
selfp.arp.table.request.stb = 1
selfp.arp.table.request.ip_address = 0x12345678
yield
selfp.arp.table.request.stb = 0
while selfp.arp.table.response.stb != 1:
selfp.arp.table.response.ack = 1
yield
print("Received MAC : 0x{:12x}".format(selfp.arp.table.response.mac_address))
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)

View File

@ -1,12 +1,12 @@
import random
import copy
from migen import *
from litex.soc.interconnect.stream import Sink, Source
from migen.fhdl.std import *
from migen.flow.actor import Sink, Source
from migen.genlib.record import *
from liteeth.common import *
from copy import deepcopy
def print_with_prefix(s, prefix=""):
if not isinstance(s, str):
@ -49,8 +49,8 @@ def comp(p1, p2):
def check(p1, p2):
p1 = deepcopy(p1)
p2 = deepcopy(p2)
p1 = copy.deepcopy(p1)
p2 = copy.deepcopy(p2)
if isinstance(p1, int):
return 0, 1, int(p1 != p2)
else:
@ -94,7 +94,7 @@ class PacketStreamer(Module):
self.packet.done = True
def send(self, packet):
packet = deepcopy(packet)
packet = copy.deepcopy(packet)
self.packets.append(packet)
return packet
@ -103,34 +103,32 @@ class PacketStreamer(Module):
while not packet.done:
yield
def generator(self):
while True:
if len(self.packets) and self.packet.done:
self.packet = self.packets.pop(0)
if not self.packet.ongoing and not self.packet.done:
yield self.source.stb.eq(1)
if self.source.description.packetized:
yield self.source.sop.eq(1)
yield self.source.data.eq(self.packet.pop(0))
self.packet.ongoing = True
elif (yield self.source.stb) == 1 and (yield self.source.ack) == 1:
if self.source.description.packetized:
yield self.source.sop.eq(0)
if len(self.packet) == 1:
yield self.source.eop.eq(1)
if self.last_be is not None:
yield self.source.last_be.eq(self.last_be)
else:
yield self.source.eop.eq(0)
if self.last_be is not None:
yield self.source.last_be.eq(0)
if len(self.packet) > 0:
yield self.source.stb.eq(1)
yield self.source.data.eq(self.packet.pop(0))
def do_simulation(self, selfp):
if len(self.packets) and self.packet.done:
self.packet = self.packets.pop(0)
if not self.packet.ongoing and not self.packet.done:
selfp.source.stb = 1
if self.source.description.packetized:
selfp.source.sop = 1
selfp.source.data = self.packet.pop(0)
self.packet.ongoing = True
elif selfp.source.stb == 1 and selfp.source.ack == 1:
if self.source.description.packetized:
selfp.source.sop = 0
if len(self.packet) == 1:
selfp.source.eop = 1
if self.last_be is not None:
selfp.source.last_be = self.last_be
else:
self.packet.done = True
yield self.source.stb.eq(0)
yield
selfp.source.eop = 0
if self.last_be is not None:
selfp.source.last_be = 0
if len(self.packet) > 0:
selfp.source.stb = 1
selfp.source.data = self.packet.pop(0)
else:
self.packet.done = True
selfp.source.stb = 0
class PacketLogger(Module):
@ -146,21 +144,19 @@ class PacketLogger(Module):
while not self.packet.done:
yield
def generator(self):
while True:
yield self.sink.ack.eq(1)
if (yield self.sink.stb):
if self.sink.description.packetized:
if (yield self.sink.sop):
self.packet = Packet()
self.packet.append((yield self.sink.data))
else:
self.packet.append((yield self.sink.data))
if (yield self.sink.eop):
self.packet.done = True
def do_simulation(self, selfp):
selfp.sink.ack = 1
if selfp.sink.stb:
if self.sink.description.packetized:
if selfp.sink.sop:
self.packet = Packet()
self.packet.append(selfp.sink.data)
else:
self.packet.append((yield self.sink.data))
yield
self.packet.append(selfp.sink.data)
if selfp.sink.eop:
self.packet.done = True
else:
self.packet.append(selfp.sink.data)
class AckRandomizer(Module):
@ -170,22 +166,20 @@ class AckRandomizer(Module):
self.sink = Sink(description)
self.source = Source(description)
self.ce = Signal(reset=1)
self.run = Signal()
self.comb += \
If(self.ce,
If(self.run,
Record.connect(self.sink, self.source)
).Else(
self.source.stb.eq(0),
self.sink.ack.eq(0),
)
def generator(self):
while True:
n = randn(100)
if n < self.level:
yield self.ce.eq(0)
else:
yield self.ce.eq(1)
yield
def do_simulation(self, selfp):
n = randn(100)
if n < self.level:
selfp.run = 0
else:
selfp.run = 1

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core import LiteEthUDPIPCore
@ -15,12 +16,12 @@ mac_address = 0x12345678abcd
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=True)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=False)
self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=True)
self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=True, loopback=False)
self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=True, loopback=False)
self.submodules.etherbone_model = etherbone.Etherbone(self.udp_model, debug=True)
self.submodules.phy_model = phy.PHY(8, debug=False)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=False)
self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=False)
self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=False, loopback=False)
self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=False, loopback=False)
self.submodules.etherbone_model = etherbone.Etherbone(self.udp_model, debug=False)
self.submodules.core = LiteEthUDPIPCore(self.phy_model, mac_address, ip_address, 100000)
self.submodules.etherbone = LiteEthEtherbone(self.core.udp, 20000)
@ -28,87 +29,91 @@ class TB(Module):
self.submodules.sram = wishbone.SRAM(1024)
self.submodules.interconnect = wishbone.InterconnectPointToPoint(self.etherbone.master.bus, self.sram.bus)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def main_generator(dut):
test_probe = True
test_writes = False
test_reads = False
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
# test probe
if test_probe:
packet = etherbone.EtherbonePacket()
packet.pf = 1
dut.etherbone_model.send(packet)
yield from self.etherbone_model.receive()
print("probe: " + str(bool(self.etherbone_model.rx_packet.pr)))
for i in range(100):
yield
for i in range(8):
# test writes
if test_writes:
writes_datas = [j for j in range(16)]
writes = etherbone.EtherboneWrites(base_addr=0x1000,
datas=writes_datas)
record = etherbone.EtherboneRecord()
record.writes = writes
record.reads = None
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = len(writes_datas)
record.rcount = 0
test_probe = True
test_writes = True
test_reads = True
# test probe
if test_probe:
packet = etherbone.EtherbonePacket()
packet.records = [record]
self.etherbone_model.send(packet)
for i in range(256):
yield
# test reads
if test_reads:
reads_addrs = [0x1000 + 4*j for j in range(16)]
reads = etherbone.EtherboneReads(base_ret_addr=0x1000,
addrs=reads_addrs)
record = etherbone.EtherboneRecord()
record.writes = None
record.reads = reads
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = 0
record.rcount = len(reads_addrs)
packet = etherbone.EtherbonePacket()
packet.records = [record]
packet.pf = 1
self.etherbone_model.send(packet)
yield from self.etherbone_model.receive()
loopback_writes_datas = []
loopback_writes_datas = self.etherbone_model.rx_packet.records.pop().writes.get_datas()
print("probe: " + str(bool(self.etherbone_model.rx_packet.pr)))
# check resultss
s, l, e = check(writes_datas, loopback_writes_datas)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
for i in range(8):
# test writes
if test_writes:
writes_datas = [j for j in range(16)]
writes = etherbone.EtherboneWrites(base_addr=0x1000,
datas=writes_datas)
record = etherbone.EtherboneRecord()
record.writes = writes
record.reads = None
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = len(writes_datas)
record.rcount = 0
# XXX: find a way to exit properly
import sys
sys.exit()
packet = etherbone.EtherbonePacket()
packet.records = [record]
self.etherbone_model.send(packet)
for i in range(256):
yield
# test reads
if test_reads:
reads_addrs = [0x1000 + 4*j for j in range(16)]
reads = etherbone.EtherboneReads(base_ret_addr=0x1000,
addrs=reads_addrs)
record = etherbone.EtherboneRecord()
record.writes = None
record.reads = reads
record.bca = 0
record.rca = 0
record.rff = 0
record.cyc = 0
record.wca = 0
record.wff = 0
record.byte_enable = 0xf
record.wcount = 0
record.rcount = len(reads_addrs)
packet = etherbone.EtherbonePacket()
packet.records = [record]
self.etherbone_model.send(packet)
yield from self.etherbone_model.receive()
loopback_writes_datas = []
loopback_writes_datas = self.etherbone_model.rx_packet.records.pop().writes.get_datas()
# check results
s, l, e = check(writes_datas, loopback_writes_datas)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(), ncycles=4096, vcd_name="my.vcd", keep_files=True)

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core import LiteEthIPCore
@ -26,31 +27,33 @@ class TB(Module):
self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000)
def main_generator(dut):
packet = MACPacket(ping_request)
packet.decode_remove_header()
packet = IPPacket(packet)
packet.decode()
packet = ICMPPacket(packet)
packet.decode()
dut.icmp_model.send(packet)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
for i in range(256):
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
# XXX: find a way to exit properly
import sys
sys.exit()
for i in range(100):
yield
packet = MACPacket(ping_request)
packet.decode_remove_header()
packet = IPPacket(packet)
packet.decode()
packet = ICMPPacket(packet)
packet.decode()
self.icmp_model.send(packet)
if __name__ == "__main__":
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core import LiteEthIPCore
@ -22,32 +23,38 @@ class TB(Module):
self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000)
self.ip_port = self.ip.ip.crossbar.get_port(udp_protocol)
def main_generator(dut):
while True:
yield dut.ip_port.sink.stb.eq(1)
yield dut.ip_port.sink.sop.eq(1)
yield dut.ip_port.sink.eop.eq(1)
yield dut.ip_port.sink.ip_address.eq(0x12345678)
yield dut.ip_port.sink.protocol.eq(udp_protocol)
yield dut.ip_port.source.ack.eq(1)
if (yield dut.ip_port.source.stb) == 1 and (yield dut.ip_port.source.sop) == 1:
print("packet from IP 0x{:08x}".format((yield dut.ip_port.sink.ip_address)))
# XXX: find a way to exit properly
import sys
sys.exit()
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
while True:
selfp.ip_port.sink.stb = 1
selfp.ip_port.sink.sop = 1
selfp.ip_port.sink.eop = 1
selfp.ip_port.sink.ip_address = 0x12345678
selfp.ip_port.sink.protocol = udp_protocol
selfp.ip_port.source.ack = 1
if selfp.ip_port.source.stb == 1 and selfp.ip_port.source.sop == 1:
print("packet from IP 0x{:08x}".format(selfp.ip_port.sink.ip_address))
yield
if __name__ == "__main__":
tb = TB()
generators = {
"sys" : [main_generator(tb)],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core.mac.core import LiteEthMACCore
@ -21,6 +22,16 @@ class TB(Module):
self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(8), level=50)
self.submodules.logger = PacketLogger(eth_phy_description(8))
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
self.comb += [
Record.connect(self.streamer.source, self.streamer_randomizer.sink),
Record.connect(self.streamer_randomizer.source, self.core.sink),
@ -28,38 +39,25 @@ class TB(Module):
Record.connect(self.logger_randomizer.source, self.logger.sink)
]
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
def main_generator(dut):
for i in range(2):
packet = mac.MACPacket([i for i in range(64)])
packet.target_mac = 0x010203040506
packet.sender_mac = 0x090A0B0C0C0D
packet.ethernet_type = 0x0800
packet.encode_header()
dut.streamer.send(packet)
yield from dut.logger.receive()
for i in range(8):
packet = mac.MACPacket([i for i in range(64)])
packet.target_mac = 0x010203040506
packet.sender_mac = 0x090A0B0C0C0D
packet.ethernet_type = 0x0800
packet.encode_header()
yield from self.streamer.send(packet)
yield from self.logger.receive()
# check results
s, l, e = check(packet, dut.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# XXX: find a way to exit properly
import sys
sys.exit()
# check results
s, l, e = check(packet, self.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
tb = TB()
generators = {
"sys" : [main_generator(tb),
tb.streamer.generator(),
tb.streamer_randomizer.generator(),
tb.logger_randomizer.generator(),
tb.logger.generator()],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(), ncycles=4000, vcd_name="my.vcd", keep_files=True)

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core.mac import LiteEthMAC
@ -15,30 +16,30 @@ class WishboneMaster:
self.dat = 0
def write(self, adr, dat):
yield self.obj.cyc.eq(1)
yield self.obj.stb.eq(1)
yield self.obj.adr.eq(adr)
yield self.obj.we.eq(1)
yield self.obj.sel.eq(0xf)
yield self.obj.dat_w.eq(dat)
while (yield self.obj.ack) == 0:
self.obj.cyc = 1
self.obj.stb = 1
self.obj.adr = adr
self.obj.we = 1
self.obj.sel = 0xF
self.obj.dat_w = dat
while self.obj.ack == 0:
yield
yield self.obj.cyc.eq(0)
yield self.obj.stb.eq(0)
self.obj.cyc = 0
self.obj.stb = 0
yield
def read(self, adr):
yield self.obj.cyc.eq(1)
yield self.obj.stb.eq(1)
yield self.obj.adr.eq(adr)
yield self.obj.we.eq(0)
yield self.obj.sel.eq(0xf)
yield self.obj.dat_w.eq(0)
while (yield self.obj.ack) == 0:
self.obj.cyc = 1
self.obj.stb = 1
self.obj.adr = adr
self.obj.we = 0
self.obj.sel = 0xF
self.obj.dat_w = 0
while self.obj.ack == 0:
yield
yield self.dat.eq(self.obj.dat_r)
yield self.obj.cyc.eq(0)
yield self.obj.stb.eq(0)
self.dat = self.obj.dat_r
self.obj.cyc = 0
self.obj.stb = 0
yield
@ -47,21 +48,21 @@ class SRAMReaderDriver:
self.obj = obj
def start(self, slot, length):
yield self.obj._slot.storage.eq(slot)
yield self.obj._length.storage.eq(length)
yield self.obj._start.re.eq(1)
self.obj._slot.storage = slot
self.obj._length.storage = length
self.obj._start.re = 1
yield
yield self.obj._start.re.eq(0)
self.obj._start.re = 0
yield
def wait_done(self):
while (yield self.obj.ev.done.pending) == 0:
while self.obj.ev.done.pending == 0:
yield
def clear_done(self):
yield self.obj.ev.done.clear.eq(1)
self.obj.ev.done.clear = 1
yield
yield self.obj.ev.done.clear.eq(0)
self.obj.ev.done.clear = 0
yield
@ -70,80 +71,79 @@ class SRAMWriterDriver:
self.obj = obj
def wait_available(self):
while (yield self.obj.ev.available.pending) == 0:
while self.obj.ev.available.pending == 0:
yield
def clear_available(self):
yield self.obj.ev.available.clear.eq(1)
self.obj.ev.available.clear = 1
yield
yield self.obj.ev.available.clear.eq(0)
self.obj.ev.available.clear = 0
yield
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=True)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=True)
self.submodules.phy_model = phy.PHY(8, debug=False)
self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=True)
self.submodules.ethmac = LiteEthMAC(phy=self.phy_model, dw=32, interface="wishbone", with_preamble_crc=True)
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
def main_generator(dut):
wishbone_master = WishboneMaster(dut.ethmac.bus)
sram_reader_driver = SRAMReaderDriver(dut.ethmac.interface.sram.reader)
sram_writer_driver = SRAMWriterDriver(dut.ethmac.interface.sram.writer)
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
sram_writer_slots_offset = [0x000, 0x200]
sram_reader_slots_offset = [0x400, 0x600]
wishbone_master = WishboneMaster(selfp.ethmac.bus)
sram_reader_driver = SRAMReaderDriver(selfp.ethmac.interface.sram.reader)
sram_writer_driver = SRAMWriterDriver(selfp.ethmac.interface.sram.writer)
length = 150+2
sram_writer_slots_offset = [0x000, 0x200]
sram_reader_slots_offset = [0x400, 0x600]
tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0]
length = 150+2
errors = 0
tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0]
while True:
for i in range(20):
yield
for slot in range(2):
print("slot {}: ".format(slot), end="")
# fill tx memory
for i in range(length//4+1):
dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big")
yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat)
errors = 0
# XXX: find a way to exit properly
import sys
sys.exit()
while True:
for slot in range(2):
print("slot {}: ".format(slot), end="")
# fill tx memory
for i in range(length//4+1):
dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big")
yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat)
# # send tx payload & wait
# yield from sram_reader_driver.start(slot, length)
# yield from sram_reader_driver.wait_done()
# yield from sram_reader_driver.clear_done()
#
# # wait rx
# yield from sram_writer_driver.wait_available()
# yield from sram_writer_driver.clear_available()
#
# # get rx payload (loopback on PHY Model)
# rx_payload = []
# for i in range(length//4+1):
# yield from wishbone_master.read(sram_writer_slots_offset[slot]+i)
# dat = wishbone_master.dat
# rx_payload += list(dat.to_bytes(4, byteorder='big'))
#
# # check results
# s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))])
# print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# send tx payload & wait
yield from sram_reader_driver.start(slot, length)
yield from sram_reader_driver.wait_done()
yield from sram_reader_driver.clear_done()
# wait rx
yield from sram_writer_driver.wait_available()
yield from sram_writer_driver.clear_available()
# get rx payload (loopback on PHY Model)
rx_payload = []
for i in range(length//4+1):
yield from wishbone_master.read(sram_writer_slots_offset[slot]+i)
dat = wishbone_master.dat
rx_payload += list(dat.to_bytes(4, byteorder='big'))
# check results
s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))])
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
tb = TB()
generators = {
"sys" : main_generator(tb),
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(), ncycles=3000, vcd_name="my.vcd", keep_files=True)

View File

@ -54,7 +54,7 @@ class PHY(Module):
print_phy(r)
self.packet = self.phy_sink.packet
def generator(self):
def gen_simulation(self, selfp):
while True:
yield from self.receive()
if self.mac_callback is not None:

View File

@ -1,6 +1,7 @@
from migen import *
from litex.soc.interconnect import wishbone
from migen.fhdl.std import *
from migen.bus import wishbone
from migen.bus.transactions import *
from migen.sim.generic import run_simulation
from liteeth.common import *
from liteeth.core import LiteEthUDPIPCore
@ -34,30 +35,37 @@ class TB(Module):
Record.connect(udp_port.source, self.logger.sink)
]
def main_generator(dut):
packet = Packet([i for i in range(64//(dut.dw//8))])
dut.streamer.send(packet)
yield from dut.logger.receive()
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
self.clock_domains.cd_eth_tx = ClockDomain()
self.comb += [
self.cd_eth_rx.clk.eq(ClockSignal()),
self.cd_eth_rx.rst.eq(ResetSignal()),
self.cd_eth_tx.clk.eq(ClockSignal()),
self.cd_eth_tx.rst.eq(ResetSignal()),
]
# check results
s, l, e = check(packet, dut.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
def gen_simulation(self, selfp):
selfp.cd_eth_rx.rst = 1
selfp.cd_eth_tx.rst = 1
yield
selfp.cd_eth_rx.rst = 0
selfp.cd_eth_tx.rst = 0
for i in range(100):
yield
while True:
packet = Packet([i for i in range(64//(self.dw//8))])
yield from self.streamer.send(packet)
yield from self.logger.receive()
# check results
s, l, e = check(packet, self.logger.packet)
print("shift " + str(s) + " / length " + str(l) + " / errors " + str(e))
# XXX: find a way to exit properly
import sys
sys.exit()
if __name__ == "__main__":
tb = TB(8)
generators = {
"sys" : [main_generator(tb),
tb.streamer.generator(),
tb.logger.generator()],
"eth_tx": [tb.phy_model.phy_sink.generator(),
tb.phy_model.generator()],
"eth_rx": tb.phy_model.phy_source.generator()
}
clocks = {"sys": 10,
"eth_rx": 10,
"eth_tx": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")
run_simulation(TB(8), ncycles=2048, vcd_name="my.vcd", keep_files=True)
run_simulation(TB(16), ncycles=2048, vcd_name="my.vcd", keep_files=True)
run_simulation(TB(32), ncycles=2048, vcd_name="my.vcd", keep_files=True)