improve test_udp (add random data and check)

This commit is contained in:
Florent Kermarrec 2015-02-06 20:49:30 +01:00
parent 0a0d113331
commit a8300a2e93
1 changed files with 55 additions and 7 deletions

View File

@ -1,23 +1,71 @@
import socket import socket
import time import time
import threading import threading
import copy
KB = 1024
MB = 1024*KB
GB = 1024*MB
def seed_to_data(seed, random=True):
if random:
return (seed * 0x31415979 + 1) & 0xffffffff
else:
return seed
def check(p1, p2):
p1 = copy.deepcopy(p1)
p2 = copy.deepcopy(p2)
if isinstance(p1, int):
return 0, 1, int(p1 != p2)
else:
if len(p1) >= len(p2):
ref, res = p1, p2
else:
ref, res = p2, p1
shift = 0
while((ref[0] != res[0]) and (len(res)>1)):
res.pop(0)
shift += 1
length = min(len(ref), len(res))
errors = 0
for i in range(length):
if ref.pop(0) != res.pop(0):
errors += 1
return shift, length, errors
def generate_packet(seed, length):
r = []
for i in range(length):
r.append(seed_to_data(seed, True)%0xff) # XXX FIXME
seed += 1
return r, seed
FPGA_IP = "192.168.1.40" FPGA_IP = "192.168.1.40"
HOST_IP = "192.168.1.12" HOST_IP = "192.168.1.12"
UDP_PORT = 5010 UDP_PORT = 6000
MESSAGE = bytes("LiteEth UDP Loopback test", "utf-8") MESSAGE = bytes("LiteEth UDP Loopback test", "utf-8")
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock.bind(("", UDP_PORT)) rx_sock.bind(("", UDP_PORT))
def receive(): def receive():
while True: rx_seed = 0
while rx_seed < 1*MB:
data, addr = rx_sock.recvfrom(1024) data, addr = rx_sock.recvfrom(1024)
print(data) rx_packet = []
for byte in data:
rx_packet.append(int(byte))
rx_reference_packet, rx_seed = generate_packet(rx_seed, 512)
s, l, e = check(rx_reference_packet, rx_packet)
print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
done = True
def send(): def send():
while True: tx_seed = 0
tx_sock.sendto(MESSAGE, (FPGA_IP, UDP_PORT)) while tx_seed < 1*MB:
tx_packet, tx_seed = generate_packet(tx_seed, 512)
tx_sock.sendto(bytes(tx_packet), (FPGA_IP, UDP_PORT))
time.sleep(0.01) time.sleep(0.01)
receive_thread = threading.Thread(target=receive, daemon=True) receive_thread = threading.Thread(target=receive, daemon=True)