mirror of
https://github.com/enjoy-digital/litex.git
synced 2025-01-04 09:52:26 -05:00
208 lines
6.8 KiB
Python
208 lines
6.8 KiB
Python
import subprocess
|
|
import math
|
|
|
|
from misoclib.mem.litesata.common import *
|
|
from misoclib.mem.litesata.test.common import *
|
|
|
|
|
|
def print_link(s, n=None):
    """Print *s* via ``print_with_prefix`` using a link-layer tag as prefix.

    The prefix is ``"[LNK]: "`` when *n* is ``None`` and ``"[LNK<n>]: "``
    (with ``str(n)`` substituted) otherwise.
    """
    tag = "" if n is None else str(n)
    prefix = "[LNK{}]: ".format(tag)
    print_with_prefix(s, prefix)
|
|
|
|
|
|
def import_scrambler_datas():
    """Run the external ``./scrambler`` helper and return the values it prints.

    The helper is started relative to the current working directory, is fed
    the ASCII text ``0x10000`` on its standard input (presumably the number
    of values requested -- confirm against the helper's source) and is
    expected to print one hexadecimal number per line on its standard output.

    Returns:
        list of int: the parsed values, in the order they were printed.

    Raises:
        OSError: if ``./scrambler`` cannot be started.
        ValueError: if a non-blank output line is not valid hexadecimal.
    """
    with subprocess.Popen(["./scrambler"],
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE) as process:
        # Let communicate() feed stdin, close it and drain stdout in one
        # step rather than writing to the pipe by hand; this is the pattern
        # the subprocess documentation recommends to avoid pipe deadlocks.
        out, _ = process.communicate("0x10000".encode("ASCII"))
    # splitlines() plus blank filtering keeps the final value even when the
    # helper does not end its output with a newline, and tolerates "\r\n".
    lines = out.decode("utf-8").splitlines()
    return [int(line, 16) for line in lines if line.strip()]
|
|
|
|
|
|
class LinkPacket(list):
    """A list of dwords carrying link-layer bookkeeping state.

    Base class of the RX and TX packet types; the list content is the packet
    payload (plus CRC once inserted / until stripped by the subclasses).
    """

    def __init__(self, init=None):
        """Create a packet, optionally pre-filled with the dwords of *init*.

        Args:
            init: iterable of dwords appended in order, or ``None`` (the
                default) for an empty packet. ``None`` replaces the former
                mutable ``[]`` default; passing a list still works.
        """
        super().__init__()
        # Set by the link layer while a frame is being received.
        self.ongoing = False
        # Set by the link layer once a transmitted frame has been answered.
        self.done = False
        # Scrambler sequence XORed with the dwords by the subclasses.
        # NOTE(review): this spawns ./scrambler once per packet instance.
        self.scrambled_datas = import_scrambler_datas()
        if init is not None:
            self.extend(init)
|
|
|
|
|
|
class LinkRXPacket(LinkPacket):
    """Received packet: in-place descrambling and CRC verification."""

    def descramble(self):
        """XOR every dword in place with the scrambler value at its index.

        Raises:
            IndexError: if the packet is longer than ``scrambled_datas``.
        """
        for index, dword in enumerate(self):
            self[index] = dword ^ self.scrambled_datas[index]

    def check_crc(self):
        """Check the trailing CRC dword and remove it from the packet.

        Every dword except the last is sent to the external ``./crc`` helper
        as space-separated ``0x%08x`` words followed by ``exit``; the
        hexadecimal value the helper prints is compared with the last dword.
        The last dword is popped whether or not the comparison succeeds.

        Returns:
            bool: ``True`` when the last dword equals the computed CRC.

        Raises:
            IndexError: if the packet is empty.
            OSError: if ``./crc`` cannot be started.
            ValueError: if the helper output is not valid hexadecimal.
        """
        request = "".join("0x{:08x} ".format(v) for v in self[:-1]) + "exit"
        with subprocess.Popen(["./crc"],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE) as process:
            # communicate(input) writes, closes stdin and reads stdout
            # without risking a pipe deadlock on large packets.
            out, _ = process.communicate(request.encode("ASCII"))
        crc = int(out.decode("ASCII"), 16)
        return self.pop() == crc

    def decode(self):
        """Descramble the packet, then verify and strip its CRC.

        Returns:
            bool: the result of :meth:`check_crc`.
        """
        self.descramble()
        return self.check_crc()
