csr: add ability to express clusters

This adds the ability to express clusters of registers. This can be very
useful for peripherals with repeated groups of registers.

A `CSRCluster` class, emulating a simplified array, is added.
Peripherals can add items to the cluster, and the CSRs will, for JSON,
CSV, and HTML, be collected as normal.

For SVD, the code is extended to be able to handle outputting clusters
directly in the XML structure. This way, code generation tools like
`svd2rust` can generate nested array structures, making ergonomics
significantly better when working with peripherals.

The code supports nesting of `CSRClusters` too.
This commit is contained in:
Lasse Dalegaard 2023-03-23 10:42:14 +01:00
parent c3e93620ec
commit 302bb9120b
3 changed files with 258 additions and 42 deletions

View file

@ -45,6 +45,7 @@ class DocumentedCSR:
size = 8,
description = None,
access = "read-write",
cluster = None,
fields = []):
self.name = name
@ -53,6 +54,7 @@ class DocumentedCSR:
self.address = address
self.offset = offset
self.size = size
self.cluster = cluster
if size == 0:
print("!!! Warning: creating CSR of size 0 {}".format(name))
self.description = self.trim(description)
@ -337,6 +339,7 @@ class DocumentedCSRRegion:
atomic_write = csr.atomic_write
size = self.get_csr_size(csr)
reset = self.get_csr_reset(csr)
cluster = csr.cluster
# If the CSR is composed of multiple sub-CSRs, document each
# one individually.
@ -366,7 +369,8 @@ class DocumentedCSRRegion:
size = self.csr_data_width,
description = d,
fields = self.split_fields(fields, start, start + length),
access = access
access = access,
cluster = cluster
))
else:
self.csrs.append(DocumentedCSR(
@ -379,7 +383,8 @@ class DocumentedCSRRegion:
size = self.csr_data_width,
description = bits_str,
fields = self.split_fields(fields, start, start + length),
access = access
access = access,
cluster = cluster
))
self.current_address += 4
else:
@ -392,7 +397,8 @@ class DocumentedCSRRegion:
size = size,
description = description,
fields = fields,
access = access
access = access,
cluster = cluster
))
self.current_address += 4

View file

