mirror of
https://github.com/enjoy-digital/liteeth.git
synced 2025-01-03 03:43:37 -05:00
bench/arty: Add UDP Streamer example with UDP RX stream redirected to Leds.
Tested with: ./arty.py --build --load ./test_udp_streamer.py --leds
This commit is contained in:
parent
a6298975bd
commit
a26608f30f
2 changed files with 73 additions and 1 deletions
|
@ -10,6 +10,7 @@ import os
|
|||
import argparse
|
||||
|
||||
from migen import *
|
||||
from migen.genlib.misc import WaitTimer
|
||||
|
||||
from litex_boards.platforms import arty
|
||||
from litex_boards.targets.arty import _CRG
|
||||
|
@ -46,12 +47,42 @@ class BenchSoC(SoCCore):
|
|||
# SRAM -------------------------------------------------------------------------------------
|
||||
self.add_ram("sram", 0x20000000, 0x1000)
|
||||
|
||||
# UDP Streamer -----------------------------------------------------------------------------
|
||||
from liteeth.frontend.stream import LiteEthUDPStreamer
|
||||
self.submodules.udp_streamer = udp_streamer = LiteEthUDPStreamer(
|
||||
udp = self.ethcore.udp,
|
||||
ip_address = "192.168.1.100",
|
||||
udp_port = 6000,
|
||||
)
|
||||
|
||||
# Leds -------------------------------------------------------------------------------------
|
||||
leds_pads = platform.request_all("user_led")
|
||||
|
||||
# Led Chaser (Default).
|
||||
from litex.soc.cores.led import LedChaser
|
||||
chaser_leds = Signal(len(leds_pads))
|
||||
self.submodules.leds = LedChaser(
|
||||
pads = platform.request_all("user_led"),
|
||||
pads = chaser_leds,
|
||||
sys_clk_freq = sys_clk_freq)
|
||||
|
||||
# Led Control from UDP Streamer RX.
|
||||
udp_leds = Signal(len(leds_pads))
|
||||
self.comb += udp_streamer.source.ready.eq(1)
|
||||
self.sync += If(udp_streamer.rx.source.valid,
|
||||
udp_leds.eq(udp_streamer.source.data)
|
||||
)
|
||||
|
||||
# Led Mux: Switch to received UDP value for 1s then switch back to Led Chaser.
|
||||
self.submodules.leds_timer = leds_timer = WaitTimer(sys_clk_freq)
|
||||
self.comb += [
|
||||
leds_timer.wait.eq(~udp_streamer.rx.source.valid), # Reload Timer on new UDP value.
|
||||
If(leds_timer.done,
|
||||
leds_pads.eq(chaser_leds)
|
||||
).Else(
|
||||
leds_pads.eq(udp_leds)
|
||||
)
|
||||
]
|
||||
|
||||
# Main ---------------------------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
|
|
41
bench/test_udp_streamer.py
Executable file
41
bench/test_udp_streamer.py
Executable file
|
@ -0,0 +1,41 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
#
|
||||
# This file is part of LiteEth
|
||||
#
|
||||
# Copyright (c) 2021 Florent Kermarrec <florent@enjoy-digital.fr>
|
||||
# SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
# LiteEth UDP Streamer test utility.
|
||||
|
||||
import socket
|
||||
import time
|
||||
import argparse
|
||||
import datetime
|
||||
|
||||
# Leds Test ----------------------------------------------------------------------------------------
|
||||
|
||||
def leds_test(ip_address="192.168.1.50", udp_port=6000):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
for i in range(8):
|
||||
sock.sendto(int(0x00).to_bytes(1, byteorder="big"), (ip_address, udp_port))
|
||||
time.sleep(0.2)
|
||||
sock.sendto(int(0xff).to_bytes(1, byteorder="big"), (ip_address, udp_port))
|
||||
time.sleep(0.2)
|
||||
|
||||
# Run ----------------------------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="LiteEth UDP Streamer test utility")
|
||||
parser.add_argument("--ip-address", default="192.168.1.50", help="Board's IP Address")
|
||||
parser.add_argument("--udp-port", default="6000", help="UDP Port")
|
||||
parser.add_argument("--leds", action="store_true", help="Test Leds over UDP Streamer")
|
||||
args = parser.parse_args()
|
||||
|
||||
udp_port = int(args.udp_port, 0)
|
||||
|
||||
if args.leds:
|
||||
leds_test(ip_address=args.ip_address, udp_port=udp_port)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in a new issue