|
|
|
|
|
|
class LinkTXPacket(LinkPacket):
    """Packet to transmit: CRC insertion and in-place scrambling."""

    def insert_crc(self):
        """Append the CRC of the current dwords to the packet.

        All dwords are sent to the external ``./crc`` helper as
        space-separated ``0x%08x`` words followed by ``exit``; the
        hexadecimal value the helper prints is appended as a new last dword.

        Raises:
            OSError: if ``./crc`` cannot be started.
            ValueError: if the helper output is not valid hexadecimal.
        """
        request = "".join("0x{:08x} ".format(v) for v in self) + "exit"
        with subprocess.Popen(["./crc"],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE) as process:
            # communicate(input) writes, closes stdin and reads stdout
            # without risking a pipe deadlock on large packets.
            out, _ = process.communicate(request.encode("ASCII"))
        self.append(int(out.decode("ASCII"), 16))

    def scramble(self):
        """XOR every dword in place with the scrambler value at its index.

        Raises:
            IndexError: if the packet is longer than ``scrambled_datas``.
        """
        for index, dword in enumerate(self):
            self[index] = dword ^ self.scrambled_datas[index]

    def encode(self):
        """Append the CRC, then scramble the whole packet (CRC included)."""
        self.insert_crc()
        self.scramble()
