diff --git a/litex/soc/doc/csr.py b/litex/soc/doc/csr.py
index b18b0b8a4..2b3edafe2 100644
--- a/litex/soc/doc/csr.py
+++ b/litex/soc/doc/csr.py
@@ -45,6 +45,7 @@ class DocumentedCSR:
size = 8,
description = None,
access = "read-write",
+ cluster = None,
fields = []):
self.name = name
@@ -53,6 +54,7 @@ class DocumentedCSR:
self.address = address
self.offset = offset
self.size = size
+ self.cluster = cluster
if size == 0:
print("!!! Warning: creating CSR of size 0 {}".format(name))
self.description = self.trim(description)
@@ -337,6 +339,7 @@ class DocumentedCSRRegion:
atomic_write = csr.atomic_write
size = self.get_csr_size(csr)
reset = self.get_csr_reset(csr)
+ cluster = csr.cluster
# If the CSR is composed of multiple sub-CSRs, document each
# one individually.
@@ -366,7 +369,8 @@ class DocumentedCSRRegion:
size = self.csr_data_width,
description = d,
fields = self.split_fields(fields, start, start + length),
- access = access
+ access = access,
+ cluster = cluster
))
else:
self.csrs.append(DocumentedCSR(
@@ -379,7 +383,8 @@ class DocumentedCSRRegion:
size = self.csr_data_width,
description = bits_str,
fields = self.split_fields(fields, start, start + length),
- access = access
+ access = access,
+ cluster = cluster
))
self.current_address += 4
else:
@@ -392,7 +397,8 @@ class DocumentedCSRRegion:
size = size,
description = description,
fields = fields,
- access = access
+ access = access,
+ cluster = cluster
))
self.current_address += 4
diff --git a/litex/soc/integration/export.py b/litex/soc/integration/export.py
index ecd474669..14b64c7d1 100644
--- a/litex/soc/integration/export.py
+++ b/litex/soc/integration/export.py
@@ -23,6 +23,7 @@ import json
import time
import datetime
import inspect
+import collections
from shutil import which
from sysconfig import get_platform
@@ -435,6 +436,20 @@ def get_csr_csv(csr_regions={}, constants={}, mem_regions={}):
# SVD Export --------------------------------------------------------------------------------------
def get_csr_svd(soc, vendor="litex", name="soc", description=None):
+ class IndentedAppender:
+ def __init__(self, receiver, indent=""):
+ self._receiver = receiver
+ self._indent = indent
+
+ def append(self, line):
+ self._receiver.append(self._indent + line)
+
+ def indent(self, indent):
+ return IndentedAppender(self, indent)
+
+ def unindent(self):
+ return self._receiver
+
def sub_csr_bit_range(busword, csr, offset):
nwords = (csr.size + busword - 1)//busword
i = nwords - offset - 1
@@ -443,16 +458,18 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
origin = i*busword
return (origin, nbits, name)
- def print_svd_register(csr, csr_address, description, length, svd):
+ def print_svd_register(csr, csr_address, description, length, cluster_state, svd):
+ prefix_len = 0
+ if cluster_state is not None:
+ prefix_len = cluster_state.prefix_len
svd.append(' ')
- svd.append(' {}'.format(csr.short_numbered_name))
+ svd.append(' {}'.format(csr.short_numbered_name[prefix_len:]))
if description is not None:
svd.append(' '.format(description))
svd.append(' 0x{:04x}'.format(csr_address))
svd.append(' 0x{:02x}'.format(csr.reset_value))
svd.append(' {}'.format(length))
# svd.append(' {}'.format(csr.access)) # 'access' is a lie: "read-only" registers can legitimately change state based on a write, and is in fact used to handle the "pending" field in events
- csr_address = csr_address + 4
svd.append(' ')
if hasattr(csr, "fields") and len(csr.fields) > 0:
for field in csr.fields:
@@ -468,7 +485,7 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' ')
else:
field_size = csr.size
- field_name = csr.short_name.lower()
+ field_name = csr.short_name.lower()[prefix_len:]
# Strip off "ev_" from eventmanager fields
if field_name == "ev_enable":
field_name = "enable"
@@ -485,6 +502,135 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' ')
svd.append(' ')
+ def print_svd_registers(region, csrs, svd):
+ csr_address = 0
+ current_cluster = []
+ cluster_states = []
+ do_print = True
+
+ class ClusterState:
+ def __init__(self, cluster, prev_state, must_close):
+ self.offset = None
+ self.current_address = 0
+ self.end = None
+ self.layout = list(reversed(cluster._layout_csrs))
+ self.cluster = cluster
+ self.must_close = must_close
+ # We use index 0 for the prefix as that's the only item we are going to print
+ self.prefix_len = len(f"{cluster.name}_{cluster._prefix_for(0)}")
+ if prev_state:
+ self.prefix_len += prev_state.prefix_len
+ self.prev_state = prev_state
+
+ def push_cluster(cluster, prev_state):
+ nonlocal svd
+ if do_print:
+ svd.append(' ')
+ svd.append(' {}[%s]'.format(cluster.name.upper()))
+ svd.append(' {}'.format(len(cluster.items)))
+ if cluster.description is not None:
+ svd.append(' '.format(cluster.description))
+ svd = svd.indent(" ")
+ return ClusterState(cluster, prev_state, do_print)
+
+ def pop_cluster(cluster, state):
+ nonlocal svd
+ if state.must_close:
+ svd = svd.unindent()
+ increment = int(state.current_address / len(state.cluster.items))
+ svd.append(' 0x{:04x}'.format(increment))
+ svd.append(' 0x{:04x}'.format(state.offset))
+ svd.append(' ')
+
+ for csr in csrs:
+ description = None
+ if hasattr(csr, "description"):
+ description = csr.description
+
+ # Find cluster differences
+ exits = collections.deque(current_cluster)
+ entries = collections.deque(csr.cluster or [])
+ while exits and entries and exits[0] == entries[0]:
+ exits.popleft()
+ entries.popleft()
+
+ while exits:
+ pop_cluster(exits.pop(), cluster_states.pop())
+
+ # We can do this before pushing a new cluster, a new cluster would
+ # push with a layout and would not set `do_print = False`.
+ do_print = True
+ for state in cluster_states:
+ if not state.layout:
+ do_print = False
+
+ while entries:
+ if cluster_states:
+ prev_state = cluster_states[-1]
+ else:
+ prev_state = None
+ state = push_cluster(entries.popleft(), prev_state)
+ if state.prev_state:
+ state.offset = state.prev_state.current_address
+ else:
+ state.offset = csr_address
+ cluster_states.append(state)
+
+ current_cluster = csr.cluster or []
+
+ # Grab the latest cluster state for easy access to the address later
+ cluster_state = None
+ if cluster_states:
+ cluster_state = cluster_states[-1]
+
+ if isinstance(csr, _CompoundCSR) and len(csr.simple_csrs) > 1:
+ is_first = True
+ for i in range(len(csr.simple_csrs)):
+ (start, length, name) = sub_csr_bit_range(
+ region.busword, csr, i)
+ if length > 0:
+ bits_str = "Bits {}-{} of `{}`.".format(
+ start, start+length, csr.name)
+ else:
+ bits_str = "Bit {} of `{}`.".format(
+ start, csr.name)
+ if do_print:
+ address = csr_address if cluster_state is None else cluster_state.current_address
+ if is_first:
+ if description is not None:
+ print_svd_register(
+ csr.simple_csrs[i], address, bits_str + " " + description, length,
+ cluster_state, svd)
+ else:
+ print_svd_register(
+ csr.simple_csrs[i], address, bits_str, length, cluster_state, svd)
+ is_first = False
+ else:
+ print_svd_register(
+ csr.simple_csrs[i], csr_address, bits_str, length, cluster_state, svd)
+ for state in cluster_states:
+ state.current_address += 4
+ csr_address = csr_address + 4
+ else:
+ length = ((csr.size + region.busword - 1) //
+ region.busword) * region.busword
+ if do_print:
+ address = csr_address if cluster_state is None else cluster_state.current_address
+ print_svd_register(
+ csr, address, description, length, cluster_state, svd)
+ for state in cluster_states:
+ state.current_address += 4
+ csr_address = csr_address + 4
+
+ for state in cluster_states:
+ if state.layout:
+ state.layout.pop()
+
+ while current_cluster:
+ pop_cluster(current_cluster.pop(), cluster_states.pop())
+
+ return csr_address
+
interrupts = {}
for csr, irq in sorted(soc.irq.locs.items()):
interrupts[csr] = irq
@@ -520,7 +666,6 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' ')
for region in documented_regions:
- csr_address = 0
svd.append(' ')
svd.append(' {}'.format(region.name.upper()))
svd.append(' 0x{:08X}'.format(region.origin))
@@ -529,39 +674,7 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' '.format(
reflow(region.sections[0].body())))
svd.append(' ')
- for csr in region.csrs:
- description = None
- if hasattr(csr, "description"):
- description = csr.description
- if isinstance(csr, _CompoundCSR) and len(csr.simple_csrs) > 1:
- is_first = True
- for i in range(len(csr.simple_csrs)):
- (start, length, name) = sub_csr_bit_range(
- region.busword, csr, i)
- if length > 0:
- bits_str = "Bits {}-{} of `{}`.".format(
- start, start+length, csr.name)
- else:
- bits_str = "Bit {} of `{}`.".format(
- start, csr.name)
- if is_first:
- if description is not None:
- print_svd_register(
- csr.simple_csrs[i], csr_address, bits_str + " " + description, length, svd)
- else:
- print_svd_register(
- csr.simple_csrs[i], csr_address, bits_str, length, svd)
- is_first = False
- else:
- print_svd_register(
- csr.simple_csrs[i], csr_address, bits_str, length, svd)
- csr_address = csr_address + 4
- else:
- length = ((csr.size + region.busword - 1) //
- region.busword) * region.busword
- print_svd_register(
- csr, csr_address, description, length, svd)
- csr_address = csr_address + 4
+ csr_address = print_svd_registers(region, region.csrs, IndentedAppender(svd))
svd.append(' ')
svd.append(' ')
svd.append(' 0')
diff --git a/litex/soc/interconnect/csr.py b/litex/soc/interconnect/csr.py
index 849cfd27e..1c78786b5 100644
--- a/litex/soc/interconnect/csr.py
+++ b/litex/soc/interconnect/csr.py
@@ -38,7 +38,7 @@ from enum import IntEnum
from migen import *
from migen.util.misc import xdir
-from migen.fhdl.tracer import get_obj_var_name
+from migen.fhdl.tracer import get_obj_var_name, remove_underscore
# CSRBase ------------------------------------------------------------------------------------------
@@ -49,6 +49,7 @@ class _CSRBase(DUID):
self.fixed = n is not None
self.size = size
self.name = get_obj_var_name(name)
+ self.cluster = None
if self.name is None:
raise ValueError("Cannot extract CSR name from code, need to specify.")
# CSRConstant --------------------------------------------------------------------------------------
@@ -448,6 +449,100 @@ class CSRStorage(_CompoundCSR):
if field.pulse:
yield getattr(self.fields, field.name).eq(0)
+
+# CSRCluster ---------------------------------------------------------------------------------------
+
+class CSRCluster:
+ """CSR Cluster.
+
+ A ``CSRCluster`` can be used to express repeated subgroups of a peripheral. Especially when
+ exporting to SVD this can be very useful for generating (nested) trees of peripheral registers
+ instead of a flat list of registers.
+
+ A cluster acts like an array: after creation, items can be added to it and accessed by index.
+
+ To maximize ergonomics, ``CSRClusters`` can be created in multiple ways:
+ - All items can be specified upfront by passing `items` to the constructor.
+ - Items can be added with the `append` method.
+ - Items can be assigned by indexing.
+
+ All elements in a ``CSRCluster`` must provide the same set of registers. If this is unwanted,
+ for example if one simply wants to use ``CSRCluster`` as a simple way to express a list of
+ modules, one can pass `always_flatten=True` to the constructor.
+
+ A cluster can be flattened in to lists of memories, CSRs, and constants. In this case, each
+ item will be prefixed by their index like this: ``2_``.
+ """
+ def __init__(self, items=None, always_flatten=False, name=None, description=None):
+ self.name = get_obj_var_name(name)
+ self.items = [] if items is None else items
+ self.description = description
+ self.always_flatten = always_flatten
+
+ def append(self, item):
+ self.items.append(item)
+
+ def __getitem__(self, idx):
+ return self.items[idx]
+
+ def __setitem__(self, idx, value):
+ while len(self.items) <= idx:
+ self.items.append(None)
+ self.items[idx] = value
+
+ def __iter__(self):
+ return self.items.__iter__()
+
+ def __len__(self):
+ return len(self.items)
+
+ def _check_layout(self, kind, idx, sub, extract):
+ old_layout = getattr(self, f"_layout_{kind}", None)
+ this_layout = [extract(csr) for csr in sub]
+ if old_layout:
+ if old_layout != this_layout:
+ raise ValueError(f"CSR cluster conflict, item {idx} doesn't match first item layout")
+ else:
+ setattr(self, f"_layout_{kind}", this_layout)
+
+ def _prefix_for(self, idx):
+ return f"{idx}_"
+
+ def get_memories(self):
+ ret = []
+ for (idx, item) in enumerate(self.items):
+ sub = item.get_memories()
+ memprefix(self._prefix_for(idx), sub, set())
+ ret += sub
+ return ret
+
+ def get_csrs(self):
+ ret = []
+ for (idx, item) in enumerate(self.items):
+ sub = item.get_csrs()
+ self._check_layout("csrs", idx, sub, lambda s: (s.name, s.size))
+ csrprefix(self._prefix_for(idx), sub, set())
+ ret += sub
+
+ if not self.always_flatten:
+ for csr in ret:
+ cur_cluster = getattr(csr, 'cluster', None)
+ if cur_cluster:
+ csr.cluster = [self, *cur_cluster]
+ else:
+ csr.cluster = [self]
+ return ret
+
+ def get_constants(self):
+ ret = []
+ layout = None
+ for (idx, item) in enumerate(self.items):
+ sub = item.get_constants()
+ csrprefix(self._prefix_for(idx), sub, set())
+ ret += sub
+ return ret
+
+
# AutoCSR & Helpers --------------------------------------------------------------------------------
def csrprefix(prefix, csrs, done):
@@ -541,6 +636,8 @@ def _make_gatherer(method, cls, prefix_cb):
r.append(v)
elif hasattr(v, method) and callable(getattr(v, method)):
items = getattr(v, method)()
+ if isinstance(v, CSRCluster):
+ k = v.name
prefix_cb(k + "_", items, prefixed)
r += items
r = sorted(r, key=lambda x: x.duid)