diff --git a/litex/soc/interconnect/axi.py b/litex/soc/interconnect/axi.py index dc8a32b3a..14ffe0303 100644 --- a/litex/soc/interconnect/axi.py +++ b/litex/soc/interconnect/axi.py @@ -936,3 +936,107 @@ class AXILiteInterconnectPointToPoint(Module): def __init__(self, master, slave): self.comb += master.connect(slave) +class AXILiteArbiter(Module): + """AXI Lite arbiter + + Arbitrate between master interfaces and connect one to the target. + Arbitration is done separately for write and read channels. + """ + def __init__(self, masters, target): + self.submodules.rr_write = roundrobin.RoundRobin(len(masters)) + self.submodules.rr_read = roundrobin.RoundRobin(len(masters)) + + def get_sig(interface, channel, name): + return getattr(getattr(interface, channel), name) + + # mux master->slave signals + for channel, name, direction in target.layout_flat(): + rr = self.rr_write if channel in ["aw", "w", "b"] else self.rr_read + if direction == DIR_M_TO_S: + choices = Array(get_sig(m, channel, name) for m in masters) + self.comb += get_sig(target, channel, name).eq(choices[rr.grant]) + + # connect slave->master signals + for channel, name, direction in target.layout_flat(): + rr = self.rr_write if channel in ["aw", "w", "b"] else self.rr_read + if direction == DIR_S_TO_M: + source = get_sig(target, channel, name) + for i, m in enumerate(masters): + dest = get_sig(m, channel, name) + if name == "ready": + self.comb += dest.eq(source & (rr.grant == i)) + else: + self.comb += dest.eq(source) + + # connect bus requests to round-robin selectors + self.comb += self.rr_write.request.eq(Cat(*[m.aw.valid | m.w.valid | m.b.valid for m in masters])) + self.comb += self.rr_read.request.eq(Cat(*[m.ar.valid | m.r.valid for m in masters])) + +class AXILiteDecoder(Module): + # slaves is a list of pairs: + # 0) function that takes the address signal and returns a FHDL expression + # that evaluates to 1 when the slave is selected and 0 otherwise. + # 1) wishbone.Slave reference. + # register adds flip-flops after the address comparators. Improves timing, + # but breaks Wishbone combinatorial feedback. + def __init__(self, master, slaves, register=False): + addr_shift = log2_int(master.data_width//8) + ns = len(slaves) + slave_sel = Signal(ns) + slave_sel_r = Signal(ns) + + def get_sig(interface, channel, name): + return getattr(getattr(interface, channel), name) + + # decode slave addresses + self.comb += [slave_sel[i].eq(fun(master.aw.addr[addr_shift:]) | fun(master.aw.addr[addr_shift:])) + for i, (fun, bus) in enumerate(slaves)] + if register: + self.sync += slave_sel_r.eq(slave_sel) + else: + self.comb += slave_sel_r.eq(slave_sel) + + # connect master->slaves signals except valid + for fun, slave in slaves: + for channel, name, direction in master.layout_flat(): + if direction == DIR_M_TO_S and name != "valid": + self.comb += get_sig(slave, channel, name).eq(get_sig(master, channel, name)) + + # combine cyc with slave selection signals + for i, (fun, slave) in enumerate(slaves): + for ch in ["aw", "w", "ar"]: + slave_valid = get_sig(slave, ch, "valid") + master_valid = get_sig(master, ch, "valid") + self.comb += slave_valid.eq(master_valid & slave_sel[i]) + + # generate master ready by ORing all slave readys + self.comb += [ + master.aw.ready.eq(reduce(or_, [slave.aw.ready for fun, slave in slaves])), + master.w.ready.eq(reduce(or_, [slave.w.ready for fun, slave in slaves])), + master.ar.ready.eq(reduce(or_, [slave.ar.ready for fun, slave in slaves])), + ] + + # mux (1-hot) slave data return + masked = [Replicate(slave_sel_r[i], len(master.r.data)) & slaves[i][1].r.data for i in range(ns)] + self.comb += master.r.data.eq(reduce(or_, masked)) + +class AXILiteInterconnectShared(Module): + def __init__(self, masters, slaves, register=False, timeout_cycles=1e6): + # TODO: data width + shared = AXILiteInterface() + self.submodules.arbiter = AXILiteArbiter(masters, shared) + self.submodules.decoder = AXILiteDecoder(shared, slaves, register) + if timeout_cycles is not None: + self.submodules.timeout = AXILiteTimeout(shared, timeout_cycles) + +class AXILiteCrossbar(Module): + def __init__(self, masters, slaves, register=False): + matches, busses = zip(*slaves) + access = [[AXILiteInterface() for j in slaves] for i in masters] + # decode each master into its access row + for row, master in zip(access, masters): + row = list(zip(matches, row)) + self.submodules += AXILiteDecoder(master, row, register) + # arbitrate each access column onto its slave + for column, bus in zip(zip(*access), busses): + self.submodules += AXILiteArbiter(column, bus)