liteeth/test/model/phy.py

70 lines
2.1 KiB
Python
Raw Normal View History

# This file is Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
# License: BSD
from litex.soc.interconnect.stream_sim import *
from liteeth.common import *
2015-09-07 07:28:02 -04:00
# Helpers ------------------------------------------------------------------------------------------
2015-09-07 07:28:02 -04:00
def print_phy(s):
    """Forward ``s`` to ``print_with_prefix`` tagged with the ``[PHY]`` prefix."""
    prefix = "[PHY]"
    print_with_prefix(s, prefix)
# PHY Source ---------------------------------------------------------------------------------------
2016-03-15 15:14:33 -04:00
class PHYSource(PacketStreamer):
    """``PacketStreamer`` set up with the ``eth_phy_description(dw)`` layout."""

    def __init__(self, dw):
        # The stream description is derived solely from the data width ``dw``.
        description = eth_phy_description(dw)
        super().__init__(description)
# PHY Sink -----------------------------------------------------------------------------------------
2015-09-07 07:28:02 -04:00
class PHYSink(PacketLogger):
    """``PacketLogger`` set up with the ``eth_phy_description(dw)`` layout."""

    def __init__(self, dw):
        # The stream description is derived solely from the data width ``dw``.
        description = eth_phy_description(dw)
        super().__init__(description)
# PHY ----------------------------------------------------------------------------------------------
2015-09-07 07:28:02 -04:00
class PHY(Module):
    """PHY model built from a ``PHYSource`` and a ``PHYSink`` submodule.

    Packets handed to :meth:`send` are wrapped in a ``Packet`` and passed to
    the source submodule; packets collected by the sink submodule are stored
    in ``self.packet`` and, from :meth:`generator`, forwarded to an optional
    callback registered with :meth:`set_mac_callback`.

    Parameters
    ----------
    dw
        Data width, passed unchanged to ``PHYSource`` and ``PHYSink``.
    debug
        When true, every sent and received packet is dumped through
        ``print_phy``.
    """

    def __init__(self, dw, debug=False):
        self.dw = dw
        self.debug = debug

        self.submodules.phy_source = PHYSource(dw)
        self.submodules.phy_sink = PHYSink(dw)

        # Expose the submodules' endpoints directly on the PHY.
        self.source = self.phy_source.source
        self.sink = self.phy_sink.sink

        self.mac_callback = None
        # Last packet obtained by receive(); defined up front so that reading
        # it before the first reception yields None instead of raising
        # AttributeError.
        self.packet = None

    @staticmethod
    def _format_packet(marker, data):
        """Return the debug dump for ``data``.

        The result is ``marker``, a newline, ``length <len(data)>``, a
        newline, then every element of ``data`` rendered as two lowercase
        hex digits with no separator.
        """
        length = len(data)
        hex_bytes = "".join("{:02x}".format(d) for d in data)
        return "{}\nlength {}\n{}".format(marker, length, hex_bytes)

    def set_mac_callback(self, callback):
        """Register ``callback``; ``generator`` calls it with each received packet."""
        self.mac_callback = callback

    def send(self, datas):
        """Wrap ``datas`` in a ``Packet`` and hand it to the source submodule."""
        packet = Packet(datas)
        if self.debug:
            print_phy(self._format_packet(">>>>>>>>", datas))
        self.phy_source.send(packet)

    def receive(self):
        """Generator: run the sink's ``receive()`` and store its packet.

        After this generator completes, ``self.packet`` refers to
        ``self.phy_sink.packet``.
        """
        yield from self.phy_sink.receive()
        received = self.phy_sink.packet
        if self.debug:
            print_phy(self._format_packet("<<<<<<<<", received))
        self.packet = received

    @passive
    def generator(self):
        """Receive packets forever, passing each to the MAC callback if one is set."""
        while True:
            yield from self.receive()
            if self.mac_callback is not None:
                self.mac_callback(self.packet)