add icmp model

This commit is contained in:
Florent Kermarrec 2015-02-06 09:08:05 +01:00
parent b1e0a21f78
commit cf0b36c6cf
4 changed files with 152 additions and 16 deletions

View File

@ -8,6 +8,7 @@ model_tb:
$(CMD) ./model/arp.py
$(CMD) ./model/ip.py
$(CMD) ./model/udp.py
$(CMD) ./model/icmp.py
mac_core_tb:
$(CMD) mac_core_tb.py

View File

@ -66,3 +66,25 @@ udp_infos = {
"src_port" : 0xa63f,
"dst_port" : 0x690f
}
ping_request = format_dump("""
00 50 56 e0 14 49 00 0c 29 34 0b de 08 00 45 00
00 3c d7 43 00 00 80 01 2b 73 c0 a8 9e 8b ae 89
2a 4d 08 00 2a 5c 02 00 21 00 61 62 63 64 65 66
67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
77 61 62 63 64 65 66 67 68 69""")
ping_request_infos = {
"code" : 0x0,
"msgtype" : 0x8,
"quench" : 0x2002100
}
ping_reply = format_dump("""
00 0c 29 34 0b de 00 50 56 e0 14 49 08 00 45 00
00 3c 76 e1 00 00 80 01 8b d5 ae 89 2a 4d c0 a8
9e 8b 00 00 32 5c 02 00 21 00 61 62 63 64 65 66
67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
77 61 62 63 64 65 66 67 68 69""")
ping_reply_infos = {}

106
liteeth/test/model/icmp.py Normal file
View File

@ -0,0 +1,106 @@
import math
from liteeth.common import *
from liteeth.test.common import *
from liteeth.test.model import ip
def print_icmp(s):
print_with_prefix(s, "[ICMP]")
# ICMP model
class ICMPPacket(Packet):
def __init__(self, init=[]):
Packet.__init__(self, init)
def decode(self):
header = []
for byte in self[:icmp_header_len]:
header.append(self.pop(0))
for k, v in sorted(icmp_header.items()):
setattr(self, k, get_field_data(v, header))
def encode(self):
header = 0
for k, v in sorted(icmp_header.items()):
value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
header += (value << v.offset+(v.byte*8))
for d in split_bytes(header, icmp_header_len):
self.insert(0, d)
def __repr__(self):
r = "--------\n"
for k in sorted(icmp_header.keys()):
r += k + " : 0x%x" %getattr(self,k) + "\n"
r += "payload: "
for d in self:
r += "%02x" %d
return r
class ICMP(Module):
def __init__(self, ip, ip_address, debug=False):
self.ip = ip
self.ip_address = ip_address
self.debug = debug
self.loopback = loopback
self.tx_packets = []
self.tx_packet = ICMPPacket()
self.rx_packet = ICMPPacket()
self.ip.set_icmp_callback(self.callback)
def send(self, packet):
packet.encode()
if self.debug:
print_icmp(">>>>>>>>")
print_icmp(packet)
ip_packet = ip.IPPacket(packet)
ip_packet.version = 0x4
ip_packet.ihl = 0x5
ip_packet.total_length = len(packet) + ip_packet.ihl
ip_packet.identification = 0
ip_packet.flags = 0
ip_packet.fragment_offset = 0
ip_packet.ttl = 0x80
ip_packet.sender_ip = self.ip_address
ip_packet.target_ip = 0x12345678 # XXX
ip_packet.checksum = 0
ip_packet.protocol = icmp_protocol
self.ip.send(ip_packet)
def callback(self, packet):
packet = ICMPPacket(packet)
packet.decode()
if self.debug:
print_icmp("<<<<<<<<")
print_icmp(packet)
if self.loopback:
self.send(packet)
else:
self.process(packet)
def process(self, packet):
pass
if __name__ == "__main__":
from liteeth.test.model.dumps import *
from liteeth.test.model.mac import *
from liteeth.test.model.ip import *
errors = 0
# ICMP packet
packet = MACPacket(ping_request)
packet.decode_remove_header()
#print(packet)
packet = IPPacket(packet)
packet.decode()
#print(packet)
packet = ICMPPacket(packet)
packet.decode()
#print(packet)
errors += verify_packet(packet, ping_request_infos)
packet.encode()
packet.decode()
#print(packet)
errors += verify_packet(packet, ping_request_infos)
print("icmp errors " + str(errors))

View File

@ -74,12 +74,16 @@ class IP(Module):
self.table = {}
self.request_pending = False
self.udp_callback = None
self.icmp_callback = None
self.mac.set_ip_callback(self.callback)
def set_udp_callback(self, callback):
self.udp_callback = callback
def set_icmp_callback(self, callback):
self.icmp_callback = callback
def send(self, packet):
packet.encode()
packet.insert_checksum()
@ -116,6 +120,9 @@ class IP(Module):
if packet.protocol == udp_protocol:
if self.udp_callback is not None:
self.udp_callback(packet)
elif packet.protocol == icmp_protocol:
if self.icmp_callback is not None:
self.icmp_callback(packet)
if __name__ == "__main__":
from liteeth.test.model.dumps import *