litex/migen/pytholite/io.py

177 lines
5.3 KiB
Python
Raw Normal View History

import ast
2012-11-23 06:41:50 -05:00
from itertools import zip_longest
2012-11-16 13:24:45 -05:00
from migen.fhdl.structure import *
2012-11-17 13:54:50 -05:00
from migen.uio.ioo import UnifiedIOObject
from migen.actorlib.sim import *
2012-11-23 06:41:50 -05:00
from migen.bus import wishbone
2012-11-17 13:54:50 -05:00
from migen.bus.transactions import *
2012-11-16 13:24:45 -05:00
from migen.pytholite.fsm import *
2012-11-16 17:48:41 -05:00
from migen.pytholite.expr import ExprCompiler
2012-11-10 15:51:19 -05:00
2012-11-17 13:54:50 -05:00
class Pytholite(UnifiedIOObject):
2012-11-10 15:51:19 -05:00
def get_fragment(self):
2012-11-23 10:23:24 -05:00
return super().get_fragment() + self.fragment
2012-11-10 15:51:19 -05:00
2012-11-16 17:48:41 -05:00
class _TokenPullExprCompiler(ExprCompiler):
def __init__(self, symdict, modelname, ep):
super().__init__(symdict)
self.modelname = modelname
self.ep = ep
def visit_expr_subscript(self, node):
# check that we are subscripting <modelname>.value
if not isinstance(node.value, ast.Attribute) \
or node.value.attr != "value" \
or not isinstance(node.value.value, ast.Name) \
or node.value.value.id != self.modelname:
raise NotImplementedError
if not isinstance(node.slice, ast.Index):
raise NotImplementedError
field = ast.literal_eval(node.slice.value)
signal = getattr(self.ep.token, field)
return signal
2012-11-23 06:41:50 -05:00
def _gen_df_io(compiler, modelname, to_model, from_model):
epname = ast.literal_eval(to_model["endpoint"])
values = to_model["value"]
ep = compiler.ioo.endpoints[epname]
2012-11-16 13:24:45 -05:00
2012-11-23 06:41:50 -05:00
if isinstance(values, ast.Name) and values.id == "None":
2012-11-16 13:24:45 -05:00
# token pull from sink
2012-11-16 17:48:41 -05:00
if not isinstance(ep, Sink):
raise TypeError("Attempted to pull from source")
ec = _TokenPullExprCompiler(compiler.symdict, modelname, ep)
state = []
for target_regs, expr in from_model:
cexpr = ec.visit_expr(expr)
state += [reg.load(cexpr) for reg in target_regs]
state += [
ep.ack.eq(1),
If(~ep.stb, AbstractNextState(state))
]
return [state], [state]
2012-11-16 13:24:45 -05:00
else:
# token push to source
2012-11-16 17:48:41 -05:00
if not isinstance(ep, Source):
raise TypeError("Attempted to push to sink")
2012-11-16 13:24:45 -05:00
if from_model:
raise TypeError("Attempted to read from pushed token")
2012-11-23 06:41:50 -05:00
if not isinstance(values, ast.Dict):
2012-11-16 13:24:45 -05:00
raise NotImplementedError
state = []
2012-11-23 06:41:50 -05:00
for akey, value in zip(values.keys, values.values):
2012-11-16 13:24:45 -05:00
key = ast.literal_eval(akey)
signal = getattr(ep.token, key)
state.append(signal.eq(compiler.ec.visit_expr(value)))
state += [
ep.stb.eq(1),
If(~ep.ack, AbstractNextState(state))
]
return [state], [state]
2012-11-23 07:09:55 -05:00
class _BusReadExprCompiler(ExprCompiler):
def __init__(self, symdict, modelname, data_signal):
super().__init__(symdict)
self.modelname = modelname
self.data_signal = data_signal
def visit_expr_attribute(self, node):
# recognize <modelname>.data as the bus read signal, raise exception otherwise
if not isinstance(node.value, ast.Name) \
or node.value.id != self.modelname \
or node.attr != "data":
raise NotImplementedError
return self.data_signal
2012-11-23 06:41:50 -05:00
def _gen_wishbone_io(compiler, modelname, model, to_model, from_model, bus):
state = [
bus.cyc.eq(1),
bus.stb.eq(1),
bus.adr.eq(compiler.ec.visit_expr(to_model["address"])),
]
if model == TWrite:
if from_model:
raise TypeError("Attempted to read from write transaction")
state += [
bus.we.eq(1),
bus.dat_w.eq(compiler.ec.visit_expr(to_model["data"]))
]
sel = to_model["sel"]
if isinstance(sel, ast.Name) and sel.id == "None":
nbytes = (len(bus.dat_w) + 7)//8
state.append(bus.sel.eq(2**nbytes-1))
else:
state.append(bus.sel.eq(compiler.ec.visit_expr(sel)))
2012-11-23 06:41:50 -05:00
else:
state.append(bus.we.eq(0))
2012-11-23 07:09:55 -05:00
ec = _BusReadExprCompiler(compiler.symdict, modelname, bus.dat_r)
for target_regs, expr in from_model:
cexpr = ec.visit_expr(expr)
state += [reg.load(cexpr) for reg in target_regs]
2012-11-23 06:41:50 -05:00
state.append(If(~bus.ack, AbstractNextState(state)))
return [state], [state]
def _gen_bus_io(compiler, modelname, model, to_model, from_model):
busname = ast.literal_eval(to_model["busname"])
if busname is None:
if len(compiler.ioo.buses) != 1:
raise TypeError("Bus name not specified")
bus = list(compiler.ioo.buses.values())[0]
else:
bus = compiler.ioo.buses[busname]
if isinstance(bus, wishbone.Interface):
return _gen_wishbone_io(compiler, modelname, model, to_model, from_model, bus)
else:
raise NotImplementedError("Unsupported bus")
def _decode_args(desc, args, args_kw):
d = {}
argnames = set()
for param, value in zip_longest(desc, args):
if param is None:
raise TypeError("Too many arguments")
if isinstance(param, tuple):
name, default = param
else:
name, default = param, None
# build the set of argument names at the same time
argnames.add(name)
if value is None:
if default is None:
raise TypeError("No default value for parameter " + name)
else:
d[name] = default
else:
d[name] = value
for akw in args_kw:
if akw.arg not in argnames:
raise TypeError("Parameter " + akw.arg + " does not exist")
d[akw.arg] = akw.value
return d
def gen_io(compiler, modelname, model, to_model, to_model_kw, from_model):
2012-11-16 13:24:45 -05:00
if model == Token:
2012-11-23 06:41:50 -05:00
desc = [
"endpoint",
("value", ast.Name("None", ast.Load())),
]
args = _decode_args(desc, to_model, to_model_kw)
return _gen_df_io(compiler, modelname, args, from_model)
elif model == TRead or model == TWrite:
desc = [
"address",
("data", ast.Num(0)),
("sel", ast.Name("None", ast.Load())),
("busname", ast.Name("None", ast.Load()))
]
args = _decode_args(desc, to_model, to_model_kw)
return _gen_bus_io(compiler, modelname, model, args, from_model)
2012-11-16 13:24:45 -05:00
else:
raise NotImplementedError