litex/liteeth/test/model/mac.py

152 lines
3.5 KiB
Python
Raw Normal View History

import math, binascii
2015-01-28 13:07:59 -05:00
from liteeth.common import *
from liteeth.mac.common import *
from liteeth.test.common import *
def split_bytes(v, n):
    """Split the integer ``v`` into ``n`` byte values, least-significant first.

    Args:
        v: Non-negative integer to split.
        n: Number of bytes to produce.

    Returns:
        A list of ``n`` ints in the range 0..255, in little-endian order.

    Raises:
        OverflowError: If ``v`` is negative or does not fit in ``n`` bytes
            (raised by ``int.to_bytes``).
    """
    # Iterating a bytes object already yields ints, so list() is sufficient.
    return list(v.to_bytes(n, byteorder="little"))
def merge_bytes(b):
    """Combine an iterable of byte values into one integer (little-endian).

    The first element of ``b`` becomes the least-significant byte of the
    result; an empty input yields 0.
    """
    raw = bytes(b)
    return int.from_bytes(raw, byteorder="little")
def get_field_data(field, datas):
    """Extract the value of one header field from a sequence of byte values.

    Args:
        field: Object exposing ``byte`` (index of the first byte holding the
            field), ``offset`` (bit offset inside that byte) and ``width``
            (field size in bits).
        datas: Sequence of byte values (ints in 0..255) in header order.

    Returns:
        The field value as an int, assembled little-endian from ``datas``.
    """
    # Read enough bytes to cover offset + width bits: a field that starts
    # part-way through a byte can spill into one more byte than width alone
    # would suggest.
    n_bytes = (field.offset + field.width + 7) // 8
    raw = bytes(datas[field.byte:field.byte + n_bytes])
    # Same little-endian merge as merge_bytes(), inlined here.
    value = int.from_bytes(raw, "little")
    mask = (1 << field.width) - 1
    return (value >> field.offset) & mask
2015-01-28 14:44:41 -05:00
def print_mac(s):
    """Print ``s`` via ``print_with_prefix`` using the ``[MAC]`` prefix."""
    mac_prefix = "[MAC]"
    print_with_prefix(s, mac_prefix)
# eth_preamble as a list of 8 byte values, least-significant byte first.
preamble = split_bytes(v=eth_preamble, n=8)
2015-01-28 14:44:41 -05:00
2015-01-28 13:07:59 -05:00
def crc32(l):
    """Return the CRC-32 of ``l`` as four byte values, least-significant first.

    Args:
        l: Iterable of byte values (ints in 0..255) or a bytes-like object.

    Returns:
        A list of 4 ints holding ``binascii.crc32`` of the data in
        little-endian byte order.
    """
    checksum = binascii.crc32(bytes(l))
    # Same little-endian split as split_bytes(checksum, 4); the result is
    # already a list of ints, so no further copy is needed.
    return list(checksum.to_bytes(4, byteorder="little"))
