bench/test_etherbone: allow direct CommUDP use with --udp.
Useful to compare performance with RemoteClient and CommUDP.
This commit is contained in:
parent
5abf44d89a
commit
faf78965dd
|
@ -13,14 +13,22 @@ import time
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
from litex import RemoteClient
|
from litex import RemoteClient
|
||||||
|
from litex.tools.remote.comm_udp import CommUDP
|
||||||
|
|
||||||
|
# Constants ----------------------------------------------------------------------------------------
|
||||||
|
|
||||||
KiB = 1024
|
KiB = 1024
|
||||||
MiB = 1024*KiB
|
MiB = 1024*KiB
|
||||||
|
|
||||||
|
comms = {
|
||||||
|
"cli": RemoteClient,
|
||||||
|
"udp": CommUDP
|
||||||
|
}
|
||||||
|
|
||||||
# Identifier Test ----------------------------------------------------------------------------------
|
# Identifier Test ----------------------------------------------------------------------------------
|
||||||
|
|
||||||
def ident_test(port):
|
def ident_test(comm, port):
|
||||||
wb = RemoteClient(port=port)
|
wb = comms[comm](port=port, csr_csv="csr.csv")
|
||||||
wb.open()
|
wb.open()
|
||||||
|
|
||||||
fpga_identifier = ""
|
fpga_identifier = ""
|
||||||
|
@ -37,8 +45,8 @@ def ident_test(port):
|
||||||
|
|
||||||
# Access Test --------------------------------------------------------------------------------------
|
# Access Test --------------------------------------------------------------------------------------
|
||||||
|
|
||||||
def access_test(port):
|
def access_test(comm, port):
|
||||||
wb = RemoteClient(port=port)
|
wb = comms[comm](port=port, csr_csv="csr.csv")
|
||||||
wb.open()
|
wb.open()
|
||||||
|
|
||||||
data = 0x12345678
|
data = 0x12345678
|
||||||
|
@ -52,8 +60,8 @@ def access_test(port):
|
||||||
|
|
||||||
# SRAM Test ----------------------------------------------------------------------------------------
|
# SRAM Test ----------------------------------------------------------------------------------------
|
||||||
|
|
||||||
def sram_test(port):
|
def sram_test(comm, port):
|
||||||
wb = RemoteClient(port=port)
|
wb = comms[comm](port=port, csr_csv="csr.csv")
|
||||||
wb.open()
|
wb.open()
|
||||||
|
|
||||||
def mem_dump(base, length):
|
def mem_dump(base, length):
|
||||||
|
@ -100,12 +108,12 @@ def sram_test(port):
|
||||||
|
|
||||||
# Speed Test ---------------------------------------------------------------------------------------
|
# Speed Test ---------------------------------------------------------------------------------------
|
||||||
|
|
||||||
def speed_test(port):
|
def speed_test(comm, port):
|
||||||
wb = RemoteClient(port=port)
|
wb = comms[comm](port=port, csr_csv="csr.csv")
|
||||||
wb.open()
|
wb.open()
|
||||||
|
|
||||||
test_size = 16*KiB
|
test_size = 16*KiB
|
||||||
burst_size = 128
|
burst_size = 200
|
||||||
|
|
||||||
print("Testing write speed... ", end="")
|
print("Testing write speed... ", end="")
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
@ -113,15 +121,15 @@ def speed_test(port):
|
||||||
wb.write(wb.mems.sram.base, [j for j in range(burst_size)])
|
wb.write(wb.mems.sram.base, [j for j in range(burst_size)])
|
||||||
end = time.time()
|
end = time.time()
|
||||||
duration = (end - start)
|
duration = (end - start)
|
||||||
print("{:3.2f} KiB/s".format(test_size/(duration*KiB)))
|
print("{:8.2f} KiB/s".format(test_size/(duration*KiB)))
|
||||||
|
|
||||||
print("Testing read speed... ", end="")
|
print("Testing read speed... ", end="")
|
||||||
start = time.time()
|
start = time.time()
|
||||||
for i in range(test_size//(4*burst_size)):
|
for i in range(test_size//(4*burst_size)):
|
||||||
wb.read(wb.mems.sram.base, length=burst_size)
|
wb.read(wb.mems.sram.base, length=burst_size)
|
||||||
end = time.time()
|
end = time.time()
|
||||||
duration = (end - start)
|
duration = (end - start)
|
||||||
print("{:3.2f} KiB/s".format(test_size/(duration*KiB)))
|
print("{:8.2f} KiB/s".format(test_size/(duration*KiB)))
|
||||||
|
|
||||||
wb.close()
|
wb.close()
|
||||||
|
|
||||||
|
@ -130,6 +138,7 @@ def speed_test(port):
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="LiteEth Etherbone test utility")
|
parser = argparse.ArgumentParser(description="LiteEth Etherbone test utility")
|
||||||
parser.add_argument("--port", default="1234", help="Host bind port")
|
parser.add_argument("--port", default="1234", help="Host bind port")
|
||||||
|
parser.add_argument("--udp", action="store_true", help="Use CommUDP directly instead of RemoteClient")
|
||||||
parser.add_argument("--ident", action="store_true", help="Read FPGA identifier")
|
parser.add_argument("--ident", action="store_true", help="Read FPGA identifier")
|
||||||
parser.add_argument("--access", action="store_true", help="Test single Write/Read access over Etherbone")
|
parser.add_argument("--access", action="store_true", help="Test single Write/Read access over Etherbone")
|
||||||
parser.add_argument("--sram", action="store_true", help="Test SRAM access over Etherbone")
|
parser.add_argument("--sram", action="store_true", help="Test SRAM access over Etherbone")
|
||||||
|
@ -137,18 +146,19 @@ def main():
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
port = int(args.port, 0)
|
port = int(args.port, 0)
|
||||||
|
comm = "cli" if not args.udp else "udp"
|
||||||
|
|
||||||
if args.ident:
|
if args.ident:
|
||||||
ident_test(port=port)
|
ident_test(comm=comm, port=port)
|
||||||
|
|
||||||
if args.access:
|
if args.access:
|
||||||
access_test(port=port)
|
access_test(comm=comm, port=port)
|
||||||
|
|
||||||
if args.sram:
|
if args.sram:
|
||||||
sram_test(port=port)
|
sram_test(comm=comm, port=port)
|
||||||
|
|
||||||
if args.speed:
|
if args.speed:
|
||||||
speed_test(port=port)
|
speed_test(comm=comm, port=port)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
Loading…
Reference in New Issue