mirror of
https://github.com/enjoy-digital/litex.git
synced 2025-01-04 09:52:26 -05:00
22507b117c
being able to know when a register is updated is useful in many cases and avoid having to handle another register for that. re is asserted when the the last CSR of the Compound is written. Software must also write Compound in the right order.
141 lines
4.4 KiB
Python
141 lines
4.4 KiB
Python
from migen.util.misc import xdir
|
|
from migen.fhdl.std import *
|
|
from migen.fhdl.tracer import get_obj_var_name
|
|
|
|
class _CSRBase(HUID):
|
|
def __init__(self, size, name):
|
|
HUID.__init__(self)
|
|
self.name = get_obj_var_name(name)
|
|
if self.name is None:
|
|
raise ValueError("Cannot extract CSR name from code, need to specify.")
|
|
if len(self.name) > 2 and self.name[:2] == "r_":
|
|
self.name = self.name[2:]
|
|
self.size = size
|
|
|
|
class CSR(_CSRBase):
|
|
def __init__(self, size=1, name=None):
|
|
_CSRBase.__init__(self, size, name)
|
|
self.re = Signal(name=self.name + "_re")
|
|
self.r = Signal(self.size, name=self.name + "_r")
|
|
self.w = Signal(self.size, name=self.name + "_w")
|
|
|
|
class _CompoundCSR(_CSRBase, Module):
|
|
def __init__(self, size, name):
|
|
_CSRBase.__init__(self, size, name)
|
|
self.simple_csrs = []
|
|
|
|
def get_simple_csrs(self):
|
|
if not self.finalized:
|
|
raise FinalizeError
|
|
return self.simple_csrs
|
|
|
|
def do_finalize(self, busword):
|
|
raise NotImplementedError
|
|
|
|
class CSRStatus(_CompoundCSR):
|
|
def __init__(self, size=1, reset=0, name=None):
|
|
_CompoundCSR.__init__(self, size, name)
|
|
self.status = Signal(self.size, reset=reset)
|
|
|
|
def do_finalize(self, busword):
|
|
nwords = (self.size + busword - 1)//busword
|
|
for i in reversed(range(nwords)):
|
|
nbits = min(self.size - i*busword, busword)
|
|
sc = CSR(nbits, self.name + str(i) if nwords > 1 else self.name)
|
|
self.comb += sc.w.eq(self.status[i*busword:i*busword+nbits])
|
|
self.simple_csrs.append(sc)
|
|
|
|
class CSRStorage(_CompoundCSR):
|
|
def __init__(self, size=1, reset=0, atomic_write=False, write_from_dev=False, alignment_bits=0, name=None):
|
|
_CompoundCSR.__init__(self, size, name)
|
|
self.alignment_bits = alignment_bits
|
|
self.storage_full = Signal(self.size, reset=reset)
|
|
self.storage = Signal(self.size - self.alignment_bits, reset=reset >> alignment_bits)
|
|
self.comb += self.storage.eq(self.storage_full[self.alignment_bits:])
|
|
self.atomic_write = atomic_write
|
|
self.re = Signal()
|
|
if write_from_dev:
|
|
self.we = Signal()
|
|
self.dat_w = Signal(self.size - self.alignment_bits)
|
|
self.sync += If(self.we, self.storage_full.eq(self.dat_w << self.alignment_bits))
|
|
|
|
def do_finalize(self, busword):
|
|
nwords = (self.size + busword - 1)//busword
|
|
if nwords > 1 and self.atomic_write:
|
|
backstore = Signal(self.size - busword, name=self.name + "_backstore")
|
|
for i in reversed(range(nwords)):
|
|
nbits = min(self.size - i*busword, busword)
|
|
sc = CSR(nbits, self.name + str(i) if nwords else self.name)
|
|
self.simple_csrs.append(sc)
|
|
lo = i*busword
|
|
hi = lo+nbits
|
|
# read
|
|
if lo >= self.alignment_bits:
|
|
self.comb += sc.w.eq(self.storage_full[lo:hi])
|
|
elif hi > self.alignment_bits:
|
|
self.comb += sc.w.eq(Cat(Replicate(0, hi - self.alignment_bits),
|
|
self.storage_full[self.alignment_bits:hi]))
|
|
else:
|
|
self.comb += sc.w.eq(0)
|
|
# write
|
|
if nwords > 1 and self.atomic_write:
|
|
if i:
|
|
self.sync += If(sc.re, backstore[lo-busword:hi-busword].eq(sc.r))
|
|
else:
|
|
self.sync += If(sc.re, self.storage_full.eq(Cat(sc.r, backstore)))
|
|
else:
|
|
self.sync += If(sc.re, self.storage_full[lo:hi].eq(sc.r))
|
|
self.sync += self.re.eq(sc.re)
|
|
|
|
def csrprefix(prefix, csrs, done):
|
|
for csr in csrs:
|
|
if csr.huid not in done:
|
|
csr.name = prefix + csr.name
|
|
done.add(csr.huid)
|
|
|
|
def memprefix(prefix, memories, done):
|
|
for memory in memories:
|
|
if memory.huid not in done:
|
|
memory.name_override = prefix + memory.name_override
|
|
done.add(memory.huid)
|
|
|
|
class AutoCSR:
|
|
def get_memories(self):
|
|
try:
|
|
exclude = self.autocsr_exclude
|
|
except AttributeError:
|
|
exclude = {}
|
|
try:
|
|
prefixed = self.__prefixed
|
|
except AttributeError:
|
|
prefixed = self.__prefixed = set()
|
|
r = []
|
|
for k, v in xdir(self, True):
|
|
if k not in exclude:
|
|
if isinstance(v, Memory):
|
|
r.append(v)
|
|
elif hasattr(v, "get_memories") and callable(v.get_memories):
|
|
memories = v.get_memories()
|
|
memprefix(k + "_", memories, prefixed)
|
|
r += memories
|
|
return sorted(r, key=lambda x: x.huid)
|
|
|
|
def get_csrs(self):
|
|
try:
|
|
exclude = self.autocsr_exclude
|
|
except AttributeError:
|
|
exclude = {}
|
|
try:
|
|
prefixed = self.__prefixed
|
|
except AttributeError:
|
|
prefixed = self.__prefixed = set()
|
|
r = []
|
|
for k, v in xdir(self, True):
|
|
if k not in exclude:
|
|
if isinstance(v, _CSRBase):
|
|
r.append(v)
|
|
elif hasattr(v, "get_csrs") and callable(v.get_csrs):
|
|
csrs = v.get_csrs()
|
|
csrprefix(k + "_", csrs, prefixed)
|
|
r += csrs
|
|
return sorted(r, key=lambda x: x.huid)
|