Avalon frontend for LiteDRAM (#337)

Add initial Avalon MM frontend + tests.
This commit is contained in:
Hans Baier 2023-05-23 19:52:05 +07:00 committed by GitHub
parent d7df59560e
commit f1293eae1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 395 additions and 2 deletions

View File

@ -41,6 +41,9 @@
"type": "wishbone", "type": "wishbone",
"block_until_ready": True, "block_until_ready": True,
}, },
"avalon_0" : {
"type": "avalon",
},
"native_0" : { "native_0" : {
"type": "native", "type": "native",
}, },

View File

@ -104,7 +104,7 @@ class LiteDRAMCrossbar(Module):
self.submodules += LiteDRAMNativePortCDC(new_port, port) self.submodules += LiteDRAMNativePortCDC(new_port, port)
port = new_port port = new_port
# Data width convertion -------------------------------------------------------------------- # Data width conversion --------------------------------------------------------------------
if data_width != self.controller.data_width: if data_width != self.controller.data_width:
if data_width > self.controller.data_width: if data_width > self.controller.data_width:
addr_shift = -log2_int(data_width//self.controller.data_width) addr_shift = -log2_int(data_width//self.controller.data_width)

178
litedram/frontend/avalon.py Normal file
View File

@ -0,0 +1,178 @@
#
# This file is part of LiteDRAM.
#
# Copyright (c) 2023 Hans Baier <hansfbaier@gmail.com>
# SPDX-License-Identifier: BSD-2-Clause
"""Wishbone frontend for LiteDRAM"""
from math import log2
from migen import *
from litex.soc.interconnect import stream
from litedram.common import LiteDRAMNativePort
from litedram.frontend.adapter import LiteDRAMNativePortConverter
# LiteDRAMWishbone2Native --------------------------------------------------------------------------
class LiteDRAMAvalonMM2Native(Module):
def __init__(self, avalon, port, base_address=0x00000000, burst_increment=1):
avalon_data_width = len(avalon.writedata)
port_data_width = 2**int(log2(len(port.wdata.data))) # Round to lowest power 2
ratio = avalon_data_width/port_data_width
downconvert = ratio > 1
upconvert = ratio < 1
if avalon_data_width != port_data_width:
if avalon_data_width > port_data_width:
addr_shift = -log2_int(avalon_data_width//port_data_width)
else:
addr_shift = log2_int(port_data_width//avalon_data_width)
new_port = LiteDRAMNativePort(
mode = port.mode,
address_width = port.address_width + addr_shift,
data_width = avalon_data_width
)
self.submodules += LiteDRAMNativePortConverter(new_port, port)
port = new_port
# # #
offset = base_address >> log2_int(port.data_width//8)
burstcounter = Signal(9)
active_burst = Signal()
address = Signal(avalon_data_width)
byteenable = Signal.like(avalon.byteenable)
writedata = Signal(avalon_data_width)
start_transaction = Signal()
cmd_ready_seen = Signal()
self.comb += active_burst.eq(2 <= burstcounter)
self.sync += [
If(start_transaction,
byteenable.eq(avalon.byteenable),
burstcounter.eq(avalon.burstcount),
address.eq(avalon.address - offset))
]
start_condition = start_transaction if downconvert else (start_transaction & port.cmd.ready)
self.submodules.fsm = fsm = FSM(reset_state="START")
fsm.act("START",
avalon.waitrequest.eq(1),
port.cmd.addr.eq(avalon.address - offset),
port.cmd.we.eq(avalon.write),
port.cmd.valid .eq(avalon.read | avalon.write),
start_transaction.eq(avalon.read | avalon.write),
If(start_condition,
[] if downconvert else [avalon.waitrequest.eq(0)],
If (avalon.write,
[
port.wdata.data.eq(avalon.writedata),
port.wdata.valid.eq(1),
port.wdata.we.eq(avalon.byteenable),
] if downconvert else [],
NextValue(writedata, avalon.writedata),
[] if downconvert else [NextValue(port.cmd.last, 0)],
NextState("WRITE_DATA")
).Elif(avalon.read,
NextState("READ_DATA")))
)
fsm.act("WRITE_CMD",
avalon.waitrequest.eq(1),
port.rdata.ready.eq(0),
port.cmd.addr.eq(address),
port.cmd.we.eq(1),
port.cmd.valid.eq(1),
If(port.cmd.ready, NextState("WRITE_DATA")))
fsm.act("WRITE_DATA",
avalon.waitrequest.eq(1),
port.rdata.ready.eq(0),
[
port.cmd.addr.eq(address),
port.cmd.we.eq(1),
port.cmd.valid.eq(1),
If(port.cmd.ready, NextValue(cmd_ready_seen, 1)),
If(cmd_ready_seen,
port.cmd.valid.eq(0),
port.cmd.we.eq(0)
),
] if downconvert else [],
port.wdata.data.eq(writedata),
port.wdata.valid.eq(1),
port.wdata.we.eq(byteenable),
If(port.wdata.ready,
avalon.waitrequest.eq(active_burst if downconvert else ~active_burst),
NextValue(writedata, avalon.writedata),
If(~active_burst,
port.flush.eq(1),
NextValue(cmd_ready_seen, 0) if downconvert else NextValue(port.cmd.last, 1),
NextValue(burstcounter, 0),
NextValue(byteenable, 0),
# this marks the end of a write cycle
NextState("START")
).Else(
NextValue(address, address + burst_increment),
NextValue(burstcounter, burstcounter - 1),
NextState("WRITE_CMD")))
)
if downconvert:
self.comb += port.cmd.last.eq(~(fsm.ongoing("WRITE_CMD") | fsm.ongoing("WRITE_DATA") | avalon.write))
fsm.act("READ_CMD",
avalon.waitrequest.eq(1),
port.rdata.ready.eq(1),
port.cmd.addr.eq(address),
port.cmd.we.eq(0),
port.cmd.valid.eq(1),
If(port.cmd.ready,
NextState("READ_DATA")))
fsm.act("READ_DATA",
avalon.waitrequest.eq(1),
port.rdata.ready.eq(1),
[
port.cmd.addr.eq(address),
port.cmd.we.eq(0),
port.cmd.valid.eq(1),
If(port.cmd.ready, NextValue(cmd_ready_seen, 1)),
If(cmd_ready_seen,
port.cmd.valid.eq(0),
port.cmd.we.eq(0)
),
] if downconvert else [],
If(port.rdata.valid,
avalon.readdata.eq(port.rdata.data),
avalon.readdatavalid.eq(1),
If(~active_burst,
[
port.cmd.valid.eq(0),
avalon.waitrequest.eq(0),
NextValue(cmd_ready_seen, 0),
] if downconvert else [],
NextValue(burstcounter, 0),
NextState("START")
).Else(NextValue(address, address + burst_increment),
NextValue(burstcounter, burstcounter - 1),
NextState("READ_CMD")))
)

View File

@ -48,6 +48,7 @@ from litex.soc.cores.clock import *
from litex.soc.integration.soc_core import * from litex.soc.integration.soc_core import *
from litex.soc.integration.builder import * from litex.soc.integration.builder import *
from litex.soc.interconnect import wishbone from litex.soc.interconnect import wishbone
from litex.soc.interconnect import avalon
from litex.soc.cores.uart import * from litex.soc.cores.uart import *
from litedram import modules as litedram_modules from litedram import modules as litedram_modules
@ -60,6 +61,7 @@ from litedram.core.controller import ControllerSettings
from litedram.frontend.axi import * from litedram.frontend.axi import *
from litedram.frontend.wishbone import * from litedram.frontend.wishbone import *
from litedram.frontend.avalon import *
from litedram.frontend.bist import LiteDRAMBISTGenerator from litedram.frontend.bist import LiteDRAMBISTGenerator
from litedram.frontend.bist import LiteDRAMBISTChecker from litedram.frontend.bist import LiteDRAMBISTChecker
from litedram.frontend.fifo import LiteDRAMFIFO from litedram.frontend.fifo import LiteDRAMFIFO
@ -215,6 +217,21 @@ def get_wishbone_user_port_ios(_id, aw, dw):
), ),
] ]
def get_avalon_user_port_ios(_id, aw, dw):
return [
("user_port_{}".format(_id), 0,
Subsignal("address", Pins(aw)),
Subsignal("writedata", Pins(dw)),
Subsignal("readdata", Pins(dw)),
Subsignal("readdatavalid", Pins(1)),
Subsignal("byteenable", Pins(dw//8)),
Subsignal("read", Pins(1)),
Subsignal("write", Pins(1)),
Subsignal("waitrequest", Pins(1)),
Subsignal("burstcount", Pins(8)),
),
]
def get_axi_user_port_ios(_id, aw, dw, iw): def get_axi_user_port_ios(_id, aw, dw, iw):
return [ return [
("user_port_{}".format(_id), 0, ("user_port_{}".format(_id), 0,
@ -667,7 +684,7 @@ class LiteDRAMCore(SoCCore):
self.comb += user_enable.eq(1) self.comb += user_enable.eq(1)
# Request user port on crossbar and add optional ECC. # Request user port on crossbar and add optional ECC.
if port["type"] in ["native", "wishbone", "axi"]: if port["type"] in ["native", "wishbone", "avalon", "axi"]:
# With ECC. # With ECC.
if port.get("ecc", False): if port.get("ecc", False):
assert port.get("data_width", None) is not None assert port.get("data_width", None) is not None
@ -729,6 +746,28 @@ class LiteDRAMCore(SoCCore):
wb_port.we.eq(_wb_port_io.we), wb_port.we.eq(_wb_port_io.we),
_wb_port_io.err.eq(wb_port.err), _wb_port_io.err.eq(wb_port.err),
] ]
# Avalon -----------------------------------------------------------------------------
elif port["type"] == "avalon":
avalon_port = avalon.AvalonMMInterface(
user_port.data_width,
user_port.address_width)
avalon2native = LiteDRAMAvalonMM2Native(avalon_port, user_port)
self.submodules += avalon2native
platform.add_extension(get_avalon_user_port_ios(name,
len(avalon_port.address),
len(avalon_port.writedata)))
_avalon_port_io = platform.request("user_port_{}".format(name))
self.comb += [
avalon_port.address.eq(_avalon_port_io.address),
avalon_port.writedata.eq(_avalon_port_io.writedata),
_avalon_port_io.readdata.eq(avalon_port.readdata),
_avalon_port_io.readdatavalid.eq(avalon_port.readdatavalid),
avalon_port.burstcount.eq(_avalon_port_io.burstcount),
avalon_port.byteenable.eq(_avalon_port_io.byteenable),
avalon_port.write.eq(_avalon_port_io.write & user_enable),
avalon_port.read.eq(_avalon_port_io.read & user_enable),
_avalon_port_io.waitrequest.eq(avalon_port.waitrequest | ~user_enable),
]
# AXI ---------------------------------------------------------------------------------- # AXI ----------------------------------------------------------------------------------
elif port["type"] == "axi": elif port["type"] == "axi":
axi_port = LiteDRAMAXIPort( axi_port = LiteDRAMAXIPort(

173
test/test_avalon.py Normal file
View File

@ -0,0 +1,173 @@
#
# This file is part of LiteDRAM.
#
# Copyright (c) 2023 Hans Baier <hansfbaier@gmail.com>
# SPDX-License-Identifier: BSD-2-Clause
import unittest
from migen import *
from litex.gen.sim import run_simulation
from litex.soc.interconnect import avalon
from litedram.frontend.avalon import LiteDRAMAvalonMM2Native
from litedram.common import LiteDRAMNativePort
from test.common import DRAMMemory, MemoryTestDataMixin
class DUT(Module):
def __init__(self, port, avalon, base_address=0x0, mem_expected=[]):
self.port = port
self.avalon = avalon
self.submodules += LiteDRAMAvalonMM2Native(
avalon = self.avalon,
port = self.port,
base_address = base_address)
self.mem = DRAMMemory(port.data_width, len(mem_expected))
class TestAvalon(MemoryTestDataMixin, unittest.TestCase):
def avalon_readback_test(self, pattern, mem_expected, avalon, port, base_address=0):
def main_generator(dut):
for adr, data in pattern:
yield from dut.avalon.bus_write(adr, data)
data_r = (yield from dut.avalon.bus_read(adr))
self.assertEqual(data_r, data)
dut = DUT(port, avalon, base_address, mem_expected)
generators = [
main_generator(dut),
dut.mem.write_handler(dut.port),
dut.mem.read_handler(dut.port),
]
run_simulation(dut, generators, vcd_name='sim.vcd')
self.assertEqual(dut.mem.mem, mem_expected)
def test_avalon_8bit(self):
# Verify AvalonMM with 8-bit data width.
data = self.pattern_test_data["8bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=8)
port = LiteDRAMNativePort("both", address_width=30, data_width=8)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_32bit(self):
# Verify AvalonMM with 32-bit data width.
data = self.pattern_test_data["32bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=32)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_64bit(self):
# Verify AvalonMM with 64-bit data width.
data = self.pattern_test_data["64bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=64)
port = LiteDRAMNativePort("both", address_width=30, data_width=64)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_64bit_to_32bit(self):
# Verify AvalonMM with 64-bit data width down-converted to 32-bit data width.
data = self.pattern_test_data["64bit_to_32bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=64)
port = LiteDRAMNativePort("both", address_width=30, data_width=32)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_64bit_to_32bit_base_address(self):
# Verify AvalonMM with 64-bit data width down-converted to 32-bit data width and non-zero base address.
data = self.pattern_test_data["64bit_to_32bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=64)
port = LiteDRAMNativePort("both", address_width=30, data_width=32)
origin = 0x10000000
pattern = [(adr + origin//(64//8), data) for adr, data in data["pattern"]]
self.avalon_readback_test(pattern, data["expected"], avl, port, base_address=origin)
def test_avalon_32bit_to_8bit(self):
# Verify AvalonMM with 32-bit data width down-converted to 8-bit data width.
data = self.pattern_test_data["32bit_to_8bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=8)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_32bit_to_8bit_base_address(self):
# Verify AvalonMM with 32-bit data width down-converted to 8-bit data width and non-zero base address.
data = self.pattern_test_data["32bit_to_8bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=8)
origin = 0x10000000
pattern = [(adr + origin//(32//8), data) for adr, data in data["pattern"]]
self.avalon_readback_test(pattern, data["expected"], avl, port, base_address=origin)
def test_avalon_8bit_to_32bit(self):
# Verify AvalonMM with 8-bit data width up-converted to 32-bit data width.
data = self.pattern_test_data["8bit_to_32bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=8)
port = LiteDRAMNativePort("both", address_width=30, data_width=32)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_32bit_to_64bit(self):
# Verify AvalonMM with 32-bit data width up-converted to 64-bit data width.
data = self.pattern_test_data["32bit_to_64bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=64)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_32bit_to_128bit(self):
# Verify AvalonMM with 32-bit data width up-converted to 128-bit data width.
data = self.pattern_test_data["32bit_to_128bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=128)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_32bit_to_256bit(self):
# Verify AvalonMM with 32-bit data width up-converted to 128-bit data width.
data = self.pattern_test_data["32bit_to_256bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=256)
self.avalon_readback_test(data["pattern"], data["expected"], avl, port)
def test_avalon_32bit_base_address(self):
# Verify AvalonMM with 32-bit data width and non-zero base address.
data = self.pattern_test_data["32bit"]
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=32)
origin = 0x10000000
# add offset (in data words)
pattern = [(adr + origin//(32//8), data) for adr, data in data["pattern"]]
self.avalon_readback_test(pattern, data["expected"], avl, port, base_address=origin)
def test_avalon_burst(self):
data = [0x01234567, 0x89abcdef, 0xdeadbeef, 0xc0ffee00, 0x76543210]
def main_generator(dut):
yield from dut.avalon.bus_write(0x0, data)
yield
self.assertEqual((yield from dut.avalon.bus_read(0x0000, burstcount=5)), 0x01234567)
self.assertEqual((yield dut.avalon.readdatavalid), 1)
self.assertEqual((yield from dut.avalon.continue_read_burst()), 0x89abcdef)
self.assertEqual((yield dut.avalon.readdatavalid), 1)
self.assertEqual((yield from dut.avalon.continue_read_burst()), 0xdeadbeef)
self.assertEqual((yield dut.avalon.readdatavalid), 1)
self.assertEqual((yield from dut.avalon.continue_read_burst()), 0xc0ffee00)
self.assertEqual((yield dut.avalon.readdatavalid), 1)
self.assertEqual((yield from dut.avalon.continue_read_burst()), 0x76543210)
yield
yield
yield
yield
self.assertEqual((yield from dut.avalon.bus_read(0x0000)), 0x01234567)
self.assertEqual((yield from dut.avalon.bus_read(0x0001)), 0x89abcdef)
self.assertEqual((yield from dut.avalon.bus_read(0x0002)), 0xdeadbeef)
self.assertEqual((yield from dut.avalon.bus_read(0x0003)), 0xc0ffee00)
self.assertEqual((yield from dut.avalon.bus_read(0x0004)), 0x76543210)
yield
yield
avl = avalon.AvalonMMInterface(adr_width=30, data_width=32)
port = LiteDRAMNativePort("both", address_width=30, data_width=32)
dut = DUT(port, avl, base_address=0x0, mem_expected=data)
generators = [
main_generator(dut),
dut.mem.write_handler(dut.port),
dut.mem.read_handler(dut.port),
]
run_simulation(dut, generators, vcd_name='sim.vcd')
self.assertEqual(dut.mem.mem, data)