tools/litex_server/client: cleanup.

This commit is contained in:
Florent Kermarrec 2020-11-25 11:34:12 +01:00
parent fa9149720f
commit 2c3687983c
2 changed files with 42 additions and 45 deletions

View file

@ -48,25 +48,25 @@ class RemoteClient(EtherboneIPC, CSRBuilder):
def read(self, addr, length=None, burst="incr"):
length_int = 1 if length is None else length
# prepare packet
# Prepare packet
record = EtherboneRecord()
incr = (burst == "incr")
record.reads = EtherboneReads(addrs=[self.base_address + addr + 4*incr*j for j in range(length_int)])
record.reads = EtherboneReads(addrs=[self.base_address + addr + 4*incr*j for j in range(length_int)])
record.rcount = len(record.reads)
# send packet
# Send packet
packet = EtherbonePacket()
packet.records = [record]
packet.encode()
self.send_packet(self.socket, packet)
# receive response
# Receive response
packet = EtherbonePacket(self.receive_packet(self.socket))
packet.decode()
datas = packet.records.pop().writes.get_datas()
if self.debug:
for i, data in enumerate(datas):
print("read {:08x} @ {:08x}".format(data, self.base_address + addr + 4*i))
print("read 0x{:08x} @ 0x{:08x}".format(data, self.base_address + addr + 4*i))
return datas[0] if length is None else datas
def write(self, addr, datas):
@ -82,7 +82,7 @@ class RemoteClient(EtherboneIPC, CSRBuilder):
if self.debug:
for i, data in enumerate(datas):
print("write {:08x} @ {:08x}".format(data, self.base_address + addr + 4*i))
print("write 0x{:08x} @ 0x{:08x}".format(data, self.base_address + addr + 4*i))
# Utils --------------------------------------------------------------------------------------------

View file

@ -18,6 +18,8 @@ import threading
from litex.tools.remote.etherbone import EtherbonePacket, EtherboneRecord, EtherboneWrites
from litex.tools.remote.etherbone import EtherboneIPC
# Read Merger --------------------------------------------------------------------------------------
def _read_merger(addrs, max_length=256, bursts=["incr", "fixed"]):
"""Sequential reads merger
@ -64,12 +66,14 @@ def _read_merger(addrs, max_length=256, bursts=["incr", "fixed"]):
burst_type = "incr"
yield (burst_base, burst_length, burst_type)
# Remote Server ------------------------------------------------------------------------------------
class RemoteServer(EtherboneIPC):
def __init__(self, comm, bind_ip, bind_port=1234):
self.comm = comm
self.bind_ip = bind_ip
self.comm = comm
self.bind_ip = bind_ip
self.bind_port = bind_port
self.lock = False
self.lock = False
def open(self):
if hasattr(self, "socket"):
@ -108,18 +112,18 @@ class RemoteServer(EtherboneIPC):
record = packet.records.pop()
# wait for lock
# Wait for lock
while self.lock:
time.sleep(0.01)
# set lock
# Set lock
self.lock = True
# handle writes:
# Handle writes:
if record.writes != None:
self.comm.write(record.writes.base_addr, record.writes.get_datas())
# handle reads
# Handle reads
if record.reads != None:
max_length = {
"CommUART": 256,
@ -156,52 +160,38 @@ class RemoteServer(EtherboneIPC):
self.serve_thread.setDaemon(True)
self.serve_thread.start()
# Run ----------------------------------------------------------------------------------------------
def main():
print("LiteX remote server")
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(description="LiteX Server utility")
# Common arguments
parser.add_argument("--bind-ip", default="localhost",
help="Host bind address")
parser.add_argument("--bind-port", default=1234,
help="Host bind port")
parser.add_argument("--debug", action="store_true",
help="Enable debug")
parser.add_argument("--bind-ip", default="localhost", help="Host bind address")
parser.add_argument("--bind-port", default=1234, help="Host bind port")
parser.add_argument("--debug", action="store_true", help="Enable debug")
# UART arguments
parser.add_argument("--uart", action="store_true",
help="Select UART interface")
parser.add_argument("--uart-port", default=None,
help="Set UART port")
parser.add_argument("--uart-baudrate", default=115200,
help="Set UART baudrate")
parser.add_argument("--uart", action="store_true", help="Select UART interface")
parser.add_argument("--uart-port", default=None, help="Set UART port")
parser.add_argument("--uart-baudrate", default=115200, help="Set UART baudrate")
# UDP arguments
parser.add_argument("--udp", action="store_true",
help="Select UDP interface")
parser.add_argument("--udp-ip", default="192.168.1.50",
help="Set UDP remote IP address")
parser.add_argument("--udp-port", default=1234,
help="Set UDP remote port")
parser.add_argument("--udp", action="store_true", help="Select UDP interface")
parser.add_argument("--udp-ip", default="192.168.1.50", help="Set UDP remote IP address")
parser.add_argument("--udp-port", default=1234, help="Set UDP remote port")
# PCIe arguments
parser.add_argument("--pcie", action="store_true",
help="Select PCIe interface")
parser.add_argument("--pcie-bar", default=None,
help="Set PCIe BAR")
parser.add_argument("--pcie", action="store_true", help="Select PCIe interface")
parser.add_argument("--pcie-bar", default=None, help="Set PCIe BAR")
# USB arguments
parser.add_argument("--usb", action="store_true",
help="Select USB interface")
parser.add_argument("--usb-vid", default=None,
help="Set USB vendor ID")
parser.add_argument("--usb-pid", default=None,
help="Set USB product ID")
parser.add_argument("--usb-max-retries", default=10,
help="Number of times to try reconnecting to USB")
parser.add_argument("--usb", action="store_true", help="Select USB interface")
parser.add_argument("--usb-vid", default=None, help="Set USB vendor ID")
parser.add_argument("--usb-pid", default=None, help="Set USB product ID")
parser.add_argument("--usb-max-retries", default=10, help="Number of USB reconecting retries")
args = parser.parse_args()
# UART mode
if args.uart:
from litex.tools.remote.comm_uart import CommUART
if args.uart_port is None:
@ -211,12 +201,16 @@ def main():
uart_baudrate = int(float(args.uart_baudrate))
print("[CommUART] port: {} / baudrate: {} / ".format(uart_port, uart_baudrate), end="")
comm = CommUART(uart_port, uart_baudrate, debug=args.debug)
# UDP mode
elif args.udp:
from litex.tools.remote.comm_udp import CommUDP
udp_ip = args.udp_ip
udp_port = int(args.udp_port)
print("[CommUDP] ip: {} / port: {} / ".format(udp_ip, udp_port), end="")
comm = CommUDP(udp_ip, udp_port, debug=args.debug)
# PCIe mode
elif args.pcie:
from litex.tools.remote.comm_pcie import CommPCIe
pcie_bar = args.pcie_bar
@ -233,6 +227,8 @@ def main():
enable.close()
print("[CommPCIe] bar: {} / ".format(pcie_bar), end="")
comm = CommPCIe(pcie_bar, debug=args.debug)
# USB mode
elif args.usb:
from litex.tools.remote.comm_usb import CommUSB
if args.usb_pid is None and args.usb_vid is None:
@ -246,6 +242,7 @@ def main():
if vid is not None:
vid = int(vid, base=0)
comm = CommUSB(vid=vid, pid=pid, max_retries=args.usb_max_retries, debug=args.debug)
else:
parser.print_help()
exit()