mirror of
https://github.com/enjoy-digital/liteeth.git
synced 2025-01-03 03:43:37 -05:00
64b85e621e
Artix7/Ultrascale 1000BaseX is reused from MiSoC/LiteEthMini, specify it.
47 lines
No EOL
1.1 KiB
Python
47 lines
No EOL
1.1 KiB
Python
#
|
|
# This file is part of LiteEth.
|
|
#
|
|
# Copyright (c) 2015-2018 Florent Kermarrec <florent@enjoy-digital.fr>
|
|
# SPDX-License-Identifier: BSD-2-Clause
|
|
|
|
import socket
|
|
import threading
|
|
|
|
|
|
def test(fpga_ip, udp_port, test_message):
|
|
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rx_sock.bind(("", udp_port))
|
|
rx_sock.settimeout(0.5)
|
|
|
|
def receive():
|
|
while True:
|
|
try:
|
|
msg = rx_sock.recv(8192)
|
|
for byte in msg:
|
|
print(chr(byte), end="")
|
|
except:
|
|
break
|
|
|
|
def send():
|
|
tx_sock.sendto(bytes(test_message, "utf-8"), (fpga_ip, udp_port))
|
|
|
|
receive_thread = threading.Thread(target=receive)
|
|
receive_thread.start()
|
|
|
|
send_thread = threading.Thread(target=send)
|
|
send_thread.start()
|
|
|
|
try:
|
|
send_thread.join(5)
|
|
send_thread.join(5)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
|
|
# # #
|
|
|
|
test_message = "LiteEth Stream Hello world\n"
|
|
test("192.168.1.50", 10000, test_message)
|
|
|
|
# # # |