|
|
|
|
|
|
class LinkLayer(Module):
    """Simulation model of a link layer driving a PHY model.

    Each simulation step the model reads one dword from ``phy``, expands a
    received ``CONT`` sequence, then either advances the transmit state
    machine (while a TX packet is pending) or reacts to the received dword
    as a receiver. Finally it may replace its own repeated primitives with
    ``CONT`` followed by scrambler data.
    """

    def __init__(self, phy, debug=False, random_level=0):
        """Store the PHY and initialise RX/TX state.

        Args:
            phy: PHY model; must provide ``send()``, ``receive()`` and the
                ``rx.dword.dat`` / ``tx.dword.dat`` attributes used below.
            debug: when true, the PHY is printed with ``print_link`` on
                every simulation step.
            random_level: threshold compared against ``randn(100)``; draws
                below it answer incoming data with ``HOLD`` instead of
                ``R_IP``.
        """
        self.phy = phy
        self.debug = debug
        self.random_level = random_level

        # Queue of packets waiting to be sent, and the packets in flight.
        self.tx_packets = []
        self.tx_packet = LinkTXPacket()
        self.rx_packet = LinkRXPacket()

        # CONT handling: RX side remembers the last primitive seen, TX side
        # remembers the last three dwords handed to the PHY.
        self.rx_cont = False
        self.rx_last = 0
        self.tx_cont = False
        self.tx_cont_nb = -1
        self.tx_lasts = [0, 0, 0]

        # Data sent after CONT in place of the repeated primitive.
        self.scrambled_datas = import_scrambler_datas()

        self.transport = None
        self.n = None

        # Transmit state machine; "" means no state handler is selected.
        self.send_state = ""
        self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]

    def set_transport(self, transport):
        """Register the object whose ``callback`` receives decoded packets."""
        self.transport = transport

    def send(self, dword):
        """Advance the transmit state machine by one step.

        Args:
            dword: the dword received from the peer during this step.

        Does nothing when ``send_state`` is not one of the known states.
        """
        handlers = {
            "RDY": self._send_rdy,
            "SOF": self._send_sof,
            "DATA": self._send_data,
            "EOF": self._send_eof,
            "WTRM": self._send_wtrm,
        }
        handler = handlers.get(self.send_state)
        if handler is not None:
            handler(dword)

    def _send_rdy(self, dword):
        """Send X_RDY; move on to SOF once the peer answers R_RDY."""
        self.phy.send(primitives["X_RDY"])
        if dword == primitives["R_RDY"]:
            self.send_state = "SOF"

    def _send_sof(self, dword):
        """Send SOF and move on to the data phase."""
        self.phy.send(primitives["SOF"])
        self.send_state = "DATA"

    def _send_data(self, dword):
        """Send the next payload dword, or HOLDA while the peer sends HOLD."""
        if dword == primitives["HOLD"]:
            self.phy.send(primitives["HOLDA"])
            return
        self.phy.send(self.tx_packet.pop(0))
        if not self.tx_packet:
            self.send_state = "EOF"

    def _send_eof(self, dword):
        """Send EOF and start waiting for the peer's verdict."""
        self.phy.send(primitives["EOF"])
        self.send_state = "WTRM"

    def _send_wtrm(self, dword):
        """Send WTRM until the peer answers R_OK or R_ERR, then send SYNC."""
        self.phy.send(primitives["WTRM"])
        if dword in (primitives["R_OK"], primitives["R_ERR"]):
            self.tx_packet.done = True
        if self.tx_packet.done:
            self.phy.send(primitives["SYNC"])

    def insert_cont(self):
        """Replace a run of identical outgoing primitives with CONT + data.

        The dword currently held by the PHY transmitter is pushed into a
        three-entry history. When all three entries are the same primitive,
        the first replacement sent is ``CONT`` and the following ones are
        taken from ``scrambled_datas``; otherwise the counter is reset.
        """
        self.tx_lasts.pop(0)
        self.tx_lasts.append(self.phy.tx.dword.dat)

        reference = self.tx_lasts[0]
        self.tx_cont = all(
            is_primitive(self.tx_lasts[i]) and self.tx_lasts[i] == reference
            for i in range(3)
        )

        if not self.tx_cont:
            self.tx_cont_nb = 0
            return

        if self.tx_cont_nb == 0:
            self.phy.send(primitives["CONT"])
        else:
            self.phy.send(self.scrambled_datas[self.tx_cont_nb])
        self.tx_cont_nb += 1

    def remove_cont(self, dword):
        """Undo the peer's CONT substitution on a received dword.

        Args:
            dword: the raw dword read from the PHY.

        Returns:
            The last primitive seen while a ``CONT`` sequence is active,
            otherwise *dword* unchanged.
        """
        # A HOLD arriving while in a CONT sequence clears the TX history so
        # our own CONT detection starts over.
        if self.rx_cont and dword == primitives["HOLD"]:
            self.tx_lasts = [0, 0, 0]

        if dword == primitives["CONT"]:
            self.rx_cont = True
        elif is_primitive(dword):
            self.rx_last = dword
            self.rx_cont = False

        return self.rx_last if self.rx_cont else dword

    def callback(self, dword):
        """React to a received dword while acting as the receiver.

        Args:
            dword: the received dword, after ``remove_cont`` processing.
        """
        if dword == primitives["X_RDY"]:
            self.phy.send(primitives["R_RDY"])
            return

        if dword == primitives["WTRM"]:
            self.phy.send(primitives["R_OK"])
            if self.rx_packet.ongoing:
                # First WTRM after the payload: decode and hand over once.
                self.rx_packet.decode()
                if self.transport is not None:
                    self.transport.callback(self.rx_packet)
                self.rx_packet.ongoing = False
            return

        if dword == primitives["HOLD"]:
            self.phy.send(primitives["HOLDA"])
            return

        if dword == primitives["EOF"]:
            return

        if self.rx_packet.ongoing:
            # HOLD was already handled above, so this test always passes
            # here; it is kept to mirror the original control flow.
            if dword != primitives["HOLD"]:
                if randn(100) < self.random_level:
                    self.phy.send(primitives["HOLD"])
                else:
                    self.phy.send(primitives["R_IP"])
                if not is_primitive(dword):
                    self.rx_packet.append(dword)
            return

        if dword == primitives["SOF"]:
            self.rx_packet = LinkRXPacket()
            self.rx_packet.ongoing = True

    def gen_simulation(self, selfp):
        """Simulation generator: run the link layer forever.

        Args:
            selfp: simulator proxy supplied by the caller; unused here.
        """
        self.tx_packet.done = True
        self.phy.send(primitives["SYNC"])
        while True:
            yield from self.phy.receive()
            if self.debug:
                print_link(self.phy, self.n)
            # Default output for this step; the handlers below may override.
            self.phy.send(primitives["SYNC"])
            rx_dword = self.remove_cont(self.phy.rx.dword.dat)

            # Start the next queued packet once the previous one is done.
            if self.tx_packets and self.tx_packet.done:
                self.tx_packet = self.tx_packets.pop(0)
                self.tx_packet.encode()
                self.send_state = "RDY"

            if self.tx_packet.done:
                self.callback(rx_dword)
            else:
                self.send(rx_dword)
            self.insert_cont()
|