@ -23,6 +23,7 @@ import json
import time
import datetime
import inspect
import collections
from shutil import which
from sysconfig import get_platform
@ -435,6 +436,20 @@ def get_csr_csv(csr_regions={}, constants={}, mem_regions={}):
# SVD Export --------------------------------------------------------------------------------------
def get_csr_svd(soc, vendor="litex", name="soc", description=None):
class IndentedAppender:
def __init__(self, receiver, indent=""):
self._receiver = receiver
self._indent = indent
def append(self, line):
self._receiver.append(self._indent + line)
def indent(self, indent):
return IndentedAppender(self, indent)
def unindent(self):
return self._receiver
def sub_csr_bit_range(busword, csr, offset):
nwords = (csr.size + busword - 1)//busword
i = nwords - offset - 1
@ -443,16 +458,18 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
origin = i*busword
return (origin, nbits, name)
def print_svd_register(csr, csr_address, description, length, svd):
def print_svd_register(csr, csr_address, description, length, cluster_state, svd):
prefix_len = 0
if cluster_state is not None:
prefix_len = cluster_state.prefix_len
svd.append(' <register>')
svd.append(' <name>{}</name>'.format(csr.short_numbered_name))
svd.append(' <name>{}</name>'.format(csr.short_numbered_name[prefix_len:]))
if description is not None:
svd.append(' <description><![CDATA[{}]]></description>'.format(description))
svd.append(' <addressOffset>0x{:04x}</addressOffset>'.format(csr_address))
svd.append(' <resetValue>0x{:02x}</resetValue>'.format(csr.reset_value))
svd.append(' <size>{}</size>'.format(length))
# svd.append(' <access>{}</access>'.format(csr.access)) # 'access' is a lie: "read-only" registers can legitimately change state based on a write, and is in fact used to handle the "pending" field in events
csr_address = csr_address + 4
svd.append(' <fields>')
if hasattr(csr, "fields") and len(csr.fields) > 0:
for field in csr.fields:
@ -468,7 +485,7 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' </field>')
else:
field_size = csr.size
field_name = csr.short_name.lower()
field_name = csr.short_name.lower()[prefix_len:]
# Strip off "ev_" from eventmanager fields
if field_name == "ev_enable":
field_name = "enable"
@ -485,6 +502,135 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' </fields>')
svd.append(' </register>')
def print_svd_registers(region, csrs, svd):
csr_address = 0
current_cluster = []
cluster_states = []
do_print = True
class ClusterState:
def __init__(self, cluster, prev_state, must_close):
self.offset = None
self.current_address = 0
self.end = None
self.layout = list(reversed(cluster._layout_csrs))
self.cluster = cluster
self.must_close = must_close
# We use index 0 for the prefix as that's the only item we are going to print
self.prefix_len = len(f"{cluster.name}_{cluster._prefix_for(0)}")
if prev_state:
self.prefix_len += prev_state.prefix_len
self.prev_state = prev_state
def push_cluster(cluster, prev_state):
nonlocal svd
if do_print:
svd.append(' <cluster>')
svd.append(' <name>{}[%s]</name>'.format(cluster.name.upper()))
svd.append(' <dim>{}</dim>'.format(len(cluster.items)))
if cluster.description is not None:
svd.append(' <description><![CDATA[{}]]></description>'.format(cluster.description))
svd = svd.indent(" ")
return ClusterState(cluster, prev_state, do_print)
def pop_cluster(cluster, state):
nonlocal svd
if state.must_close:
svd = svd.unindent()
increment = int(state.current_address / len(state.cluster.items))
svd.append(' <dimIncrement>0x{:04x}</dimIncrement>'.format(increment))
svd.append(' <addressOffset>0x{:04x}</addressOffset>'.format(state.offset))
svd.append(' </cluster>')
for csr in csrs:
description = None
if hasattr(csr, "description"):
description = csr.description
# Find cluster differences
exits = collections.deque(current_cluster)
entries = collections.deque(csr.cluster or [])
while exits and entries and exits[0] == entries[0]:
exits.popleft()
entries.popleft()
while exits:
pop_cluster(exits.pop(), cluster_states.pop())
# We can do this before pushing a new cluster, a new cluster would
# push with a layout and would not set `do_print = False`.
do_print = True
for state in cluster_states:
if not state.layout:
do_print = False
while entries:
if cluster_states:
prev_state = cluster_states[-1]
else:
prev_state = None
state = push_cluster(entries.popleft(), prev_state)
if state.prev_state:
state.offset = state.prev_state.current_address
else:
state.offset = csr_address
cluster_states.append(state)
current_cluster = csr.cluster or []
# Grab the latest cluster state for easy access to the address later
cluster_state = None
if cluster_states:
cluster_state = cluster_states[-1]
if isinstance(csr, _CompoundCSR) and len(csr.simple_csrs) > 1:
is_first = True
for i in range(len(csr.simple_csrs)):
(start, length, name) = sub_csr_bit_range(
region.busword, csr, i)
if length > 0:
bits_str = "Bits {}-{} of `{}`.".format(
start, start+length, csr.name)
else:
bits_str = "Bit {} of `{}`.".format(
start, csr.name)
if do_print:
address = csr_address if cluster_state is None else cluster_state.current_address
if is_first:
if description is not None:
print_svd_register(
csr.simple_csrs[i], address, bits_str + " " + description, length,
cluster_state, svd)
else:
print_svd_register(
csr.simple_csrs[i], address, bits_str, length, cluster_state, svd)
is_first = False
else:
print_svd_register(
csr.simple_csrs[i], csr_address, bits_str, length, cluster_state, svd)
for state in cluster_states:
state.current_address += 4
csr_address = csr_address + 4
else:
length = ((csr.size + region.busword - 1) //
region.busword) * region.busword
if do_print:
address = csr_address if cluster_state is None else cluster_state.current_address
print_svd_register(
csr, address, description, length, cluster_state, svd)
for state in cluster_states:
state.current_address += 4
csr_address = csr_address + 4
for state in cluster_states:
if state.layout:
state.layout.pop()
while current_cluster:
pop_cluster(current_cluster.pop(), cluster_states.pop())
return csr_address
interrupts = {}
for csr, irq in sorted(soc.irq.locs.items()):
interrupts[csr] = irq
@ -520,7 +666,6 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' <peripherals>')
for region in documented_regions:
csr_address = 0
svd.append(' <peripheral>')
svd.append(' <name>{}</name>'.format(region.name.upper()))
svd.append(' <baseAddress>0x{:08X}</baseAddress>'.format(region.origin))
@ -529,39 +674,7 @@ def get_csr_svd(soc, vendor="litex", name="soc", description=None):
svd.append(' <description><![CDATA[{}]]></description>'.format(
reflow(region.sections[0].body())))
svd.append(' <registers>')
for csr in region.csrs:
description = None
if hasattr(csr, "description"):
description = csr.description
if isinstance(csr, _CompoundCSR) and len(csr.simple_csrs) > 1:
is_first = True
for i in range(len(csr.simple_csrs)):
(start, length, name) = sub_csr_bit_range(
region.busword, csr, i)
if length > 0:
bits_str = "Bits {}-{} of `{}`.".format(
start, start+length, csr.name)
else:
bits_str = "Bit {} of `{}`.".format(
start, csr.name)
if is_first:
if description is not None:
print_svd_register(
csr.simple_csrs[i], csr_address, bits_str + " " + description, length, svd)
else:
print_svd_register(
csr.simple_csrs[i], csr_address, bits_str, length, svd)
is_first = False
else:
print_svd_register(
csr.simple_csrs[i], csr_address, bits_str, length, svd)
csr_address = csr_address + 4
else:
length = ((csr.size + region.busword - 1) //
region.busword) * region.busword
print_svd_register(
csr, csr_address, description, length, svd)
csr_address = csr_address + 4
csr_address = print_svd_registers(region, region.csrs, IndentedAppender(svd))
svd.append(' </registers>')
svd.append(' <addressBlock>')
svd.append(' <offset>0</offset>')

