liteeth/test/model/phy.py

70 lines
2.1 KiB
Python

# This file is Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
# License: BSD
from litex.soc.interconnect.stream_sim import *
from liteeth.common import *
# Helpers ------------------------------------------------------------------------------------------
def print_phy(s):
print_with_prefix(s, "[PHY]")
# PHY Source ---------------------------------------------------------------------------------------
class PHYSource(PacketStreamer):
def __init__(self, dw):
PacketStreamer.__init__(self, eth_phy_description(dw))
# PHY Sink -----------------------------------------------------------------------------------------
class PHYSink(PacketLogger):
def __init__(self, dw):
PacketLogger.__init__(self, eth_phy_description(dw))
# PHY ----------------------------------------------------------------------------------------------
class PHY(Module):
def __init__(self, dw, debug=False):
self.dw = dw
self.debug = debug
self.submodules.phy_source = PHYSource(dw)
self.submodules.phy_sink = PHYSink(dw)
self.source = self.phy_source.source
self.sink = self.phy_sink.sink
self.mac_callback = None
def set_mac_callback(self, callback):
self.mac_callback = callback
def send(self, datas):
packet = Packet(datas)
if self.debug:
r = ">>>>>>>>\n"
r += "length " + str(len(datas)) + "\n"
for d in datas:
r += "{:02x}".format(d)
print_phy(r)
self.phy_source.send(packet)
def receive(self):
yield from self.phy_sink.receive()
if self.debug:
r = "<<<<<<<<\n"
r += "length " + str(len(self.phy_sink.packet)) + "\n"
for d in self.phy_sink.packet:
r += "{:02x}".format(d)
print_phy(r)
self.packet = self.phy_sink.packet
@passive
def generator(self):
while True:
yield from self.receive()
if self.mac_callback is not None:
self.mac_callback(self.packet)