97 lines
2.5 KiB
Python
97 lines
2.5 KiB
Python
from random import Random
|
|
|
|
from migen.fhdl import verilog
|
|
from migen.flow.ala import *
|
|
from migen.flow.network import *
|
|
from migen.actorlib import dma_wishbone
|
|
from migen.actorlib.sim import *
|
|
from migen.bus import wishbone
|
|
from migen.sim.generic import Simulator
|
|
from migen.sim.icarus import Runner
|
|
|
|
class MyPeripheral:
|
|
def __init__(self):
|
|
self.bus = wishbone.Interface()
|
|
self.ack_en = Signal()
|
|
self.prng = Random(763627)
|
|
|
|
def do_simulation(self, s):
|
|
# Only authorize acks on certain cycles to simulate variable latency.
|
|
s.wr(self.ack_en, self.prng.randrange(0, 2))
|
|
|
|
def get_fragment(self):
|
|
comb = [
|
|
self.bus.ack.eq(self.bus.cyc & self.bus.stb & self.ack_en),
|
|
self.bus.dat_r.eq(self.bus.adr + 4)
|
|
]
|
|
return Fragment(comb, sim=[self.do_simulation])
|
|
|
|
def adrgen_gen():
|
|
for i in range(10):
|
|
print("Address: " + str(i))
|
|
yield Token("address", {"a": i})
|
|
|
|
def dumper_gen():
|
|
while True:
|
|
t = Token("data")
|
|
yield t
|
|
print("Received: " + str(t.value["d"]))
|
|
|
|
def test_reader():
|
|
print("*** Testing reader")
|
|
adrgen = SimActor(adrgen_gen(), ("address", Source, [("a", BV(30))]))
|
|
reader = dma_wishbone.Reader()
|
|
dumper = SimActor(dumper_gen(), ("data", Sink, [("d", BV(32))]))
|
|
g = DataFlowGraph()
|
|
g.add_connection(adrgen, reader)
|
|
g.add_connection(reader, dumper)
|
|
comp = CompositeActor(g)
|
|
|
|
peripheral = MyPeripheral()
|
|
interconnect = wishbone.InterconnectPointToPoint(reader.bus, peripheral.bus)
|
|
|
|
def end_simulation(s):
|
|
s.interrupt = adrgen.done and not s.rd(comp.busy)
|
|
|
|
fragment = comp.get_fragment() \
|
|
+ peripheral.get_fragment() \
|
|
+ interconnect.get_fragment() \
|
|
+ Fragment(sim=[end_simulation])
|
|
|
|
sim = Simulator(fragment, Runner())
|
|
sim.run()
|
|
|
|
def trgen_gen():
|
|
for i in range(10):
|
|
a = i
|
|
d = i+10
|
|
print("Address: " + str(a) + " Data: " + str(d))
|
|
yield Token("address_data", {"a": a, "d": d})
|
|
|
|
def test_writer():
|
|
print("*** Testing writer")
|
|
trgen = SimActor(trgen_gen(), ("address_data", Source, [("a", BV(30)), ("d", BV(32))]))
|
|
writer = dma_wishbone.Writer()
|
|
g = DataFlowGraph()
|
|
g.add_connection(trgen, writer)
|
|
comp = CompositeActor(g)
|
|
|
|
peripheral = MyPeripheral()
|
|
tap = wishbone.Tap(peripheral.bus)
|
|
interconnect = wishbone.InterconnectPointToPoint(writer.bus, peripheral.bus)
|
|
|
|
def end_simulation(s):
|
|
s.interrupt = trgen.done and not s.rd(comp.busy)
|
|
|
|
fragment = comp.get_fragment() \
|
|
+ peripheral.get_fragment() \
|
|
+ tap.get_fragment() \
|
|
+ interconnect.get_fragment() \
|
|
+ Fragment(sim=[end_simulation])
|
|
|
|
sim = Simulator(fragment, Runner())
|
|
sim.run()
|
|
|
|
test_reader()
|
|
test_writer()
|