litex/lib/sata/link/test/bfm.py

196 lines
4.6 KiB
Python
Raw Normal View History

2014-11-12 12:20:34 -05:00
import subprocess
2014-11-11 12:47:34 -05:00
from migen.fhdl.std import *
from lib.sata.std import *
2014-12-02 15:34:16 -05:00
from lib.sata.link.test.common import *
2014-11-11 12:47:34 -05:00
class PHYDword:
2014-11-11 12:47:34 -05:00
def __init__(self, dat=0):
self.dat = dat
self.start = 1
self.done = 0
class PHYSource(Module):
def __init__(self):
self.source = Source(phy_layout(32))
2014-11-11 12:47:34 -05:00
###
self.dword = PHYDword()
2014-11-11 12:47:34 -05:00
2014-12-03 03:17:51 -05:00
def send(self, dword):
self.dword = dword
2014-11-11 12:47:34 -05:00
def do_simulation(self, selfp):
2014-12-02 15:34:16 -05:00
selfp.source.stb = 1
selfp.source.charisk = 0b0000
for k, v in primitives.items():
if v == self.dword.dat:
selfp.source.charisk = 0b0001
selfp.source.data = self.dword.dat
2014-11-11 12:47:34 -05:00
class PHYSink(Module):
def __init__(self):
self.sink = Sink(phy_layout(32))
2014-11-11 12:47:34 -05:00
###
self.dword = PHYDword()
2014-11-11 12:47:34 -05:00
def receive(self):
self.dword.done = 0
while self.dword.done == 0:
yield
def do_simulation(self, selfp):
self.dword.done = 0
selfp.sink.ack = 1
if selfp.sink.stb == 1:
self.dword.done = 1
self.dword.dat = selfp.sink.data
class PHYLayer(Module):
def __init__(self, debug):
self.debug = debug
2014-11-11 12:47:34 -05:00
self.submodules.rx = PHYSink()
self.submodules.tx = PHYSource()
2014-11-11 12:47:34 -05:00
self.source = self.tx.source
self.sink = self.rx.sink
2014-11-11 12:47:34 -05:00
2014-12-03 03:17:51 -05:00
def send(self, dword):
packet = PHYDword(dword)
self.tx.send(packet)
2014-11-11 12:47:34 -05:00
def receive(self):
yield from self.rx.receive()
if self.debug:
print(self)
2014-11-11 12:47:34 -05:00
2014-12-03 03:17:51 -05:00
def __repr__(self):
receiving = "%08x " %self.rx.dword.dat
receiving += decode_primitive(self.rx.dword.dat)
2014-12-03 03:17:51 -05:00
receiving += " "*(16-len(receiving))
sending = "%08x " %self.tx.dword.dat
sending += decode_primitive(self.tx.dword.dat)
2014-12-03 03:17:51 -05:00
sending += " "*(16-len(sending))
return receiving + sending
class LinkPacket(list):
def __init__(self):
self.ongoing = False
self.scrambled_datas = self.import_scrambler_datas()
2014-12-03 03:17:51 -05:00
def import_scrambler_datas(self):
with subprocess.Popen(["./scrambler"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write("0x10000".encode("ASCII"))
out, err = process.communicate()
return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
2014-11-12 12:20:34 -05:00
class LinkRXPacket(LinkPacket):
def decode(self):
self.descramble()
return self.check_crc()
2014-11-12 12:20:34 -05:00
def descramble(self):
for i in range(len(self)):
self[i] = self[i] ^ self.scrambled_datas[i]
2014-12-03 03:17:51 -05:00
def check_crc(self):
stdin = ""
for v in self[:-1]:
stdin += "0x%08x " %v
stdin += "exit"
with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write(stdin.encode("ASCII"))
out, err = process.communicate()
crc = int(out.decode("ASCII"), 16)
r = (self[-1] == crc)
self.pop()
return r
2014-11-12 12:20:34 -05:00
class LinkTXPacket(LinkPacket):
def encode(self):
self.scramble()
self.insert_crc()
2014-11-12 12:20:34 -05:00
def scramble(self):
for i in range(len(self)):
self[i] = self[i] ^ self.scrambled_datas[i]
def insert_crc(self):
2014-12-02 14:02:43 -05:00
stdin = ""
for v in self[:-1]:
2014-12-02 14:02:43 -05:00
stdin += "0x%08x " %v
stdin += "exit"
with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write(stdin.encode("ASCII"))
out, err = process.communicate()
crc = int(out.decode("ASCII"), 16)
self.append(crc)
def transport_callback(packet):
print("----")
for v in packet:
print("%08x" %v)
print("----")
class LinkLayer(Module):
def __init__(self, phy, debug, hold_random_level=0):
self.phy = phy
self.debug = debug
self.hold_random_level = hold_random_level
self.tx_packet = LinkTXPacket()
self.rx_packet = LinkRXPacket()
self.rx_cont = False
def callback(self, dword):
2014-12-03 05:12:26 -05:00
if dword == primitives["CONT"]:
self.rx_cont = True
2014-12-03 05:12:26 -05:00
elif is_primitive(dword):
self.rx_cont = False
2014-12-03 05:12:26 -05:00
2014-11-12 12:20:34 -05:00
if dword == primitives["X_RDY"]:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["R_RDY"])
2014-12-02 15:34:16 -05:00
elif dword == primitives["WTRM"]:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["R_OK"])
2014-11-12 12:20:34 -05:00
2014-12-02 15:34:16 -05:00
elif dword == primitives["HOLD"]:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["HOLDA"])
2014-12-02 15:34:16 -05:00
elif dword == primitives["EOF"]:
self.rx_packet.decode()
transport_callback(self.rx_packet)
self.rx_packet.ongoing = False
2014-11-12 12:20:34 -05:00
elif self.rx_packet.ongoing:
2014-12-02 15:34:16 -05:00
if dword != primitives["HOLD"]:
n = randn(100)
2014-12-03 03:17:51 -05:00
if n < self.hold_random_level:
self.phy.send(primitives["HOLD"])
2014-12-02 15:34:16 -05:00
else:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["R_RDY"])
2014-12-03 05:12:26 -05:00
if not is_primitive(dword):
if not self.rx_cont:
2014-12-03 05:12:26 -05:00
self.rx_packet.append(dword)
2014-12-02 15:34:16 -05:00
elif dword == primitives["SOF"]:
self.rx_packet = LinkRXPacket()
self.rx_packet.ongoing = True
def send(self, packet):
pass
2014-11-11 12:47:34 -05:00
def gen_simulation(self, selfp):
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["SYNC"])
2014-11-11 12:47:34 -05:00
while True:
yield from self.phy.receive()
self.callback(self.phy.rx.dword.dat)
class BFM(Module):
def __init__(self, dw, debug=False, hold_random_level=0):
###
self.submodules.phy = PHYLayer(debug)
self.submodules.link = LinkLayer(self.phy, debug, hold_random_level)