View file

@ -38,7 +38,7 @@ from enum import IntEnum
from migen import *
from migen.util.misc import xdir
from migen.fhdl.tracer import get_obj_var_name
from migen.fhdl.tracer import get_obj_var_name, remove_underscore
# CSRBase ------------------------------------------------------------------------------------------
@ -49,6 +49,7 @@ class _CSRBase(DUID):
self.fixed = n is not None
self.size = size
self.name = get_obj_var_name(name)
self.cluster = None
if self.name is None:
raise ValueError("Cannot extract CSR name from code, need to specify.")
# CSRConstant --------------------------------------------------------------------------------------
@ -448,6 +449,100 @@ class CSRStorage(_CompoundCSR):
if field.pulse:
yield getattr(self.fields, field.name).eq(0)
# CSRCluster ---------------------------------------------------------------------------------------
class CSRCluster:
"""CSR Cluster.
A ``CSRCluster`` can be used to express repeated subgroups of a peripheral. Especially when
exporting to SVD this can be very useful for generating (nested) trees of peripheral registers
instead of a flat list of registers.
A cluster acts like an array: after creation, items can be added to it and accessed by index.
To maximize ergonomics, ``CSRClusters`` can be created in multiple ways:
- All items can be specified upfront by passing `items` to the constructor.
- Items can be added with the `append` method.
- Items can be assigned by indexing.
All elements in a ``CSRCluster`` must provide the same set of registers. If this is unwanted,
for example if one simply wants to use ``CSRCluster`` as a simple way to express a list of
modules, one can pass `always_flatten=True` to the constructor.
A cluster can be flattened in to lists of memories, CSRs, and constants. In this case, each
item will be prefixed by their index like this: ``2_``.
"""
def __init__(self, items=None, always_flatten=False, name=None, description=None):
self.name = get_obj_var_name(name)
self.items = [] if items is None else items
self.description = description
self.always_flatten = always_flatten
def append(self, item):
self.items.append(item)
def __getitem__(self, idx):
return self.items[idx]
def __setitem__(self, idx, value):
while len(self.items) <= idx:
self.items.append(None)
self.items[idx] = value
def __iter__(self):
return self.items.__iter__()
def __len__(self):
return len(self.items)
def _check_layout(self, kind, idx, sub, extract):
old_layout = getattr(self, f"_layout_{kind}", None)
this_layout = [extract(csr) for csr in sub]
if old_layout:
if old_layout != this_layout:
raise ValueError(f"CSR cluster conflict, item {idx} doesn't match first item layout")
else:
setattr(self, f"_layout_{kind}", this_layout)
def _prefix_for(self, idx):
return f"{idx}_"
def get_memories(self):
ret = []
for (idx, item) in enumerate(self.items):
sub = item.get_memories()
memprefix(self._prefix_for(idx), sub, set())
ret += sub
return ret
def get_csrs(self):
ret = []
for (idx, item) in enumerate(self.items):
sub = item.get_csrs()
self._check_layout("csrs", idx, sub, lambda s: (s.name, s.size))
csrprefix(self._prefix_for(idx), sub, set())
ret += sub
if not self.always_flatten:
for csr in ret:
cur_cluster = getattr(csr, 'cluster', None)
if cur_cluster:
csr.cluster = [self, *cur_cluster]
else:
csr.cluster = [self]
return ret
def get_constants(self):
ret = []
layout = None
for (idx, item) in enumerate(self.items):
sub = item.get_constants()
csrprefix(self._prefix_for(idx), sub, set())
ret += sub
return ret
# AutoCSR & Helpers --------------------------------------------------------------------------------
def csrprefix(prefix, csrs, done):
@ -541,6 +636,8 @@ def _make_gatherer(method, cls, prefix_cb):
r.append(v)
elif hasattr(v, method) and callable(getattr(v, method)):
items = getattr(v, method)()
if isinstance(v, CSRCluster):
k = v.name
prefix_cb(k + "_", items, prefixed)
r += items
r = sorted(r, key=lambda x: x.duid)