gateware: complete refactoring (only keep essential features, now less than 200 LOCs :)

use new LiteX features and only keep one trigger, subsampler, cdc, converter and storage modules.

software still needs to be cleaned up.
This commit is contained in:
Florent Kermarrec 2016-03-31 11:37:00 +02:00
parent e211d17ca6
commit b1b9e61ecf
24 changed files with 312 additions and 787 deletions

17
README
View File

@ -34,15 +34,12 @@ design flow by generating the verilog rtl that you will use as a standard core.
[> Features
------------
- IO peek and poke with LiteScopeInOut
- Logic analyser with LiteScopeLogicAnalyzer:
- Various triggering modules: Term, Range, Edge (add yours! :)
- Run Length Encoder to "compress" data and increase recording depth
- IO peek and poke with LiteScopeIO
- Logic analyser with LiteScopeAnalyzer:
- Subsampling
- Storage qualifier
- Data storage in block rams
- Bridges:
- UART2Wishbone (provided by LiteScope)
- UART2Wishbone (provided by LiteX)
- Ethernet2Wishbone ("Etherbone") (when used with LiteEth)
- PCIe2Wishbone (when used with LitePCIe)
- Exports formats: .vcd, .sr(sigrok), .csv, .py, etc...
@ -83,11 +80,9 @@ devel [AT] lists.m-labs.hk.
4. Test design:
go to test and run:
./make.py --port your_serial_port test_inout (will blink leds)
./make.py --port your_serial_port test_logic_analyzer (will capture counter)
tests can also be executed over Etherbone (provided with LiteEth):
./make.py --ip_address fpga_ip_address your_test
litex_server --port your_serial_port
python3 test_io.py (led blinker)
python3 test_analyzer.py (capture counter with analyzer)
[> Simulations
---------------

View File

@ -15,10 +15,6 @@ from litex.build.xilinx.common import *
from litex.soc.integration import cpu_interface
litescope_path = "../"
sys.path.append(litescope_path) # XXX
from litescope.common import *
def autotype(s):
if s == "True":
@ -124,27 +120,23 @@ if __name__ == "__main__":
logic analyzer core powered by Migen
====== Building parameters: ======""")
if hasattr(soc, "inout"):
if hasattr(soc, "io"):
print("""
LiscopeIO
---------
Width: {}
""".format(soc.inout.dw)
""".format(soc.io.dw)
)
if hasattr(soc, "logic_analyzer"):
if hasattr(soc, "analyzer"):
print("""
LiscopeLA
LiscopeAnalyzer
---------
Width: {}
Depth: {}
Subsampler: {}
RLE: {}
===============================""".format(
soc.logic_analyzer.dw,
soc.logic_analyzer.depth,
str(soc.logic_analyzer.with_subsampler),
str(soc.logic_analyzer.with_rle)
soc.analyzer.dw,
soc.analyzer.depth
)
)

View File

@ -9,9 +9,7 @@ from litex.build.xilinx.platform import XilinxPlatform
from litex.soc.integration.soc_core import SoCCore
from litex.soc.cores.uart.bridge import UARTWishboneBridge
from litescope.core.port import LiteScopeTerm
from litescope.frontend.inout import LiteScopeInOut
from litescope.frontend.logic_analyzer import LiteScopeLogicAnalyzer
from litescope import LiteScopeAnalyzer
_io = [
@ -37,7 +35,7 @@ class CorePlatform(XilinxPlatform):
class Core(SoCCore):
platform = CorePlatform()
csr_map = {
"logic_analyzer": 16
"analyzer": 16
}
csr_map.update(SoCCore.csr_map)
@ -58,7 +56,6 @@ class Core(SoCCore):
self.add_wb_master(self.cpu_or_bridge.wishbone)
self.bus = platform.request("bus")
self.submodules.logic_analyzer = LiteScopeLogicAnalyzer((self.bus), 512, with_rle=True, with_subsampler=True)
self.logic_analyzer.trigger.add_port(LiteScopeTerm(self.logic_analyzer.dw))
self.submodules.analyzer = LiteScopeAnalyzer((self.bus), 512)
default_subtarget = Core

View File

@ -1,17 +1,16 @@
from litex.gen import *
from litex.gen.genlib.io import CRG
from litescope.common import *
from litescope.core.port import LiteScopeTerm
from litescope.frontend.inout import LiteScopeInOut
from litescope.frontend.logic_analyzer import LiteScopeLogicAnalyzer
from litex.soc.integration.soc_core import SoCCore
from litex.soc.cores.uart.bridge import UARTWishboneBridge
from litescope import LiteScopeIO, LiteScopeAnalyzer
class LiteScopeSoC(SoCCore):
csr_map = {
"inout" : 16,
"logic_analyzer" : 17
"io": 16,
"analyzer": 17
}
csr_map.update(SoCCore.csr_map)
@ -28,21 +27,20 @@ class LiteScopeSoC(SoCCore):
self.add_wb_master(self.cpu_or_bridge.wishbone)
self.submodules.crg = CRG(platform.request(platform.default_clk_name))
self.submodules.inout = LiteScopeInOut(8)
self.submodules.io = LiteScopeIO(8)
for i in range(8):
try:
self.comb += platform.request("user_led", i).eq(self.inout.o[i])
self.comb += platform.request("user_led", i).eq(self.io.output[i])
except:
pass
counter = Signal(16)
self.sync += counter.eq(counter + 1)
toto = Signal()
self.debug = (counter)
self.submodules.logic_analyzer = LiteScopeLogicAnalyzer(self.debug, 512, with_rle=True, with_subsampler=True)
self.logic_analyzer.trigger.add_port(LiteScopeTerm(self.logic_analyzer.dw))
self.submodules.analyzer = LiteScopeAnalyzer(counter, 512)
def do_exit(self, vns):
self.logic_analyzer.export(vns, "test/logic_analyzer.csv")
self.analyzer.export_csv(vns, "test/analyzer.csv")
default_subtarget = LiteScopeSoC

View File

@ -0,0 +1,20 @@
from litex.soc.tools.remote import RemoteClient
from litescope.software.driver.analyzer import LiteScopeAnalyzerDriver
wb = RemoteClient()
wb.open()
# # #
analyzer = LiteScopeAnalyzerDriver(wb.regs, "analyzer", debug=True)
analyzer.configure_trigger(cond={"counter1": 0})
analyzer.configure_subsampler(1)
analyzer.run(offset=128, length=512)
while not analyzer.done():
pass
analyzer.upload()
analyzer.save("dump.vcd")
# # #
wb.close()

View File

@ -1,28 +1,27 @@
import time
from litex.soc.tools.remote import RemoteClient
from litescope.software.driver.inout import LiteScopeInOutDriver
from litescope.software.driver.io import LiteScopeIODriver
def led_anim0(inout):
for i in range(10):
inout.write(0xA5)
io.write(0xa5)
time.sleep(0.1)
inout.write(0x5A)
io.write(0x5a)
time.sleep(0.1)
def led_anim1(inout):
for j in range(4):
# Led <<
led_data = 1
for i in range(8):
inout.write(led_data)
io.write(led_data)
time.sleep(i*i*0.0020)
led_data = (led_data << 1)
# Led >>
ledData = 128
for i in range(8):
inout.write(led_data)
io.write(led_data)
time.sleep(i*i*0.0020)
led_data = (led_data >> 1)
@ -31,11 +30,11 @@ wb.open()
# # #
inout = LiteScopeInOutDriver(wb.regs, "inout")
io = LiteScopeIODriver(wb.regs, "io")
led_anim0(inout)
led_anim1(inout)
print("{:02X}".format(inout.read()))
led_anim0(io)
led_anim1(io)
print("{:02x}".format(io.read()))
# # #

View File

@ -1,30 +0,0 @@
from litex.soc.tools.remote import RemoteClient
from litescope.software.driver.logic_analyzer import LiteScopeLogicAnalyzerDriver
wb = RemoteClient()
wb.open()
# # #
logic_analyzer = LiteScopeLogicAnalyzerDriver(wb.regs, "logic_analyzer", debug=True)
cond = {} # immediate trigger
logic_analyzer.configure_term(port=0, cond=cond)
logic_analyzer.configure_sum("term")
logic_analyzer.configure_subsampler(1)
# logic_analyzer.configure_qualifier(1)
logic_analyzer.configure_rle(1)
logic_analyzer.run(offset=128, length=256)
while not logic_analyzer.done():
pass
logic_analyzer.upload()
logic_analyzer.save("dump.vcd")
logic_analyzer.save("dump.csv")
logic_analyzer.save("dump.py")
logic_analyzer.save("dump.sr")
# # #
wb.close()

View File

@ -0,0 +1 @@
from litescope.core import LiteScopeIO, LiteScopeAnalyzer

View File

@ -1,13 +0,0 @@
from litex.gen import *
from litex.soc.interconnect.csr import *
from litex.soc.interconnect import stream
from litex.soc.interconnect.stream import *
def data_layout(dw):
return [("data", dw)]
def hit_layout():
return [("hit", 1)]

194
litescope/core.py Normal file
View File

@ -0,0 +1,194 @@
from litex.gen import *
from litex.gen.genlib.cdc import MultiReg
from litex.build.tools import write_to_file
from litex.soc.interconnect.csr import *
from litex.soc.cores.gpio import GPIOInOut
from litex.soc.interconnect import stream
def ceil_pow2(v):
return 2**(bits_for(v-1))
def core_layout(dw):
return [("data", dw), ("hit", 1)]
class FrontendTrigger(Module, AutoCSR):
def __init__(self, dw, cd):
self.sink = stream.Endpoint(core_layout(dw))
self.source = stream.Endpoint(core_layout(dw))
dw = len(self.sink.payload.raw_bits())
self.value = CSRStorage(dw)
self.mask = CSRStorage(dw)
# # #
value = Signal(dw)
mask = Signal(dw)
self.specials += [
MultiReg(self.value.storage, value, cd),
MultiReg(self.mask.storage, mask, cd)
]
self.comb += [
self.sink.connect(self.source),
self.source.hit.eq((self.sink.data & mask) == value)
]
class FrontendSubSampler(Module, AutoCSR):
def __init__(self, dw, cd):
self.sink = stream.Endpoint(core_layout(dw))
self.source = stream.Endpoint(core_layout(dw))
self.value = CSRStorage(16)
# # #
sync_cd = getattr(self.sync, cd)
value = Signal(16)
self.specials += MultiReg(self.value.storage, value, cd)
counter = Signal(16)
done = Signal()
sync_cd += \
If(self.source.ready,
If(done,
counter.eq(0)
).Elif(self.sink.valid,
counter.eq(counter + 1)
)
)
self.comb += [
done.eq(counter == value),
self.sink.connect(self.source, leave_out=set(["valid"])),
self.source.valid.eq(self.sink.valid & done)
]
class AnalyzerFrontend(Module, AutoCSR):
def __init__(self, dw, cd, cd_ratio):
self.sink = stream.Endpoint(core_layout(dw))
self.source = stream.Endpoint(core_layout(dw*cd_ratio))
# # #
self.submodules.buffer = ClockDomainsRenamer(cd)(stream.Buffer(core_layout(dw)))
self.submodules.trigger = FrontendTrigger(dw, cd)
self.submodules.subsampler = FrontendSubSampler(dw, cd)
self.submodules.converter = ClockDomainsRenamer(cd)(
stream.StrideConverter(
core_layout(dw),
core_layout(dw*cd_ratio)))
self.submodules.fifo = ClockDomainsRenamer({"write": cd, "read": "sys"})(
stream.AsyncFIFO(core_layout(dw*cd_ratio), 8))
self.submodules.pipeline = stream.Pipeline(self.sink,
self.buffer,
self.trigger,
self.subsampler,
self.converter,
self.fifo,
self.source)
class AnalyzerStorage(Module, AutoCSR):
def __init__(self, dw, depth):
self.sink = stream.Endpoint(core_layout(dw))
self.start = CSR()
self.length = CSRStorage(bits_for(depth))
self.offset = CSRStorage(bits_for(depth))
self.idle = CSRStatus()
self.wait = CSRStatus()
self.run = CSRStatus()
self.mem_valid = CSRStatus()
self.mem_ready = CSR()
self.mem_data = CSRStatus(dw)
# # #
mem = stream.SyncFIFO([("data", dw)], depth, buffered=True)
self.submodules += mem
fsm = FSM(reset_state="IDLE")
self.submodules += fsm
fsm.act("IDLE",
self.idle.status.eq(1),
If(self.start.re,
NextState("WAIT")
),
self.sink.ready.eq(1),
mem.source.ready.eq(self.mem_ready.re & self.mem_ready.r)
)
fsm.act("WAIT",
self.wait.status.eq(1),
self.sink.connect(mem.sink, leave_out=set(["hit"])),
If(self.sink.valid & self.sink.hit,
NextState("RUN")
),
mem.source.ready.eq(mem.level == self.offset.storage)
)
fsm.act("RUN",
self.run.status.eq(1),
self.sink.connect(mem.sink, leave_out=set(["hit"])),
If(~mem.sink.ready | (mem.level == self.length.storage),
NextState("IDLE")
)
)
self.comb += [
self.mem_valid.status.eq(mem.source.valid),
self.mem_data.status.eq(mem.source.data)
]
class LiteScopeIO(Module, AutoCSR):
def __init__(self, dw):
self.dw = dw
self.input = Signal(dw)
self.output = Signal(dw)
# # #
self.submodules.gpio = GPIOInOut(self.input, self.output)
def get_csrs(self):
return self.gpio.get_csrs()
class LiteScopeAnalyzer(Module, AutoCSR):
def __init__(self, signals, depth, cd="sys", cd_ratio=1):
self.signals = [signals] if not isinstance(signals, list) else signals
self.dw = ceil_pow2(sum([len(s) for s in signals]))
self.core_dw = self.dw*cd_ratio
self.depth = depth
self.cd_ratio = cd_ratio
# # #
self.submodules.frontend = AnalyzerFrontend(self.dw, cd, cd_ratio)
self.submodules.storage = AnalyzerStorage(self.core_dw, depth//cd_ratio)
self.comb += [
self.frontend.sink.valid.eq(1),
self.frontend.sink.data.eq(Cat(self.signals)),
self.frontend.source.connect(self.storage.sink)
]
def export_csv(self, vns, filename):
def format_line(*args):
return ",".join(args) + "\n"
r = format_line("config", "dw", str(self.dw))
r += format_line("config", "depth", str(self.depth))
r += format_line("config", "cd_ratio", str(int(self.cd_ratio)))
for s in self.signals:
r += format_line("signal", vns.get_name(s), str(len(s)))
write_to_file(filename, r)

View File

@ -1,112 +0,0 @@
from litescope.common import *
class LiteScopeTermUnit(Module):
def __init__(self, dw):
self.dw = dw
self.sink = sink = stream.Endpoint(data_layout(dw))
self.source = source = stream.Endpoint(hit_layout())
self.trig = Signal(dw)
self.mask = Signal(dw)
# # #
self.comb += [
source.valid.eq(sink.valid),
source.hit.eq((sink.data & self.mask) == self.trig),
sink.ready.eq(source.ready)
]
class LiteScopeTerm(LiteScopeTermUnit, AutoCSR):
def __init__(self, dw):
LiteScopeTermUnit.__init__(self, dw)
self._trig = CSRStorage(dw)
self._mask = CSRStorage(dw)
# # #
self.comb += [
self.trig.eq(self._trig.storage),
self.mask.eq(self._mask.storage)
]
class LiteScopeRangeDetectorUnit(Module):
def __init__(self, dw):
self.dw = dw
self.sink = sink = stream.Endpoint(data_layout(dw))
self.source = source = stream.Endpoint(hit_layout())
self.low = Signal(dw)
self.high = Signal(dw)
# # #
self.comb += [
source.valid.eq(sink.valid),
source.hit.eq((sink.data >= self.low) & (sink.data <= self.high)),
sink.ready.eq(source.ready)
]
class LiteScopeRangeDetector(LiteScopeRangeDetectorUnit, AutoCSR):
def __init__(self, dw):
LiteScopeRangeDetectorUnit.__init__(self, dw)
self._low = CSRStorage(dw)
self._high = CSRStorage(dw)
# # #
self.comb += [
self.low.eq(self._low.storage),
self.high.eq(self._high.storage)
]
class LiteScopeEdgeDetectorUnit(Module):
def __init__(self, dw):
self.dw = dw
self.sink = sink = stream.Endpoint(data_layout(dw))
self.source = source = stream.Endpoint(hit_layout())
self.rising_mask = Signal(dw)
self.falling_mask = Signal(dw)
self.both_mask = Signal(dw)
# # #
self.submodules.buffer = Buffer(self.sink.description)
self.comb += self.sink.connect(self.buffer.d)
rising = Signal(dw)
self.comb += rising.eq(self.rising_mask & sink.data & ~self.buffer.q.data)
falling = Signal(dw)
self.comb += falling.eq(self.falling_mask & ~sink.data & self.buffer.q.data)
both = Signal(dw)
self.comb += both.eq(self.both_mask & (rising | falling))
self.comb += [
source.valid.eq(sink.valid & self.buffer.q.valid),
self.buffer.q.ready.eq(source.ready),
source.hit.eq((rising | falling | both) != 0)
]
class LiteScopeEdgeDetector(LiteScopeEdgeDetectorUnit, AutoCSR):
def __init__(self, dw):
LiteScopeEdgeDetectorUnit.__init__(self, dw)
self._rising_mask = CSRStorage(dw)
self._falling_mask = CSRStorage(dw)
self._both_mask = CSRStorage(dw)
# # #
self.comb += [
self.rising_mask.eq(self._rising_mask.storage),
self.falling_mask.eq(self._falling_mask.storage),
self.both_mask.eq(self._both_mask.storage)
]

View File

@ -1,204 +0,0 @@
from litescope.common import *
class LiteScopeSubSamplerUnit(Module):
def __init__(self, dw):
self.sink = sink = stream.Endpoint(data_layout(dw))
self.source = source = stream.Endpoint(data_layout(dw))
self.value = Signal(32)
# # #
counter = Signal(32)
counter_reset = Signal()
counter_ce = Signal()
self.sync += \
If(counter_reset,
counter.eq(0)
).Elif(counter_ce,
counter.eq(counter + 1)
)
done = Signal()
self.comb += [
done.eq(counter >= self.value),
sink.connect(source),
source.valid.eq(sink.valid & done),
counter_ce.eq(source.ready),
counter_reset.eq(source.valid & source.ready & done)
]
class LiteScopeSubSampler(LiteScopeSubSamplerUnit, AutoCSR):
def __init__(self, dw):
LiteScopeSubSamplerUnit.__init__(self, dw)
self._value = CSRStorage(32)
# # #
self.comb += self.value.eq(self._value.storage)
class LiteScopeRunLengthEncoderUnit(Module):
def __init__(self, dw, length):
self.dw = dw
self.length = length
self.sink = sink = stream.Endpoint(data_layout(dw))
self.source = source = stream.Endpoint(data_layout(dw))
self.enable = Signal()
# # #
self.submodules.buf = buf = Buffer(sink.description)
self.comb += sink.connect(buf.sink)
counter = Signal(max=length)
counter_reset = Signal()
counter_ce = Signal()
counter_done = Signal()
self.sync += \
If(counter_reset,
counter.eq(0)
).Elif(counter_ce,
counter.eq(counter + 1)
)
self.comb += counter_done.eq(counter == length - 1)
change = Signal()
self.comb += change.eq(
sink.valid &
(sink.data != buf.source.data)
)
self.submodules.fsm = fsm = FSM(reset_state="BYPASS")
fsm.act("BYPASS",
buf.source.connect(source),
counter_reset.eq(1),
If(sink.valid & ~change,
If(self.enable,
NextState("COUNT")
)
)
)
fsm.act("COUNT",
buf.source.ready.eq(1),
counter_ce.eq(sink.valid),
If(~self.enable,
NextState("BYPASS")
).Elif(change | counter_done,
source.valid.eq(1),
source.data[:len(counter)].eq(counter),
source.data[-1].eq(1), # Set RLE bit
buf.source.ready.eq(source.ready),
If(source.ready,
NextState("BYPASS")
)
)
)
class LiteScopeRunLengthEncoder(LiteScopeRunLengthEncoderUnit, AutoCSR):
def __init__(self, dw, length=1024):
LiteScopeRunLengthEncoderUnit.__init__(self, dw, length)
self._enable = CSRStorage()
self.external_enable = Signal(reset=1)
# # #
self.comb += self.enable.eq(self._enable.storage & self.external_enable)
class LiteScopeRecorderUnit(Module):
def __init__(self, dw, depth):
self.dw = dw
self.depth = depth
self.trigger_sink = trigger_sink = stream.Endpoint(hit_layout())
self.data_sink = data_sink = stream.Endpoint(data_layout(dw))
self.trigger = Signal()
self.qualifier = Signal()
self.length = Signal(bits_for(depth))
self.offset = Signal(bits_for(depth))
self.done = Signal()
self.post_hit = Signal()
self.source = stream.Endpoint(data_layout(dw))
# # #
fifo = ResetInserter()(SyncFIFO(data_layout(dw), depth, buffered=True))
self.submodules += fifo
fsm = FSM(reset_state="IDLE")
self.submodules += fsm
self.comb += [
self.source.valid.eq(fifo.source.valid),
self.source.data.eq(fifo.source.data)
]
fsm.act("IDLE",
self.done.eq(1),
If(self.trigger,
NextState("PRE_HIT_RECORDING"),
fifo.reset.eq(1),
),
fifo.source.ready.eq(self.source.ready)
)
fsm.act("PRE_HIT_RECORDING",
fifo.sink.valid.eq(data_sink.valid),
fifo.sink.data.eq(data_sink.data),
data_sink.ready.eq(fifo.sink.ready),
fifo.source.ready.eq(fifo.level >= self.offset),
If(trigger_sink.valid & trigger_sink.hit,
NextState("POST_HIT_RECORDING")
)
)
fsm.act("POST_HIT_RECORDING",
self.post_hit.eq(1),
If(self.qualifier,
fifo.sink.valid.eq(trigger_sink.valid &
trigger_sink.hit &
data_sink.valid)
).Else(
fifo.sink.valid.eq(data_sink.valid)
),
fifo.sink.data.eq(data_sink.data),
data_sink.ready.eq(fifo.sink.ready),
If(~fifo.sink.ready | (fifo.level >= self.length),
NextState("IDLE")
)
)
class LiteScopeRecorder(LiteScopeRecorderUnit, AutoCSR):
def __init__(self, dw, depth):
LiteScopeRecorderUnit.__init__(self, dw, depth)
self._trigger = CSR()
self._qualifier = CSRStorage()
self._length = CSRStorage(bits_for(depth))
self._offset = CSRStorage(bits_for(depth))
self._done = CSRStatus()
self._source_valid = CSRStatus()
self._source_ready = CSR()
self._source_data = CSRStatus(dw)
# # #
self.comb += [
self.trigger.eq(self._trigger.re),
self.qualifier.eq(self._qualifier.storage),
self.length.eq(self._length.storage),
self.offset.eq(self._offset.storage),
self._done.status.eq(self.done),
self._source_valid.status.eq(self.source.valid),
self._source_data.status.eq(self.source.data),
self.source.ready.eq(self._source_ready.re)
]

View File

@ -1,80 +0,0 @@
from functools import reduce
from operator import and_
from litescope.common import *
class LiteScopeSumUnit(Module, AutoCSR):
def __init__(self, ports):
self.sinks = sinks = [stream.Endpoint(hit_layout()) for i in range(ports)]
self.source = source = stream.Endpoint(hit_layout())
self.prog_we = Signal()
self.prog_adr = Signal(ports)
self.prog_dat = Signal()
mem = Memory(1, 2**ports)
lut = mem.get_port()
prog = mem.get_port(write_capable=True)
self.specials += mem, lut, prog
# # #
# program port
self.comb += [
prog.we.eq(self.prog_we),
prog.adr.eq(self.prog_adr),
prog.dat_w.eq(self.prog_dat)
]
# LUT port
for i, sink in enumerate(sinks):
self.comb += lut.adr[i].eq(sink.hit)
# drive source
self.comb += [
source.valid.eq(reduce(and_, [sink.valid for sink in sinks])),
source.hit.eq(lut.dat_r)
]
for i, sink in enumerate(sinks):
self.comb += sink.ready.eq(sink.valid & source.ready)
class LiteScopeSum(LiteScopeSumUnit, AutoCSR):
def __init__(self, ports):
LiteScopeSumUnit.__init__(self, ports)
self._prog_we = CSR()
self._prog_adr = CSRStorage(ports)
self._prog_dat = CSRStorage()
# # #
self.comb += [
self.prog_we.eq(self._prog_we.re & self._prog_we.r),
self.prog_adr.eq(self._prog_adr.storage),
self.prog_dat.eq(self._prog_dat.storage)
]
class LiteScopeTrigger(Module, AutoCSR):
def __init__(self, dw):
self.dw = dw
self.ports = []
self.sink = stream.Endpoint(data_layout(dw))
self.source = stream.Endpoint(hit_layout())
def add_port(self, port):
setattr(self.submodules, "port"+str(len(self.ports)), port)
self.ports.append(port)
def do_finalize(self):
self.submodules.sum = LiteScopeSum(len(self.ports))
for i, port in enumerate(self.ports):
# Note: port's ready is not used and supposed to be always 1
self.comb += [
port.sink.valid.eq(self.sink.valid),
port.sink.data.eq(self.sink.data),
self.sink.ready.eq(1),
port.source.connect(self.sum.sinks[i])
]
self.comb += self.sum.source.connect(self.source)

View File

@ -1,11 +0,0 @@
from litescope.common import *
class LiteScopeInOut(Module, AutoCSR):
def __init__(self, dw):
self.dw = dw
self._input = CSRStatus(dw)
self._output = CSRStorage(dw)
self.i = self._input.status
self.o = self._output.storage

View File

@ -1,102 +0,0 @@
from litescope.common import *
from litescope.core.trigger import LiteScopeTrigger
from litescope.core.storage import LiteScopeSubSampler, LiteScopeRecorder, LiteScopeRunLengthEncoder
from litex.build.tools import write_to_file
class LiteScopeLogicAnalyzer(Module, AutoCSR):
def __init__(self, layout, depth,
with_input_buffer=False,
with_rle=False, rle_length=256,
with_subsampler=False,
clk_domain="sys", clk_ratio=1):
self.layout = layout
self.data = Cat(*layout)
self.dw = len(self.data)
if with_rle:
self.dw = max(self.dw, log2_int(rle_length))
self.dw += 1
self.depth = depth
self.with_rle = with_rle
self.rle_length = rle_length
self.with_input_buffer = with_input_buffer
self.with_subsampler = with_subsampler
self.clk_domain = clk_domain
self.clk_ratio = clk_ratio
self.sink = stream.Endpoint(data_layout(self.dw))
self.comb += [
self.sink.valid.eq(1),
self.sink.data.eq(self.data)
]
self.submodules.trigger = trigger = LiteScopeTrigger(self.dw*self.clk_ratio)
self.submodules.recorder = recorder = LiteScopeRecorder(self.dw*self.clk_ratio, self.depth)
def do_finalize(self):
sink = self.sink
# insert Buffer on sink (optional, can be used to improve timings)
if self.with_input_buffer:
input_buffer = Buffer(self.sink.description)
if self.clk_domain is not "sys":
self.submodules += ClockDomainsRenamer(self.clk_domain)(input_buffer)
else:
self.submodules += input_buffer
self.comb += sink.connect(intput_buffer.sink)
sink = input_buffer.source
# clock domain crossing (optional, required when capture_clk is not sys_clk)
if self.clk_domain is not "sys":
converter = StrideConverter(data_layout(self.dw),
data_layout(self.dw*self.clk_ratio))
self.submodules += ClockDomainsRenamer(self.clk_domain)(converter)
fifo = AsyncFIFO(converter.source.description, 32)
self.submodules += ClockDomainsRenamer({"write": self.clk_domain, "read": "sys"})(fifo)
self.comb += [
sink.connect(converter.sink),
converter.source.connect(fifo.sink)
]
sink = fifo.source
# connect trigger
self.comb += [
self.trigger.sink.valid.eq(sink.valid),
self.trigger.sink.data.eq(sink.data),
]
# insert subsampler (optional)
if self.with_subsampler:
self.submodules.subsampler = LiteScopeSubSampler(self.dw)
self.comb += sink.connect(self.subsampler.sink)
sink = self.subsampler.source
# connect recorder
self.comb += self.trigger.source.connect(self.recorder.trigger_sink)
if self.with_rle:
self.submodules.rle = LiteScopeRunLengthEncoder(self.dw, self.rle_length)
self.comb += [
sink.connect(self.rle.sink),
self.rle.source.connect(self.recorder.data_sink),
self.rle.external_enable.eq(self.recorder.post_hit)
]
else:
self.submodules.delay_buffer = Buffer(sink.description)
self.comb += [
sink.connect(self.delay_buffer.sink),
self.delay_buffer.source.connect(self.recorder.data_sink)
]
def export(self, vns, filename):
def format_line(*args):
return ",".join(args) + "\n"
r = ""
r += format_line("config", "dw", str(self.dw))
r += format_line("config", "depth", str(self.depth))
r += format_line("config", "with_rle", str(int(self.with_rle)))
r += format_line("config", "clk_ratio", str(int(self.clk_ratio)))
if not isinstance(self.layout, tuple):
self.layout = [self.layout]
for e in self.layout:
r += format_line("layout", vns.get_name(e), str(len(e)))
write_to_file(filename, r)

View File

@ -1,12 +1,12 @@
from struct import *
import os
from litex.gen.fhdl.structure import *
from litescope.software.dump.common import *
from litescope.software.dump import *
from litescope.software.driver.truthtable import *
import csv
class LiteScopeLogicAnalyzerDriver():
class LiteScopeAnalyzerDriver():
def __init__(self, regs, name, config_csv=None, clk_freq=None, debug=False):
self.regs = regs
self.name = name
@ -14,11 +14,8 @@ class LiteScopeLogicAnalyzerDriver():
if self.config_csv is None:
self.config_csv = name + ".csv"
if clk_freq is None:
try:
self.clk_freq = regs.identifier_frequency.read()
except:
self.clk_freq = None
self.samplerate = self.clk_freq
self.clk_freq = None
self.samplerate = None
else:
self.clk_freq = clk_freq
self.samplerate = clk_freq
@ -40,7 +37,7 @@ class LiteScopeLogicAnalyzerDriver():
csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
for item in csv_reader:
t, n, v = item
if t == "layout":
if t == "signal":
self.layout.append((n, int(v)))
def build(self):
@ -57,75 +54,47 @@ class LiteScopeLogicAnalyzerDriver():
setattr(self, name + "_m", (2**length-1) << value)
value += length
def configure_term(self, port, trigger=0, mask=0, cond=None):
def configure_trigger(self, value=0, mask=0, cond=None):
if cond is not None:
for k, v in cond.items():
trigger |= getattr(self, k + "_o")*v
value |= getattr(self, k + "_o")*v
mask |= getattr(self, k + "_m")
t = getattr(self, "trigger_port{d}_trig".format(d=int(port)))
m = getattr(self, "trigger_port{d}_mask".format(d=int(port)))
t.write(trigger)
t = getattr(self, "frontend_trigger_value")
m = getattr(self, "frontend_trigger_mask")
t.write(value)
m.write(mask)
def configure_range_detector(self, port, low, high):
l = getattr(self, "trigger_port{d}_low".format(d=int(port)))
h = getattr(self, "trigger_port{d}_high".format(d=int(port)))
l.write(low)
h.write(high)
def configure_edge_detector(self, port, rising_mask, falling_mask, both_mask):
rm = getattr(self, "trigger_port{d}_rising_mask".format(d=int(port)))
fm = getattr(self, "trigger_port{d}_falling_mask".format(d=int(port)))
bm = getattr(self, "trigger_port{d}_both_mask".format(d=int(port)))
rm.write(rising_mask)
fm.write(falling_mask)
bm.write(both_mask)
def configure_sum(self, equation):
datas = gen_truth_table(equation)
for adr, dat in enumerate(datas):
self.trigger_sum_prog_adr.write(adr)
self.trigger_sum_prog_dat.write(dat)
self.trigger_sum_prog_we.write(1)
def configure_subsampler(self, n):
self.subsampler_value.write(n-1)
def configure_subsampler(self, value):
self.frontend_subsampler_value.write(value-1)
if self.clk_freq is not None:
self.samplerate = self.clk_freq//n
else:
self.samplerate = None
def configure_qualifier(self, v):
self.recorder_qualifier.write(v)
def configure_rle(self, v):
self.rle_enable.write(v)
def done(self):
return self.recorder_done.read()
return self.storage_idle.read()
def run(self, offset, length):
while self.storage_mem_valid.read():
self.storage_mem_ready.write(1)
if self.debug:
print("running")
self.recorder_offset.write(offset)
self.recorder_length.write(length)
self.recorder_trigger.write(1)
self.storage_offset.write(offset)
self.storage_length.write(length)
self.storage_start.write(1)
def upload(self):
if self.debug:
print("uploading")
while self.recorder_source_valid.read():
self.data.append(self.recorder_source_data.read())
self.recorder_source_ready.write(1)
if self.clk_ratio > 1:
while self.storage_mem_valid.read():
self.data.append(self.storage_mem_data.read())
self.storage_mem_ready.write(1)
if self.cd_ratio > 1:
new_data = DumpData(self.dw)
for data in self.data:
for i in range(self.clk_ratio):
new_data.append(*get_bits([data], i*self.dw, (i+1)*self.dw))
self.data = new_data
if self.with_rle:
if self.rle_enable.read():
self.data = self.data.decode_rle()
return self.data
def save(self, filename):

View File

@ -1,14 +1,12 @@
class LiteScopeInOutDriver():
class LiteScopeIODriver():
def __init__(self, regs, name):
self.regs = regs
self.name = name
self.build()
def build(self):
for key, value in self.regs.d.items():
if self.name in key:
key = key.replace(self.name + "_", "")
setattr(self, key, value)
self.input = getattr(self.regs, self.name + "_in")
self.output = getattr(self.regs, self.name + "_out")
def write(self, value):
self.output.write(value)

View File

@ -1,55 +0,0 @@
import csv
class MappedReg:
def __init__(self, readfn, writefn, name, addr, length, busword, mode):
self.readfn = readfn
self.writefn = writefn
self.addr = addr
self.length = length
self.busword = busword
self.mode = mode
def read(self):
if self.mode not in ["rw", "ro"]:
raise KeyError(name + "register not readable")
datas = self.readfn(self.addr, burst_length=self.length)
if isinstance(datas, int):
return datas
else:
data = 0
for i in range(self.length):
data = data << self.busword
data |= datas[i]
return data
def write(self, value):
if self.mode not in ["rw", "wo"]:
raise KeyError(name + "register not writable")
datas = []
for i in range(self.length):
datas.append((value >> ((self.length-1-i)*self.busword)) & (2**self.busword-1))
self.writefn(self.addr, datas)
class MappedRegs:
def __init__(self, d):
self.d = d
def __getattr__(self, attr):
try:
return self.__dict__['d'][attr]
except KeyError:
pass
raise KeyError("No such register " + attr)
def build_map(addrmap, busword, readfn, writefn):
csv_reader = csv.reader(open(addrmap), delimiter=',', quotechar='#')
d = {}
for item in csv_reader:
name, addr, length, mode = item
addr = int(addr.replace("0x", ""), 16)
length = int(length)
d[name] = MappedReg(readfn, writefn, name, addr, length, busword, mode)
return MappedRegs(d)

View File

@ -1,53 +0,0 @@
import os
import re
import sys
def is_number(x):
try:
_ = float(x)
except ValueError:
return False
return True
def remove_numbers(seq):
return [x for x in seq if not is_number(x)]
def remove_duplicates(seq):
seen = set()
seen_add = seen.add
return [x for x in seq if x not in seen and not seen_add(x)]
def get_operands(s):
operands = re.findall("[A-z0-9_]+", s)
operands = remove_duplicates(operands)
operands = remove_numbers(operands)
return sorted(operands)
def gen_truth_table(s):
operands = get_operands(s)
width = len(operands)
stim = []
for i in range(width):
stim_op = []
for j in range(2**width):
stim_op.append((int(j/(2**i)))%2)
stim.append(stim_op)
truth_table = []
for i in range(2**width):
for j in range(width):
exec("{} = stim[j][i]".format(operands[j]))
truth_table.append(eval(s) != 0)
return truth_table
def main():
print(gen_truth_table("(A&B&C)|D"))
if __name__ == '__main__':
main()

View File

@ -45,20 +45,6 @@ class DumpData(list):
else:
raise KeyError
def decode_rle(self):
datas = DumpData(self.width - 1)
last_data = 0
for data in self:
rle = data >> (self.width - 1)
data = data & (2**(self.width - 1) - 1)
if rle:
for i in range(data):
datas.append(last_data)
else:
datas.append(data)
last_data = data
return datas
class DumpVariable:
def __init__(self, name, width, values=[]):

View File

@ -6,6 +6,9 @@ CMD = PYTHONPATH=$(COREDIR) $(PYTHON)
dump_tb:
$(CMD) dump_tb.py
analyzer_tb:
$(PYTHON) analyzer_tb.py
example_designs:
cd ../example_designs && $(PYTHON) make.py -t simple -p de0nano -Ob run False build-bitstream
cd ../example_designs && $(PYTHON) make.py -t simple -p kc705 -Ob run False build-bitstream

33
test/analyzer_tb.py Normal file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
from litex.gen import *
from litescope import LiteScopeAnalyzer
class TB(Module):
def __init__(self):
counter = Signal(16)
self.sync += counter.eq(counter + 1)
self.submodules.analyzer = LiteScopeAnalyzer(counter, 128)
def main_generator(dut):
yield dut.analyzer.frontend.trigger.value.storage.eq(0x0080)
yield dut.analyzer.frontend.trigger.mask.storage.eq(0xfff0)
yield dut.analyzer.frontend.subsampler.value.storage.eq(1)
yield
yield dut.analyzer.storage.length.storage.eq(32)
yield dut.analyzer.storage.offset.storage.eq(16)
for i in range(16):
yield
yield dut.analyzer.storage.start.re.eq(1)
yield
yield dut.analyzer.storage.start.re.eq(0)
yield
for i in range(1024):
yield
if __name__ == "__main__":
tb = TB()
generators = {"sys" : [main_generator(tb)]}
clocks = {"sys": 10}
run_simulation(tb, generators, clocks, vcd_name="sim.vcd")