litex/lib/sata/test/bfm.py

465 lines
12 KiB
Python
Raw Normal View History

2014-11-12 12:20:34 -05:00
import subprocess
2014-11-11 12:47:34 -05:00
from migen.fhdl.std import *
from lib.sata.common import *
2014-12-05 11:48:01 -05:00
from lib.sata.test.common import *
2014-12-14 05:44:12 -05:00
# PHY Layer model
class PHYDword:
2014-11-11 12:47:34 -05:00
def __init__(self, dat=0):
self.dat = dat
self.start = 1
self.done = 0
class PHYSource(Module):
def __init__(self):
2014-12-14 04:52:56 -05:00
self.source = Source(phy_description(32))
2014-11-11 12:47:34 -05:00
###
self.dword = PHYDword()
2014-11-11 12:47:34 -05:00
2014-12-03 03:17:51 -05:00
def send(self, dword):
self.dword = dword
2014-11-11 12:47:34 -05:00
def do_simulation(self, selfp):
2014-12-02 15:34:16 -05:00
selfp.source.stb = 1
selfp.source.charisk = 0b0000
for k, v in primitives.items():
if v == self.dword.dat:
selfp.source.charisk = 0b0001
selfp.source.data = self.dword.dat
2014-11-11 12:47:34 -05:00
class PHYSink(Module):
def __init__(self):
2014-12-14 04:52:56 -05:00
self.sink = Sink(phy_description(32))
2014-11-11 12:47:34 -05:00
###
self.dword = PHYDword()
2014-11-11 12:47:34 -05:00
def receive(self):
self.dword.done = 0
while self.dword.done == 0:
yield
def do_simulation(self, selfp):
self.dword.done = 0
selfp.sink.ack = 1
if selfp.sink.stb == 1:
self.dword.done = 1
self.dword.dat = selfp.sink.data
class PHYLayer(Module):
2014-12-05 14:26:09 -05:00
def __init__(self, debug=False):
self.debug = debug
2014-11-11 12:47:34 -05:00
self.submodules.rx = PHYSink()
self.submodules.tx = PHYSource()
2014-11-11 12:47:34 -05:00
self.source = self.tx.source
self.sink = self.rx.sink
2014-11-11 12:47:34 -05:00
2014-12-03 03:17:51 -05:00
def send(self, dword):
packet = PHYDword(dword)
self.tx.send(packet)
2014-11-11 12:47:34 -05:00
def receive(self):
if self.debug:
print(self)
2014-12-05 19:23:03 -05:00
yield from self.rx.receive()
2014-11-11 12:47:34 -05:00
2014-12-03 03:17:51 -05:00
def __repr__(self):
receiving = "%08x " %self.rx.dword.dat
receiving += decode_primitive(self.rx.dword.dat)
2014-12-03 03:17:51 -05:00
receiving += " "*(16-len(receiving))
sending = "%08x " %self.tx.dword.dat
sending += decode_primitive(self.tx.dword.dat)
2014-12-03 03:17:51 -05:00
sending += " "*(16-len(sending))
return receiving + sending
2014-12-14 05:44:12 -05:00
# Link Layer model
2014-12-05 19:23:03 -05:00
def import_scrambler_datas():
with subprocess.Popen(["./scrambler"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write("0x10000".encode("ASCII"))
out, err = process.communicate()
return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
class LinkPacket(list):
def __init__(self, init=[]):
self.ongoing = False
self.done = False
2014-12-05 19:23:03 -05:00
self.scrambled_datas = import_scrambler_datas()
for dword in init:
self.append(dword)
2014-12-03 03:17:51 -05:00
class LinkRXPacket(LinkPacket):
def decode(self):
self.descramble()
return self.check_crc()
2014-11-12 12:20:34 -05:00
def descramble(self):
for i in range(len(self)):
self[i] = self[i] ^ self.scrambled_datas[i]
2014-12-03 03:17:51 -05:00
def check_crc(self):
stdin = ""
for v in self[:-1]:
stdin += "0x%08x " %v
stdin += "exit"
with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write(stdin.encode("ASCII"))
out, err = process.communicate()
crc = int(out.decode("ASCII"), 16)
r = (self[-1] == crc)
self.pop()
return r
2014-11-12 12:20:34 -05:00
class LinkTXPacket(LinkPacket):
def encode(self):
self.insert_crc()
2014-12-05 14:26:09 -05:00
self.scramble()
2014-11-12 12:20:34 -05:00
def scramble(self):
for i in range(len(self)):
self[i] = self[i] ^ self.scrambled_datas[i]
def insert_crc(self):
2014-12-02 14:02:43 -05:00
stdin = ""
2014-12-05 14:26:09 -05:00
for v in self:
2014-12-02 14:02:43 -05:00
stdin += "0x%08x " %v
stdin += "exit"
with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write(stdin.encode("ASCII"))
out, err = process.communicate()
crc = int(out.decode("ASCII"), 16)
self.append(crc)
class LinkLayer(Module):
2014-12-05 14:26:09 -05:00
def __init__(self, phy, debug=False, random_level=0):
self.phy = phy
self.debug = debug
2014-12-05 14:26:09 -05:00
self.random_level = random_level
self.tx_packets = []
self.tx_packet = LinkTXPacket()
self.rx_packet = LinkRXPacket()
2014-12-05 19:23:03 -05:00
self.rx_cont = False
2014-12-05 19:23:03 -05:00
self.rx_last = 0
2014-12-05 15:27:26 -05:00
self.tx_cont = False
2014-12-05 19:23:03 -05:00
self.tx_cont_nb = -1
self.tx_lasts = [0, 0, 0]
self.scrambled_datas = import_scrambler_datas()
2014-12-04 19:13:55 -05:00
self.transport_callback = None
2014-12-05 14:26:09 -05:00
self.send_state = ""
self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]
2014-12-04 19:13:55 -05:00
def set_transport_callback(self, callback):
self.transport_callback = callback
2014-12-05 19:23:03 -05:00
def remove_cont(self, dword):
if dword == primitives["HOLD"]:
if self.rx_cont:
self.tx_lasts = [0, 0, 0]
2014-12-03 05:12:26 -05:00
if dword == primitives["CONT"]:
self.rx_cont = True
2014-12-03 05:12:26 -05:00
elif is_primitive(dword):
2014-12-05 19:23:03 -05:00
self.rx_last = dword
self.rx_cont = False
2014-12-05 19:23:03 -05:00
if self.rx_cont:
dword = self.rx_last
return dword
2014-12-03 05:12:26 -05:00
2014-12-05 19:23:03 -05:00
def callback(self, dword):
2014-11-12 12:20:34 -05:00
if dword == primitives["X_RDY"]:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["R_RDY"])
2014-12-02 15:34:16 -05:00
elif dword == primitives["WTRM"]:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["R_OK"])
2014-12-05 14:26:09 -05:00
if self.rx_packet.ongoing:
self.rx_packet.decode()
if self.transport_callback is not None:
self.transport_callback(self.rx_packet)
self.rx_packet.ongoing = False
2014-12-02 15:34:16 -05:00
elif dword == primitives["HOLD"]:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["HOLDA"])
2014-12-02 15:34:16 -05:00
elif dword == primitives["EOF"]:
2014-12-05 14:26:09 -05:00
pass
elif self.rx_packet.ongoing:
2014-12-02 15:34:16 -05:00
if dword != primitives["HOLD"]:
n = randn(100)
2014-12-05 14:26:09 -05:00
if n < self.random_level:
2014-12-03 03:17:51 -05:00
self.phy.send(primitives["HOLD"])
2014-12-02 15:34:16 -05:00
else:
2014-12-05 14:26:09 -05:00
self.phy.send(primitives["R_IP"])
2014-12-03 05:12:26 -05:00
if not is_primitive(dword):
self.rx_packet.append(dword)
2014-12-02 15:34:16 -05:00
elif dword == primitives["SOF"]:
self.rx_packet = LinkRXPacket()
self.rx_packet.ongoing = True
2014-12-05 14:26:09 -05:00
def send(self, dword):
if self.send_state == "RDY":
self.phy.send(primitives["X_RDY"])
if dword == primitives["R_RDY"]:
self.send_state = "SOF"
elif self.send_state == "SOF":
self.phy.send(primitives["SOF"])
self.send_state = "DATA"
elif self.send_state == "DATA":
2014-12-05 15:27:26 -05:00
if dword == primitives["HOLD"]:
self.phy.send(primitives["HOLDA"])
else:
self.phy.send(self.tx_packet.pop(0))
if len(self.tx_packet) == 0:
self.send_state = "EOF"
2014-12-05 14:26:09 -05:00
elif self.send_state == "EOF":
self.phy.send(primitives["EOF"])
self.send_state = "WTRM"
elif self.send_state == "WTRM":
self.phy.send(primitives["WTRM"])
if dword == primitives["R_OK"]:
self.tx_packet.done = True
elif dword == primitives["R_ERR"]:
self.tx_packet.done = True
2014-11-11 12:47:34 -05:00
2014-12-05 19:23:03 -05:00
def insert_cont(self):
self.tx_lasts.pop(0)
self.tx_lasts.append(self.phy.tx.dword.dat)
self.tx_cont = True
for i in range(3):
if not is_primitive(self.tx_lasts[i]):
self.tx_cont = False
if self.tx_lasts[i] != self.tx_lasts[0]:
self.tx_cont = False
if self.tx_cont:
if self.tx_cont_nb == 0:
self.phy.send(primitives["CONT"])
else:
self.phy.send(self.scrambled_datas[self.tx_cont_nb])
self.tx_cont_nb += 1
else:
self.tx_cont_nb = 0
2014-11-11 12:47:34 -05:00
def gen_simulation(self, selfp):
2014-12-05 14:26:09 -05:00
self.tx_packet.done = True
2014-12-05 15:27:26 -05:00
self.phy.send(primitives["SYNC"])
2014-11-11 12:47:34 -05:00
while True:
yield from self.phy.receive()
2014-12-05 14:26:09 -05:00
self.phy.send(primitives["SYNC"])
rx_dword = self.phy.rx.dword.dat
2014-12-05 19:23:03 -05:00
rx_dword = self.remove_cont(rx_dword)
2014-12-05 14:26:09 -05:00
if len(self.tx_packets) != 0:
if self.tx_packet.done:
self.tx_packet = self.tx_packets.pop(0)
self.tx_packet.encode()
self.send_state = "RDY"
if not self.tx_packet.done:
self.send(rx_dword)
else:
2014-12-05 19:23:03 -05:00
self.callback(rx_dword)
self.insert_cont()
2014-12-14 05:44:12 -05:00
# Transport Layer model
2014-12-04 19:13:55 -05:00
def get_field_data(field, packet):
return (packet[field.dword] >> field.offset) & (2**field.width-1)
class FIS:
2014-12-14 04:52:56 -05:00
def __init__(self, packet, description):
2014-12-04 19:13:55 -05:00
self.packet = packet
2014-12-14 04:52:56 -05:00
self.description = description
2014-12-04 19:13:55 -05:00
self.decode()
def decode(self):
2014-12-14 04:52:56 -05:00
for k, v in self.description.items():
2014-12-04 19:13:55 -05:00
setattr(self, k, get_field_data(v, self.packet))
def encode(self):
2014-12-14 04:52:56 -05:00
for k, v in self.description.items():
2014-12-04 19:13:55 -05:00
self.packet[v.dword] |= (getattr(self, k) << v.offset)
def __repr__(self):
r = "--------\n"
2014-12-14 04:52:56 -05:00
for k in sorted(self.description.keys()):
2014-12-04 19:13:55 -05:00
r += k + " : 0x%x" %getattr(self,k) + "\n"
return r
class FIS_REG_H2D(FIS):
def __init__(self, packet=[0]*fis_reg_h2d_cmd_len):
FIS.__init__(self, packet, fis_reg_h2d_layout)
self.type = fis_types["REG_H2D"]
2014-12-04 19:13:55 -05:00
def __repr__(self):
r = "FIS_REG_H2D\n"
r += FIS.__repr__(self)
return r
class FIS_REG_D2H(FIS):
def __init__(self, packet=[0]*fis_reg_d2h_cmd_len):
FIS.__init__(self, packet, fis_reg_d2h_layout)
self.type = fis_types["REG_D2H"]
2014-12-04 19:13:55 -05:00
def __repr__(self):
r = "FIS_REG_D2H\n"
r += FIS.__repr__(self)
return r
class FIS_DMA_ACTIVATE_D2H(FIS):
def __init__(self, packet=[0]*fis_dma_activate_d2h_cmd_len):
FIS.__init__(self, packet, fis_dma_activate_d2h_layout)
self.type = fis_types["DMA_ACTIVATE_D2H"]
2014-12-04 19:13:55 -05:00
def __repr__(self):
r = "FIS_DMA_ACTIVATE_D2H\n"
r += FIS.__repr__(self)
return r
class FIS_DATA(FIS):
def __init__(self, packet=[0]):
FIS.__init__(self, packet, fis_data_layout)
self.type = fis_types["DATA"]
2014-12-04 19:13:55 -05:00
def __repr__(self):
r = "FIS_DATA\n"
r += FIS.__repr__(self)
for data in self.packet[1:]:
r += "%08x\n" %data
2014-12-04 19:13:55 -05:00
return r
class FIS_UNKNOWN(FIS):
def __init__(self, packet=[0]):
FIS.__init__(self, packet, {})
def __repr__(self):
r = "UNKNOWN\n"
r += "--------\n"
for dword in self.packet:
r += "%08x\n" %dword
return r
class TransportLayer(Module):
2014-12-05 14:26:09 -05:00
def __init__(self, link, debug=False, loopback=False):
self.link = link
self.debug = debug
self.loopback = loopback
self.link.set_transport_callback(self.callback)
2014-12-04 19:13:55 -05:00
def set_command_callback(self, callback):
self.command_callback = callback
def send(self, fis):
fis.encode()
packet = LinkTXPacket(fis.packet)
self.link.tx_packets.append(packet)
if self.debug and not self.loopback:
print(fis)
2014-12-04 19:13:55 -05:00
def callback(self, packet):
fis_type = packet[0] & 0xff
2014-12-04 19:13:55 -05:00
if fis_type == fis_types["REG_H2D"]:
fis = FIS_REG_H2D(packet)
elif fis_type == fis_types["REG_D2H"]:
fis = FIS_REG_D2H(packet)
elif fis_type == fis_types["DMA_ACTIVATE_D2H"]:
fis = FIS_DMA_ACTIVATE_D2H(packet)
elif fis_type == fis_types["DATA"]:
fis = FIS_DATA(packet)
else:
fis = FIS_UNKNOWN(packet)
2014-12-05 14:26:09 -05:00
if self.debug:
print(fis)
if self.loopback:
self.send(fis)
else:
self.command_callback(fis)
2014-12-14 05:44:12 -05:00
# Command Layer model
class CommandLayer(Module):
def __init__(self, transport, debug=False):
self.transport = transport
self.debug = debug
self.transport.set_command_callback(self.callback)
2014-12-14 05:44:12 -05:00
self.hdd = None
2014-12-12 19:18:08 -05:00
2014-12-14 05:44:12 -05:00
def set_hdd(self, hdd):
self.hdd = hdd
2014-12-12 19:18:08 -05:00
2014-12-14 05:44:12 -05:00
def callback(self, fis):
# XXX manage maximum of 2048 DWORDS per DMA
if isinstance(fis, FIS_REG_H2D):
if fis.command == regs["WRITE_DMA_EXT"]:
self.transport.send(self.hdd.write_dma_cmd(fis))
elif fis.command == regs["READ_DMA_EXT"]:
self.transport.send(self.hdd.read_dma_cmd(fis))
elif fis.command == regs["IDENTIFY_DEVICE_DMA"]:
self.transport.send(self.hdd.identify_device_dma_cmd(fis))
elif isinstance(fis, FIS_DATA):
self.hdd.data_cmd(fis)
# HDD model
class HDDMemRegion:
def __init__(self, base, length):
self.base = base
self.length = length
self.data = [0]*(length//4)
class HDD(Module):
def __init__(self, command, debug=False):
self.command = command
command.set_hdd(self)
self.mem = None
self.wr_address = 0
def write_dma_cmd(self, fis):
self.wr_address = fis.lba_lsb
return FIS_DMA_ACTIVATE_D2H()
2014-12-12 19:18:08 -05:00
2014-12-14 05:44:12 -05:00
def read_dma_cmd(self, fis):
packet = self.read_mem(fis.lba_lsb, fis.count*4)
packet.insert(0, 0)
return FIS_DATA(packet)
2014-12-12 19:18:08 -05:00
2014-12-14 05:44:12 -05:00
def identify_dma_cmd(self, fis):
packet = [i for i in range(256)]
packet.insert(0, 0)
return FIS_DATA(packet)
2014-12-14 05:44:12 -05:00
def data_cmd(self, fis):
self.write_mem(self.wr_address, fis.packet[1:])
def allocate_mem(self, base, length):
# XXX add support for multiple memory regions
self.mem = HDDMemRegion(base, length)
def write_mem(self, adr, data):
# XXX test if adr allocated in one memory region
2014-12-14 05:44:12 -05:00
current_adr = (adr-self.mem.base)//4
2014-12-12 19:18:08 -05:00
for i in range(len(data)):
2014-12-14 05:44:12 -05:00
self.mem.data[current_adr+i] = data[i]
2014-12-12 19:18:08 -05:00
2014-12-14 05:44:12 -05:00
def read_mem(self, adr, length=1):
# XXX test if adr allocated in one memory region
2014-12-14 05:44:12 -05:00
current_adr = (adr-self.mem.base)//4
2014-12-12 19:18:08 -05:00
data = []
for i in range(length//4):
2014-12-14 05:44:12 -05:00
data.append(self.mem.data[current_adr+i])
2014-12-12 19:18:08 -05:00
return data
class BFM(Module):
2014-12-05 14:26:09 -05:00
def __init__(self,
phy_debug=False,
link_debug=False, link_random_level=0,
transport_debug=False, transport_loopback=False,
2014-12-14 05:44:12 -05:00
command_debug=False,
hdd_debug=False
2014-12-05 14:26:09 -05:00
):
###
2014-12-05 14:26:09 -05:00
self.submodules.phy = PHYLayer(phy_debug)
self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
self.submodules.command = CommandLayer(self.transport, command_debug)
2014-12-14 05:44:12 -05:00
self.submodules.hdd = HDD(self.command, hdd_debug)