udp: add model

This commit is contained in:
Florent Kermarrec 2015-02-04 20:30:17 +01:00
parent 957c16264a
commit 7a4713b385
5 changed files with 131 additions and 6 deletions

View File

@ -77,6 +77,8 @@ udp_header = {
"checksum": HField( 6, 0, 16)
}
udp_protocol = 0x11
def reverse_bytes(v):
n = math.ceil(flen(v)/8)
r = []

View File

@ -7,6 +7,7 @@ model_tb:
$(CMD) ./model/mac.py
$(CMD) ./model/arp.py
$(CMD) ./model/ip.py
$(CMD) ./model/udp.py
mac_core_tb:
$(CMD) mac_core_tb.py
@ -18,4 +19,7 @@ arp_tb:
$(CMD) arp_tb.py
ip_tb:
$(CMD) ip_tb.py
$(CMD) ip_tb.py
udp_tb:
$(CMD) udp_tb.py

View File

@ -62,5 +62,7 @@ udp_infos = {
"destination_mac_address" : 0xd07ab596cd0a,
"protocol" : 0x11,
"source_ip_address" : 0xc0a80165,
"destination_ip_address" : 0xb27b0d78
"destination_ip_address" : 0xb27b0d78,
"source_port" : 0xa63f,
"destination_port" : 0x690f
}

View File

@ -8,8 +8,6 @@ from liteeth.test.model import mac
def print_ip(s):
print_with_prefix(s, "[IP]")
preamble = split_bytes(eth_preamble, 8)
def carry_around_add(a, b):
c = a + b
return (c & 0xffff) + (c >> 16)
@ -75,9 +73,13 @@ class IP(Module):
self.rx_packet = IPPacket()
self.table = {}
self.request_pending = False
self.udp_callback = None
self.mac.set_ip_callback(self.callback)
def set_udp_callback(self, callback):
self.udp_callback = callback
def send(self, packet):
packet.encode()
packet.insert_checksum()
@ -104,16 +106,22 @@ class IP(Module):
if self.loopback:
self.send(packet)
else:
if packet.version != 0x4:
raise ValueError
if packet.ihl != 0x5:
raise ValueError
self.process(packet)
def process(self, packet):
pass
if packet.protocol == udp_protocol:
if self.udp_callback is not None:
self.udp_callback(packet)
if __name__ == "__main__":
from liteeth.test.model.dumps import *
from liteeth.test.model.mac import *
errors = 0
# ARP request
# UDP packet
packet = MACPacket(udp)
packet.decode_remove_header()
#print(packet)

109
liteeth/test/model/udp.py Normal file
View File

@ -0,0 +1,109 @@
import math
from liteeth.common import *
from liteeth.test.common import *
from liteeth.test.model import ip
def print_udp(s):
print_with_prefix(s, "[UDP]")
# UDP model
class UDPPacket(Packet):
def __init__(self, init=[]):
Packet.__init__(self, init)
def decode(self):
header = []
for byte in self[:udp_header_len]:
header.append(self.pop(0))
for k, v in sorted(udp_header.items()):
setattr(self, k, get_field_data(v, header))
def encode(self):
header = 0
for k, v in sorted(udp_header.items()):
value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
header += (value << v.offset+(v.byte*8))
for d in split_bytes(header, udp_header_len):
self.insert(0, d)
def __repr__(self):
r = "--------\n"
for k in sorted(udp_header.keys()):
r += k + " : 0x%x" %getattr(self,k) + "\n"
r += "payload: "
for d in self:
r += "%02x" %d
return r
class UDP(Module):
def __init__(self, ip, ip_address, debug=False, loopback=False):
self.ip = ip
self.debug = debug
self.loopback = loopback
self.tx_packets = []
self.tx_packet = UDPPacket()
self.rx_packet = UDPPacket()
self.ip.set_udp_callback(self.callback)
def send(self, packet):
packet.encode()
if self.debug:
print_udp(">>>>>>>>")
print_udp(packet)
ip_packet = ip.IPPacket(packet)
ip_packet.version = 0x4
ip_packet.ihl = 0x5
ip_packet.dscp = 0x0
ip_packet.ecn = 0x0
ip_packet.total_length = len(packet) + ip_packet.ihl
ip_packet.identification = 0
ip_packet.flags = 0
ip_packet.fragment_offset = 0
ip_packet.time_to_live = 0x80
ip_packet.source_ip_address = ip_address,
ip_packet.destination_ip_address = 0x12345678 # XXX
self.ip.send(ip_packet)
def callback(self, packet):
packet = UDPPacket(packet)
packet.decode()
if self.debug:
print_udp("<<<<<<<<")
print_udp(packet)
if self.loopback:
self.send(packet)
else:
self.process(packet)
def process(self, packet):
pass
if __name__ == "__main__":
from liteeth.test.model.dumps import *
from liteeth.test.model.mac import *
from liteeth.test.model.ip import *
errors = 0
# UDP packet
packet = MACPacket(udp)
packet.decode_remove_header()
#print(packet)
packet = IPPacket(packet)
packet.decode()
#print(packet)
packet = UDPPacket(packet)
packet.decode()
#print(packet)
if packet.length != (len(packet)+udp_header_len):
errors += 1
errors += verify_packet(packet, udp_infos)
packet.encode()
packet.decode()
#print(packet)
if packet.length != (len(packet)+udp_header_len):
errors += 1
errors += verify_packet(packet, udp_infos)
print("udp errors " + str(errors))