# MAC model
class MACPacket(Packet):
    """Packet with helpers to add and strip the MAC framing.

    ``encode`` prepends the MAC header, appends the CRC and then prepends
    the preamble.  ``decode`` undoes this and stores every header field as
    an attribute named after its key in ``mac_header``.
    """

    def __init__(self, init=None):
        """Create a packet from ``init`` (an iterable of byte values).

        ``init`` defaults to an empty packet.
        """
        # A fresh list per call avoids sharing one mutable default object
        # between all instances.
        Packet.__init__(self, [] if init is None else init)
        self.preamble_error = False
        self.crc_error = False

    def check_remove_preamble(self):
        """Strip the leading preamble if it is present.

        Returns:
            False when the packet starts with ``preamble`` (those bytes are
            removed), True otherwise -- the return value is an error flag.
        """
        length = len(preamble)
        # NOTE(review): the length guard assumes ``comp`` might accept a
        # too-short packet as a match; it keeps the pops below in bounds.
        if len(self) >= length and comp(self[0:length], preamble):
            for _ in range(length):
                self.pop(0)
            return False
        return True

    def check_remove_crc(self):
        """Strip the trailing 4-byte CRC if it matches the packet content.

        Returns:
            False when the last four bytes equal ``crc32`` of the preceding
            bytes (they are removed), True otherwise -- an error flag.
        """
        # NOTE(review): same guard rationale as in check_remove_preamble.
        if len(self) >= 4 and comp(self[-4:], crc32(self[:-4])):
            for _ in range(4):
                self.pop()
            return False
        return True

    def decode_remove_header(self):
        """Pop the MAC header off the front and set its fields as attributes."""
        header = []
        # Take at most mac_header_len bytes; a shorter packet yields fewer.
        for _ in range(min(len(self), mac_header_len)):
            header.append(self.pop(0))
        for name, field in sorted(mac_header.items()):
            setattr(self, name, get_field_data(field, header))

    def decode(self):
        """Remove preamble, CRC and header, recording any framing error.

        Raises:
            ValueError: If the preamble or the CRC check fails.  The
                ``preamble_error`` / ``crc_error`` flags are set beforehand.
        """
        self.preamble_error = self.check_remove_preamble()
        self.crc_error = self.check_remove_crc()
        if self.crc_error or self.preamble_error:
            # XXX handle this properly
            raise ValueError(
                "MAC decode failed (preamble_error=%s, crc_error=%s)"
                % (self.preamble_error, self.crc_error)
            )
        self.decode_remove_header()

    def encode_header(self):
        """Build the header from the field attributes and prepend it."""
        header = 0
        for name, field in sorted(mac_header.items()):
            header |= getattr(self, name) << (field.byte * 8 + field.offset)
        # Inserting at index 0 in reverse keeps the header bytes in order.
        for byte in reversed(split_bytes(header, mac_header_len)):
            self.insert(0, byte)

    def insert_crc(self):
        """Append the CRC of the current packet content."""
        for byte in crc32(self):
            self.append(byte)

    def insert_preamble(self):
        """Prepend the preamble bytes."""
        for byte in reversed(preamble):
            self.insert(0, byte)

    def encode(self):
        """Add header, then CRC (covering header + payload), then preamble."""
        self.encode_header()
        self.insert_crc()
        self.insert_preamble()

    def __repr__(self):
        """Return the header fields (sorted by name) and the payload in hex."""
        parts = ["--------\n"]
        parts.extend(
            "%s : 0x%x\n" % (name, getattr(self, name))
            for name in sorted(mac_header)
        )
        parts.append("payload: ")
        parts.extend("%02x" % byte for byte in self)
        return "".join(parts)
2015-01-28 13:07:59 -05:00
class MAC(Module):
    """MAC model that exchanges ``MACPacket`` frames with a PHY model.

    Received frames are decoded and either looped back or dispatched to the
    registered IP / ARP callback according to their ``ethernet_type``.
    """

    def __init__(self, phy, debug=False, loopback=False):
        """Store the PHY model and options.

        Args:
            phy: PHY model; ``gen_simulation`` uses its ``receive()``,
                ``send()`` and ``packet`` members.
            debug: When true, print every sent and received packet.
            loopback: When true, send every received packet straight back
                instead of dispatching it to a callback.
        """
        self.phy = phy
        self.debug = debug
        self.loopback = loopback
        self.tx_packets = []  # encoded packets waiting to be sent
        self.tx_packet = MACPacket()
        self.rx_packet = MACPacket()

        self.ip_callback = None
        self.arp_callback = None

    def set_ip_callback(self, callback):
        """Register the callable invoked with each received IP packet."""
        self.ip_callback = callback

    def set_arp_callback(self, callback):
        """Register the callable invoked with each received ARP packet."""
        self.arp_callback = callback

    def send(self, packet):
        """Encode ``packet`` in place and queue it for transmission."""
        if self.debug:
            print_mac(">>>>>>>>")
            print_mac(packet)
        packet.encode()
        self.tx_packets.append(packet)

    def callback(self, datas):
        """Decode a received frame and loop it back or dispatch it.

        Args:
            datas: Raw frame bytes, used to build a ``MACPacket``.

        Raises:
            ValueError: If decoding fails or the ethernet type is neither
                ``ethernet_type_ip`` nor ``ethernet_type_arp``.
        """
        packet = MACPacket(datas)
        packet.decode()
        if self.debug:
            print_mac("<<<<<<<<")
            print_mac(packet)
        if self.loopback:
            self.send(packet)
            return
        # The decoded header fields are attributes of the packet, not of
        # this MAC object, so the type must be read from ``packet``.
        if packet.ethernet_type == ethernet_type_ip:
            if self.ip_callback is not None:
                self.ip_callback(packet)
        elif packet.ethernet_type == ethernet_type_arp:
            if self.arp_callback is not None:
                self.arp_callback(packet)
        else:
            # XXX handle this properly
            raise ValueError(
                "unsupported ethernet_type: %r" % (packet.ethernet_type,)
            )

    def gen_simulation(self, selfp):
        """Simulation generator: receive one frame, then send one if queued."""
        self.tx_packet.done = True
        while True:
            yield from self.phy.receive()
            self.callback(self.phy.packet)
            # XXX add full duplex
            if self.tx_packets:
                tx_packet = self.tx_packets.pop(0)
                yield from self.phy.send(tx_packet)