tools/comm_udp/litex_server: add --udp-scan args to scan network for available Etherbone/UDP devices.

litex_server --udp --udp-scan --udp-ip=192.168.1.x --udp-port=1234
Etherbone scan on 192.168.1.x network:
- 192.168.1.20
- 192.168.1.50
This commit is contained in:
Florent Kermarrec 2020-11-26 13:33:20 +01:00
parent 4a748a53b8
commit b02753ecfa
2 changed files with 41 additions and 16 deletions

View file

@ -178,6 +178,7 @@ def main():
parser.add_argument("--udp", action="store_true", help="Select UDP interface")
parser.add_argument("--udp-ip", default="192.168.1.50", help="Set UDP remote IP address")
parser.add_argument("--udp-port", default=1234, help="Set UDP remote port")
parser.add_argument("--udp-scan", action="store_true", help="Scan network for available UDP devices.")
# PCIe arguments
parser.add_argument("--pcie", action="store_true", help="Select PCIe interface")
@ -205,10 +206,21 @@ def main():
# UDP mode
elif args.udp:
from litex.tools.remote.comm_udp import CommUDP
udp_ip = args.udp_ip
udp_ip = args.udp_ip
udp_port = int(args.udp_port)
print("[CommUDP] ip: {} / port: {} / ".format(udp_ip, udp_port), end="")
comm = CommUDP(udp_ip, udp_port, debug=args.debug)
if args.udp_scan:
udp_ip = udp_ip.split(".")
assert len(udp_ip) == 4
udp_ip[3] = "x"
udp_ip = ".".join(udp_ip)
comm = CommUDP(udp_ip, udp_port, debug=args.debug)
comm.open(probe=False)
comm.scan(udp_ip)
comm.close()
exit()
else:
print("[CommUDP] ip: {} / port: {} / ".format(udp_ip, udp_port), end="")
comm = CommUDP(udp_ip, udp_port, debug=args.debug)
# PCIe mode
elif args.pcie:

View file

@ -1,7 +1,7 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
# Copyright (c) 2015-2020 Florent Kermarrec <florent@enjoy-digital.fr>
# Copyright (c) 2016 Tim 'mithro' Ansell <mithro@mithis.com>
# SPDX-License-Identifier: BSD-2-Clause
@ -21,13 +21,14 @@ class CommUDP(CSRBuilder):
self.port = port
self.debug = debug
def open(self):
def open(self, probe=True):
if hasattr(self, "socket"):
return
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(("", self.port))
self.socket.settimeout(1)
self.probe()
if probe:
self.probe(self.server, self.port)
def close(self):
if not hasattr(self, "socket"):
@ -35,22 +36,34 @@ class CommUDP(CSRBuilder):
self.socket.close()
del self.socket
def probe(self):
# Send probe request to server...
packet = EtherbonePacket()
packet.pf = 1
packet.encode()
self.socket.sendto(packet.bytes, (self.server, self.port))
# ...and get/check server's response.
def probe(self, ip, port, loose=False):
try:
# Send probe request to server...
packet = EtherbonePacket()
packet.pf = 1
packet.encode()
self.socket.sendto(packet.bytes, (ip, port))
# ...and get/check server's response.
datas, dummy = self.socket.recvfrom(8192)
packet = EtherbonePacket(datas)
packet.decode()
assert packet.pr == 1
return 1
except:
self.close()
raise Exception(f"Unable to probe Etherbone server at {self.server}.")
if not loose:
self.close()
raise Exception(f"Unable to probe Etherbone server at {self.server}.")
return 0
def scan(self, ip="192.168.1.x"):
print(f"Etherbone scan on {ip} network:")
ip = ip.replace("x", "{}")
self.socket.settimeout(0.01)
for i in range(1, 255):
if self.probe(ip=ip.format(str(i)), port=self.port, loose=True):
print("- {}".format(ip.format(i)))
self.socket.settimeout(1)
def read(self, addr, length=None, burst="incr"):
assert burst == "incr"