106 lines
2.4 KiB
Python
106 lines
2.4 KiB
Python
import binascii
|
|
|
|
from liteeth.common import *
|
|
from liteeth.mac.common import *
|
|
from liteeth.test.common import *
|
|
|
|
def print_mac(s):
|
|
print_with_prefix(s, "[MAC]")
|
|
|
|
preamble = [0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0xD5]
|
|
|
|
def crc32(l):
|
|
crc = []
|
|
crc_bytes = binascii.crc32(bytes(l)).to_bytes(4, byteorder="little")
|
|
for byte in crc_bytes:
|
|
crc.append(int(byte))
|
|
return crc
|
|
|
|
# MAC model
|
|
class MACPacket(list):
|
|
def __init__(self, init=[]):
|
|
self.ongoing = False
|
|
self.done = False
|
|
for byte in init:
|
|
self.append(byte)
|
|
|
|
class MACRXPacket(MACPacket):
|
|
def check_remove_preamble(self):
|
|
if comp(self[0:8], preamble):
|
|
for i in range(8):
|
|
self.pop(0)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def check_remove_crc(self):
|
|
if comp(self[-4:], crc32(self[:-4])):
|
|
for i in range(4):
|
|
self.pop()
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
class MACTXPacket(MACPacket):
|
|
def insert_crc(self):
|
|
for d in crc32(self):
|
|
self.append(d)
|
|
|
|
def insert_preamble(self):
|
|
for d in reversed(preamble):
|
|
self.insert(0, d)
|
|
|
|
class MAC(Module):
|
|
def __init__(self, phy, debug=False, loopback=False):
|
|
self.phy = phy
|
|
self.debug = debug
|
|
self.loopback = loopback
|
|
self.tx_packets = []
|
|
self.tx_packet = MACTXPacket()
|
|
self.rx_packet = MACRXPacket()
|
|
|
|
self.ip_callback = None
|
|
|
|
def set_ip_callback(self, callback):
|
|
self.ip_callback = callback
|
|
|
|
def send(self, datas):
|
|
tx_packet = MACTXPacket(datas)
|
|
if self.debug:
|
|
r = ">>>>>>>>\n"
|
|
r += "length " + str(len(tx_packet)) + "\n"
|
|
for d in tx_packet:
|
|
r += "%02x" %d
|
|
print_mac(r)
|
|
tx_packet.insert_crc()
|
|
tx_packet.insert_preamble()
|
|
self.tx_packets.append(tx_packet)
|
|
|
|
def callback(self, datas):
|
|
rx_packet = MACRXPacket(datas)
|
|
preamble_error = rx_packet.check_remove_preamble()
|
|
crc_error = rx_packet.check_remove_crc()
|
|
if self.debug:
|
|
r = "<<<<<<<<\n"
|
|
r += "preamble_error " + str(preamble_error) + "\n"
|
|
r += "crc_error " + str(crc_error) + "\n"
|
|
r += "length " + str(len(rx_packet)) + "\n"
|
|
for d in rx_packet:
|
|
r += "%02x" %d
|
|
print_mac(r)
|
|
if (not preamble_error) and (not crc_error):
|
|
if self.loopback:
|
|
self.send(rx_packet)
|
|
elif self.ip_callback is not None:
|
|
self.ip_callback(rx_packet)
|
|
|
|
def gen_simulation(self, selfp):
|
|
self.tx_packet.done = True
|
|
while True:
|
|
yield from self.phy.receive()
|
|
self.callback(self.phy.packet)
|
|
# XXX add full duplex
|
|
if len(self.tx_packets) != 0:
|
|
tx_packet = self.tx_packets.pop(0)
|
|
yield from self.phy.send(tx_packet)
|