bench/arty:bench/arty: Add UDP Streamer example with UDP TX stream from Switches.

This commit is contained in:
Florent Kermarrec 2021-09-22 18:21:20 +02:00
parent c7aa4e50f4
commit 4cadf912f2
2 changed files with 44 additions and 2 deletions

View File

@ -10,6 +10,7 @@ import os
import argparse import argparse
from migen import * from migen import *
from migen.genlib.cdc import MultiReg
from migen.genlib.misc import WaitTimer from migen.genlib.misc import WaitTimer
from litex_boards.platforms import arty from litex_boards.platforms import arty
@ -19,6 +20,7 @@ from litex.soc.cores.clock import *
from litex.soc.interconnect.csr import * from litex.soc.interconnect.csr import *
from litex.soc.integration.soc_core import * from litex.soc.integration.soc_core import *
from litex.soc.integration.builder import * from litex.soc.integration.builder import *
from litex.soc.cores.led import LedChaser
from liteeth.phy.mii import LiteEthPHYMII from liteeth.phy.mii import LiteEthPHYMII
@ -59,7 +61,6 @@ class BenchSoC(SoCCore):
leds_pads = platform.request_all("user_led") leds_pads = platform.request_all("user_led")
# Led Chaser (Default). # Led Chaser (Default).
from litex.soc.cores.led import LedChaser
chaser_leds = Signal(len(leds_pads)) chaser_leds = Signal(len(leds_pads))
self.submodules.leds = LedChaser( self.submodules.leds = LedChaser(
pads = chaser_leds, pads = chaser_leds,
@ -83,6 +84,32 @@ class BenchSoC(SoCCore):
) )
] ]
# Switches ---------------------------------------------------------------------------------
if False:
# Resynchronize Swiches inputs.
switches_pads = platform.request_all("user_sw")
switches = Signal(len(switches_pads))
self.specials += MultiReg(switches_pads, switches)
# Send Switches value on UDP Streamer TX every 500ms.
switches_timer = WaitTimer(int(500e-3*sys_clk_freq))
switches_fsm = FSM(reset_state="IDLE")
self.submodules += switches_timer, switches_fsm
switches_fsm.act("IDLE",
switches_timer.wait.eq(1),
If(switches_timer.done,
NextState("SEND")
)
)
switches_fsm.act("SEND",
udp_streamer.sink.valid.eq(1),
udp_streamer.sink.data.eq(switches),
If(udp_streamer.sink.ready,
NextState("IDLE")
)
)
# Main --------------------------------------------------------------------------------------------- # Main ---------------------------------------------------------------------------------------------
def main(): def main():

View File

@ -15,7 +15,7 @@ import datetime
# Leds Test ---------------------------------------------------------------------------------------- # Leds Test ----------------------------------------------------------------------------------------
def leds_test(ip_address="192.168.1.50", udp_port=6000): def leds_test(ip_address, udp_port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for i in range(8): for i in range(8):
sock.sendto(int(0x00).to_bytes(1, byteorder="big"), (ip_address, udp_port)) sock.sendto(int(0x00).to_bytes(1, byteorder="big"), (ip_address, udp_port))
@ -23,6 +23,17 @@ def leds_test(ip_address="192.168.1.50", udp_port=6000):
sock.sendto(int(0xff).to_bytes(1, byteorder="big"), (ip_address, udp_port)) sock.sendto(int(0xff).to_bytes(1, byteorder="big"), (ip_address, udp_port))
time.sleep(0.2) time.sleep(0.2)
# Switches Test ------------------------------------------------------------------------------------
def switches_test(udp_port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", udp_port))
while True:
data, addr = sock.recvfrom(1024)
switches = int.from_bytes(data, byteorder="big")
print(f"Switches value: 0x{switches:02x}")
# Run ---------------------------------------------------------------------------------------------- # Run ----------------------------------------------------------------------------------------------
def main(): def main():
@ -30,6 +41,7 @@ def main():
parser.add_argument("--ip-address", default="192.168.1.50", help="Board's IP Address") parser.add_argument("--ip-address", default="192.168.1.50", help="Board's IP Address")
parser.add_argument("--udp-port", default="6000", help="UDP Port") parser.add_argument("--udp-port", default="6000", help="UDP Port")
parser.add_argument("--leds", action="store_true", help="Test Leds over UDP Streamer") parser.add_argument("--leds", action="store_true", help="Test Leds over UDP Streamer")
parser.add_argument("--switches", action="store_true", help="Test Switches over UDP Streamer")
args = parser.parse_args() args = parser.parse_args()
udp_port = int(args.udp_port, 0) udp_port = int(args.udp_port, 0)
@ -37,5 +49,8 @@ def main():
if args.leds: if args.leds:
leds_test(ip_address=args.ip_address, udp_port=udp_port) leds_test(ip_address=args.ip_address, udp_port=udp_port)
if args.switches:
switches_test(udp_port=udp_port)
if __name__ == "__main__": if __name__ == "__main__":
main() main()