litex/liteeth/test/model/mac.py

162 lines
3.8 KiB
Python
Raw Normal View History

import math, binascii
2015-01-28 13:07:59 -05:00
from liteeth.common import *
from liteeth.mac.common import *
from liteeth.test.common import *
2015-01-28 14:44:41 -05:00
def print_mac(s):
    """Print `s` via `print_with_prefix`, tagged with the "[MAC]" prefix."""
    tag = "[MAC]"
    print_with_prefix(s, tag)
# Reference preamble, obtained by splitting `eth_preamble` into 8 entries with
# "little" ordering. MACPacket compares the first 8 entries of a received frame
# against it and prepends it when encoding a frame.
preamble = split_bytes(eth_preamble, 8, "little")
2015-01-28 14:44:41 -05:00
2015-01-28 13:07:59 -05:00
def crc32(l):
    """Return the CRC-32 of `l` as a list of ints.

    `l` is converted with `bytes()` and hashed with `binascii.crc32`; the
    checksum is then split with `split_bytes(..., 4, "little")` and every
    resulting item is coerced to `int`.
    """
    checksum = binascii.crc32(bytes(l))
    return [int(item) for item in split_bytes(checksum, 4, "little")]
# MAC model
class MACPacket(Packet):
    """Packet holding one MAC frame, with helpers to encode and decode it.

    Decoding strips the preamble and the trailing CRC, then pops the header
    and stores every field described by `mac_header` as an attribute of the
    packet. Encoding performs the reverse steps in place.

    Attributes:
        preamble_error: True when the last `decode()` found no valid preamble.
        crc_error: True when the last `decode()` found a CRC mismatch.
    """

    # Number of leading preamble entries and trailing CRC entries in a frame.
    _PREAMBLE_LEN = 8
    _CRC_LEN = 4

    def __init__(self, init=None):
        """Create a packet, optionally initialised from the sequence `init`."""
        # Use a fresh list per call rather than a shared mutable default.
        Packet.__init__(self, [] if init is None else init)
        self.preamble_error = False
        self.crc_error = False

    def check_remove_preamble(self):
        """Remove the leading preamble when it matches `preamble`.

        Returns:
            False when the preamble matched and was removed, True on error
            (mismatch, or frame shorter than the preamble).
        """
        n = self._PREAMBLE_LEN
        # A frame shorter than the preamble cannot contain it; report an
        # error instead of popping from a list that is too short.
        if len(self) >= n and comp(self[0:n], preamble):
            del self[:n]
            return False
        return True

    def check_remove_crc(self):
        """Remove the trailing CRC when it matches the CRC of the rest.

        Returns:
            False when the CRC matched and was removed, True on error
            (mismatch, or frame shorter than the CRC).
        """
        n = self._CRC_LEN
        # Same reasoning as for the preamble: too short means no valid CRC.
        if len(self) >= n and comp(self[-n:], crc32(self[:-n])):
            del self[-n:]
            return False
        return True

    def decode_remove_header(self):
        """Pop the first `mac_header_len` entries and decode them.

        Every field of `mac_header` is extracted from the popped header with
        `get_field_data` and stored on the packet under the field's name.
        """
        header = list(self[:mac_header_len])
        del self[:mac_header_len]
        for k, v in sorted(mac_header.items()):
            setattr(self, k, get_field_data(v, header))

    def decode(self):
        """Strip preamble and CRC, then decode and remove the header.

        Raises:
            ValueError: if the preamble or the CRC check failed.
        """
        # Both checks always run so that both error flags are up to date.
        self.preamble_error = self.check_remove_preamble()
        self.crc_error = self.check_remove_crc()
        if self.crc_error or self.preamble_error:
            # XXX handle this properly
            raise ValueError(
                "MAC frame decode failed (preamble_error=%s, crc_error=%s)"
                % (self.preamble_error, self.crc_error))
        self.decode_remove_header()

    def encode_header(self):
        """Build the header from the packet's field attributes and prepend it."""
        header = 0
        for k, v in sorted(mac_header.items()):
            value = merge_bytes(
                split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
            # Place the field at its bit position inside the header word.
            header += value << (v.offset + v.byte*8)
        # Each entry is inserted at index 0, so the entries returned by
        # split_bytes end up in reverse order at the front of the packet.
        for d in split_bytes(header, mac_header_len):
            self.insert(0, d)

    def insert_crc(self):
        """Append the CRC-32 of the current packet content."""
        # crc32() is evaluated on the packet before anything is appended.
        self.extend(crc32(self))

    def insert_preamble(self):
        """Prepend `preamble`, keeping its original order."""
        self[:0] = preamble

    def encode(self):
        """Turn the packet into a full frame: header, then CRC, then preamble."""
        self.encode_header()
        self.insert_crc()
        self.insert_preamble()

    def __repr__(self):
        """Return the header fields (hex) followed by the payload as hex."""
        lines = ["--------"]
        for k in sorted(mac_header.keys()):
            lines.append(k + " : 0x%x" % getattr(self, k))
        lines.append("payload: " + "".join("%02x" % d for d in self))
        return "\n".join(lines)
2015-01-28 13:07:59 -05:00
class MAC(Module):
    """MAC-level simulation model sitting on top of a PHY model.

    Received data is decoded into a `MACPacket` and either looped back to the
    transmit queue or handed to the registered IP / ARP callback, depending
    on the packet's `ethernet_type`.

    Args:
        phy: PHY model; must provide `receive()` and `send(packet)`
            generators and expose the last received data as `packet`.
        debug: when true, print every sent and received packet.
        loopback: when true, send every received packet back.
    """

    def __init__(self, phy, debug=False, loopback=False):
        self.phy = phy
        self.debug = debug
        self.loopback = loopback

        # Packets encoded by send() and waiting for the simulation loop.
        self.tx_packets = []
        self.tx_packet = MACPacket()
        self.rx_packet = MACPacket()

        self.ip_callback = None
        self.arp_callback = None

    def set_ip_callback(self, callback):
        """Register the callable invoked with each received IP packet."""
        self.ip_callback = callback

    def set_arp_callback(self, callback):
        """Register the callable invoked with each received ARP packet."""
        self.arp_callback = callback

    def send(self, packet):
        """Encode `packet` in place and queue it for transmission."""
        if self.debug:
            # Printed before encoding, i.e. while the payload is still bare.
            print_mac(">>>>>>>>")
            print_mac(packet)
        packet.encode()
        self.tx_packets.append(packet)

    def callback(self, datas):
        """Decode received `datas` and dispatch the resulting packet.

        Raises:
            ValueError: if decoding fails (raised by `MACPacket.decode`) or
                the packet's `ethernet_type` is neither IP nor ARP.
        """
        packet = MACPacket(datas)
        packet.decode()
        if self.debug:
            print_mac("<<<<<<<<")
            print_mac(packet)
        if self.loopback:
            self.send(packet)
            return
        # The type field is decoded onto the packet, not onto the MAC model.
        ethernet_type = packet.ethernet_type
        if ethernet_type == ethernet_type_ip:
            if self.ip_callback is not None:
                self.ip_callback(packet)
        elif ethernet_type == ethernet_type_arp:
            if self.arp_callback is not None:
                self.arp_callback(packet)
        else:
            # XXX handle this properly
            raise ValueError("unsupported ethernet_type: %r" % (ethernet_type,))

    def gen_simulation(self, selfp):
        """Simulation generator: receive one packet, then send one if queued."""
        self.tx_packet.done = True
        while True:
            yield from self.phy.receive()
            self.callback(self.phy.packet)
            # XXX add full duplex
            if self.tx_packets:
                tx_packet = self.tx_packets.pop(0)
                yield from self.phy.send(tx_packet)
if __name__ == "__main__":
    from liteeth.test.model.dumps import *

    # Self-check: decode each reference dump, verify it, then re-encode and
    # decode the header again and verify that the fields survived the trip.
    errors = 0
    checks = (
        (arp_request, arp_request_infos),
        (arp_reply, arp_reply_infos),
    )
    for dump, infos in checks:
        packet = MACPacket(dump)
        packet.decode_remove_header()
        errors += verify_packet(packet, infos)
        packet.encode_header()
        packet.decode_remove_header()
        errors += verify_packet(packet, infos)
    print("mac errors " + str(errors))