litex/misoclib/com/liteusb/test/core_tb.py

148 lines
4.8 KiB
Python
Raw Normal View History

import binascii
from migen.fhdl.std import *
from migen.flow.actor import *
from migen.fhdl.specials import *
from migen.sim.generic import run_simulation
from misoclib.com.liteusb.common import *
from misoclib.com.liteusb.core import LiteUSBCore
from misoclib.com.liteusb.test.common import *
# XXX for now use it from liteeth to avoid duplication
from misoclib.com.liteeth.test.common import *
def crc32(l):
crc = []
crc_bytes = split_bytes(binascii.crc32(bytes(l)), 4, "little")
for byte in crc_bytes:
crc.append(int(byte))
return crc
class USBPacket(Packet):
def __init__(self, init=[]):
Packet.__init__(self, init)
self.crc_error = False
def check_remove_crc(self):
if comp(self[-4:], crc32(self[:-4])):
for i in range(4):
self.pop()
return False
else:
return True
def decode_remove_header(self):
header = []
for byte in self[:packet_header.length]:
header.append(self.pop(0))
for k, v in sorted(packet_header.fields.items()):
setattr(self, k, get_field_data(v, header))
def decode(self):
# XXX Header should be protected by CRC
self.decode_remove_header()
self.crc_error = self.check_remove_crc()
if self.crc_error:
raise ValueError # XXX handle this properly
def encode_header(self):
header = 0
for k, v in sorted(packet_header.fields.items()):
value = merge_bytes(split_bytes(getattr(self, k),
math.ceil(v.width/8)),
"little")
header += (value << v.offset+(v.byte*8))
for d in split_bytes(header, packet_header.length):
self.insert(0, d)
def insert_crc(self):
for d in crc32(self):
self.append(d)
def encode(self):
# XXX Header should be protected by CRC
self.insert_crc()
self.encode_header()
def __repr__(self):
r = "--------\n"
for k in sorted(packet_header.fields.keys()):
r += k + " : 0x{:0x}\n".format(getattr(self, k))
r += "payload: "
for d in self:
r += "{:02x}".format(d)
return r
class PHYModel(Module):
def __init__(self):
self.sink = Sink(phy_description(8))
self.source = Source(phy_description(8))
class TB(Module):
def __init__(self):
self.submodules.phy = PHYModel()
self.submodules.core = LiteUSBCore(self.phy)
self.submodules.phy_streamer = PacketStreamer(phy_description(8))
self.submodules.phy_streamer_randomizer = AckRandomizer(phy_description(8), level=0)
self.submodules.phy_logger_randomizer = AckRandomizer(phy_description(8), level=0)
self.submodules.phy_logger = PacketLogger(phy_description(8))
self.submodules.core_streamer = PacketStreamer(user_description(8))
self.submodules.core_streamer_randomizer = AckRandomizer(user_description(8), level=10)
self.submodules.core_logger = PacketLogger(user_description(8))
self.submodules.core_logger_randomizer = AckRandomizer(user_description(8), level=10)
user_port = self.core.crossbar.get_port(0x12)
self.comb += [
Record.connect(self.phy_streamer.source, self.phy_streamer_randomizer.sink),
Record.connect(self.phy_streamer_randomizer.source, self.phy.source),
Record.connect(self.core_streamer.source, self.core_streamer_randomizer.sink),
Record.connect(self.core_streamer_randomizer.source, user_port.sink),
Record.connect(user_port.source, self.core_logger_randomizer.sink),
Record.connect(self.core_logger_randomizer.source, self.core_logger.sink),
Record.connect(self.phy.sink, self.phy_logger_randomizer.sink),
Record.connect(self.phy_logger_randomizer.source, self.phy_logger.sink)
]
def gen_simulation(self, selfp):
packet = USBPacket([i for i in range(128)])
packet.preamble = 0x5AA55AA5
packet.dst = 0x12
packet.length = 128 + 4
packet.encode()
yield from self.phy_streamer.send(packet)
for i in range(32):
yield
print(self.core_logger.packet)
selfp.core_streamer.source.dst = 0x12
selfp.core_streamer.source.length = 128 + 4
packet = Packet([i for i in range(128)])
yield from self.core_streamer.send(packet)
for i in range(32):
yield
for d in self.phy_logger.packet:
print("%02x" %d, end="")
print("")
packet = USBPacket(self.phy_logger.packet)
packet.decode()
print(packet)
def main():
run_simulation(TB(), ncycles=2000, vcd_name="my.vcd", keep_files=True)
if __name__ == "__main__":
main()