diff --git a/bench/arty.py b/bench/arty.py index d7f8eb5..e16fb1f 100755 --- a/bench/arty.py +++ b/bench/arty.py @@ -10,6 +10,7 @@ import os import argparse from migen import * +from migen.genlib.misc import WaitTimer from litex_boards.platforms import arty from litex_boards.targets.arty import _CRG @@ -46,12 +47,42 @@ class BenchSoC(SoCCore): # SRAM ------------------------------------------------------------------------------------- self.add_ram("sram", 0x20000000, 0x1000) + # UDP Streamer ----------------------------------------------------------------------------- + from liteeth.frontend.stream import LiteEthUDPStreamer + self.submodules.udp_streamer = udp_streamer = LiteEthUDPStreamer( + udp = self.ethcore.udp, + ip_address = "192.168.1.100", + udp_port = 6000, + ) + # Leds ------------------------------------------------------------------------------------- + leds_pads = platform.request_all("user_led") + + # Led Chaser (Default). from litex.soc.cores.led import LedChaser + chaser_leds = Signal(len(leds_pads)) self.submodules.leds = LedChaser( - pads = platform.request_all("user_led"), + pads = chaser_leds, sys_clk_freq = sys_clk_freq) + # Led Control from UDP Streamer RX. + udp_leds = Signal(len(leds_pads)) + self.comb += udp_streamer.source.ready.eq(1) + self.sync += If(udp_streamer.rx.source.valid, + udp_leds.eq(udp_streamer.source.data) + ) + + # Led Mux: Switch to received UDP value for 1s then switch back to Led Chaser. + self.submodules.leds_timer = leds_timer = WaitTimer(sys_clk_freq) + self.comb += [ + leds_timer.wait.eq(~udp_streamer.rx.source.valid), # Reload Timer on new UDP value. + If(leds_timer.done, + leds_pads.eq(chaser_leds) + ).Else( + leds_pads.eq(udp_leds) + ) + ] + # Main --------------------------------------------------------------------------------------------- def main(): diff --git a/bench/test_udp_streamer.py b/bench/test_udp_streamer.py new file mode 100755 index 0000000..4e51712 --- /dev/null +++ b/bench/test_udp_streamer.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +# +# This file is part of LiteEth +# +# Copyright (c) 2021 Florent Kermarrec +# SPDX-License-Identifier: BSD-2-Clause + +# LiteEth UDP Streamer test utility. + +import socket +import time +import argparse +import datetime + +# Leds Test ---------------------------------------------------------------------------------------- + +def leds_test(ip_address="192.168.1.50", udp_port=6000): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + for i in range(8): + sock.sendto(int(0x00).to_bytes(1, byteorder="big"), (ip_address, udp_port)) + time.sleep(0.2) + sock.sendto(int(0xff).to_bytes(1, byteorder="big"), (ip_address, udp_port)) + time.sleep(0.2) + +# Run ---------------------------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser(description="LiteEth UDP Streamer test utility") + parser.add_argument("--ip-address", default="192.168.1.50", help="Board's IP Address") + parser.add_argument("--udp-port", default="6000", help="UDP Port") + parser.add_argument("--leds", action="store_true", help="Test Leds over UDP Streamer") + args = parser.parse_args() + + udp_port = int(args.udp_port, 0) + + if args.leds: + leds_test(ip_address=args.ip_address, udp_port=udp_port) + +if __name__ == "__main__": + main()