examples/dataflow/dma: refactor

This commit is contained in:
Sebastien Bourdeauducq 2012-06-12 19:55:57 +02:00
parent ce9e35b8ef
commit 3a58916a4f
1 changed files with 44 additions and 48 deletions

View File

@ -5,20 +5,24 @@ from migen.flow.ala import *
from migen.flow.network import * from migen.flow.network import *
from migen.actorlib import dma_wishbone from migen.actorlib import dma_wishbone
from migen.actorlib.sim import * from migen.actorlib.sim import *
from migen.bus import wishbone from migen.bus import wishbone, asmibus
from migen.sim.generic import Simulator from migen.sim.generic import Simulator
from migen.sim.icarus import Runner from migen.sim.icarus import Runner
class MyModel(wishbone.TargetModel): class MyModel:
def __init__(self):
self.prng = Random(763627)
def read(self, address): def read(self, address):
return address + 4 return address + 4
class MyModelWB(MyModel, wishbone.TargetModel):
def __init__(self):
self.prng = Random(763627)
def can_ack(self, bus): def can_ack(self, bus):
return self.prng.randrange(0, 2) return self.prng.randrange(0, 2)
class MyModelASMI(MyModel, asmibus.TargetModel):
pass
def adrgen_gen(): def adrgen_gen():
for i in range(10): for i in range(10):
print("Address: " + str(i)) print("Address: " + str(i))
@ -30,53 +34,19 @@ def dumper_gen():
yield t yield t
print("Received: " + str(t.value["d"])) print("Received: " + str(t.value["d"]))
def test_reader():
print("*** Testing reader")
adrgen = SimActor(adrgen_gen(), ("address", Source, [("a", BV(30))]))
reader = dma_wishbone.Reader()
dumper = SimActor(dumper_gen(), ("data", Sink, [("d", BV(32))]))
g = DataFlowGraph()
g.add_connection(adrgen, reader)
g.add_connection(reader, dumper)
comp = CompositeActor(g)
peripheral = wishbone.Target(MyModel())
interconnect = wishbone.InterconnectPointToPoint(reader.bus, peripheral.bus)
def end_simulation(s):
s.interrupt = adrgen.done and not s.rd(comp.busy)
fragment = comp.get_fragment() \
+ peripheral.get_fragment() \
+ interconnect.get_fragment() \
+ Fragment(sim=[end_simulation])
sim = Simulator(fragment, Runner())
sim.run()
def trgen_gen(): def trgen_gen():
for i in range(10): for i in range(10):
a = i a = i
d = i+10 d = i+10
print("Address: " + str(a) + " Data: " + str(d)) print("Address: " + str(a) + " Data: " + str(d))
yield Token("address_data", {"a": a, "d": d}) yield Token("address_data", {"a": a, "d": d})
def test_writer(): def wishbone_sim(efragment, master, end_simulation):
print("*** Testing writer") peripheral = wishbone.Target(MyModelWB())
trgen = SimActor(trgen_gen(), ("address_data", Source, [("a", BV(30)), ("d", BV(32))]))
writer = dma_wishbone.Writer()
g = DataFlowGraph()
g.add_connection(trgen, writer)
comp = CompositeActor(g)
peripheral = wishbone.Target(MyModel())
tap = wishbone.Tap(peripheral.bus) tap = wishbone.Tap(peripheral.bus)
interconnect = wishbone.InterconnectPointToPoint(writer.bus, peripheral.bus) interconnect = wishbone.InterconnectPointToPoint(master.bus, peripheral.bus)
def end_simulation(s): fragment = efragment \
s.interrupt = trgen.done and not s.rd(comp.busy)
fragment = comp.get_fragment() \
+ peripheral.get_fragment() \ + peripheral.get_fragment() \
+ tap.get_fragment() \ + tap.get_fragment() \
+ interconnect.get_fragment() \ + interconnect.get_fragment() \
@ -85,5 +55,31 @@ def test_writer():
sim = Simulator(fragment, Runner()) sim = Simulator(fragment, Runner())
sim.run() sim.run()
test_reader() def test_wb_reader():
test_writer() print("*** Testing Wishbone reader")
adrgen = SimActor(adrgen_gen(), ("address", Source, [("a", BV(30))]))
reader = dma_wishbone.Reader()
dumper = SimActor(dumper_gen(), ("data", Sink, [("d", BV(32))]))
g = DataFlowGraph()
g.add_connection(adrgen, reader)
g.add_connection(reader, dumper)
comp = CompositeActor(g)
def end_simulation(s):
s.interrupt = adrgen.done and not s.rd(comp.busy)
wishbone_sim(comp.get_fragment(), reader, end_simulation)
def test_wb_writer():
print("*** Testing Wishbone writer")
trgen = SimActor(trgen_gen(), ("address_data", Source, [("a", BV(30)), ("d", BV(32))]))
writer = dma_wishbone.Writer()
g = DataFlowGraph()
g.add_connection(trgen, writer)
comp = CompositeActor(g)
def end_simulation(s):
s.interrupt = trgen.done and not s.rd(comp.busy)
wishbone_sim(comp.get_fragment(), writer, end_simulation)
test_wb_reader()
test_wb_writer()