core: i2c: add i2c master

add a i2c master similar to LiteSPI

Signed-off-by: Fin Maaß <f.maass@vogl-electronic.com>
This commit is contained in:
Fin Maaß 2024-07-26 16:18:02 +02:00
parent bccd1e9c54
commit 3a6b7b2246
7 changed files with 936 additions and 0 deletions

View File

@ -0,0 +1,93 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2024 Fin Maaß <f.maass@vogl-electronic.com>
# SPDX-License-Identifier: BSD-2-Clause
from migen import *
from litex.soc.integration.doc import AutoDoc
from litex.soc.interconnect import stream
from litex.soc.interconnect.csr import *
from litex.soc.cores.litei2c.common import *
from litex.soc.cores.litei2c.crossbar import LiteI2CCrossbar
from litex.soc.cores.litei2c.master import LiteI2CMaster
from litex.soc.cores.litei2c.generic_phy import LiteI2CPHYCore
class LiteI2CCore(Module):
def __init__(self):
self.source = stream.Endpoint(i2c_core2phy_layout)
self.sink = stream.Endpoint(i2c_phy2core_layout)
self.enable = Signal()
class LiteI2C(Module, AutoCSR, AutoDoc):
"""I2C Controller wrapper.
The ``LiteI2C`` class provides a wrapper that can instantiate ``LiteI2CMaster`` and connect it to the PHY.
Access to PHY can be shared via crossbar.
Parameters
----------
sys_clk_freq : int
Frequency of the system clock.
phy : Module
Module or object that contains PHY stream interfaces and a enable signal to connect
the ``LiteI2C`` to. If not provided, it will be created automatically based on the pads.
pads : Object
I2C pads description.
clock_domain : str
Name of LiteI2C clock domain.
with_master : bool
Enables register-operated I2C master controller.
"""
def __init__(self, sys_clk_freq, phy=None, pads=None, clock_domain="sys",
with_master=True, i2c_master_tx_fifo_depth=1, i2c_master_rx_fifo_depth=1):
if phy is None:
if pads is None:
raise ValueError("Either phy or pads must be provided.")
self.submodules.phy = phy = LiteI2CPHYCore(pads, clock_domain, sys_clk_freq)
self.submodules.crossbar = crossbar = LiteI2CCrossbar(clock_domain)
self.comb += phy.enable.eq(crossbar.enable)
if with_master:
self.submodules.master = master = LiteI2CMaster(
tx_fifo_depth = i2c_master_tx_fifo_depth,
rx_fifo_depth = i2c_master_rx_fifo_depth)
port_master = crossbar.get_port(master.enable)
self.comb += [
port_master.source.connect(master.sink),
master.source.connect(port_master.sink),
]
if clock_domain != "sys":
self.comb += [
crossbar.tx_cdc.source.connect(phy.sink),
phy.source.connect(crossbar.rx_cdc.sink),
]
else:
self.comb += [
crossbar.master.source.connect(phy.sink),
phy.source.connect(crossbar.master.sink),
]
def add_i2c_device(self, i2c_device):
port = self.crossbar.get_port(i2c_device.enable)
self.comb += [
port.source.connect(i2c_device.sink),
i2c_device.source.connect(port.sink),
]

View File

@ -0,0 +1,110 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2024 Fin Maaß <f.maass@vogl-electronic.com>
# SPDX-License-Identifier: BSD-2-Clause
from migen import *
import math
from litex.soc.integration.doc import AutoDoc, ModuleDoc
from litex.build.io import SDRTristate
def freq_to_div(sys_clk_freq, freq):
return math.ceil(sys_clk_freq / (4*freq)) - 1
class LiteI2CClkGen(Module, AutoDoc):
"""I2C Clock generator
The ``LiteI2CClkGen`` class provides a generic I2C clock generator.
Parameters
----------
pads : Object
i2C pads description.
i2c_speed_mode : Signal(2), in
I2C speed mode.
sys_clk_freq : int
System clock frequency.
Attributes
----------
posedge : Signal(), out
Outputs 1 when there is a rising edge on the generated clock, 0 otherwise.
negedge : Signal(), out
Outputs 1 when there is a falling edge on the generated clock, 0 otherwise.
en : Signal(), in
Clock enable input, output clock will be generated if set to 1, 0 resets the core.
tx : Signal(), out
Outputs 1 when the clock is high and the I2C bus is in the transmit state.
rx : Signal(), out
Outputs 1 when the clock is low and the I2C bus is in the receive state.
keep_low : Signal(), in
Forces the clock to be low, when the clock is disabled.
"""
def __init__(self, pads, i2c_speed_mode, sys_clk_freq):
# self.posedge = posedge = Signal()
# self.negedge = negedge = Signal()
self.tx = tx = Signal()
self.rx = rx = Signal()
self.en = en = Signal()
self.keep_low = keep_low = Signal()
cnt_width = bits_for(freq_to_div(sys_clk_freq, 100000))
div = Signal(cnt_width)
cnt = Signal(cnt_width)
sub_cnt = Signal(2)
clk = Signal(reset=1)
self.comb += [
Case(i2c_speed_mode, {
0 : div.eq(freq_to_div(sys_clk_freq, 100000)), # 100 kHz (Standard Mode)
1 : div.eq(freq_to_div(sys_clk_freq, 400000)), # 400 kHz (Fast Mode)
2 : div.eq(freq_to_div(sys_clk_freq, 1000000)), # 1000 kHz (Fast Mode Plus)
})]
self.comb += [
# negedge.eq(en & (sub_cnt == 0b00) & (cnt == div)),
tx.eq(en & (sub_cnt == 0b01) & (cnt == div)),
# posedge.eq(en & (sub_cnt == 0b10) & (cnt == div)),
rx.eq(en & (sub_cnt == 0b11) & (cnt == div)),
]
self.sync += [
If(en,
If(cnt < div,
cnt.eq(cnt+1),
).Else(
cnt.eq(0),
clk.eq(sub_cnt[1]),
If(sub_cnt < 3,
sub_cnt.eq(sub_cnt+1),
).Else(
sub_cnt.eq(0),
)
)
).Else(
clk.eq(~keep_low),
cnt.eq(0),
sub_cnt.eq(0),
)
]
self.specials += SDRTristate(
io = pads.scl,
o = Signal(), # I2C uses Pull-ups, only drive low.
oe = ~clk, # Drive when scl is low.
i = Signal(), # Not used.
)

View File

@ -0,0 +1,47 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2024 Fin Maaß <f.maass@vogl-electronic.com>
# SPDX-License-Identifier: BSD-2-Clause
from migen import *
from migen.genlib.cdc import MultiReg
# Core <-> PHY Layouts -----------------------------------------------------------------------------
"""
Stream layout for LiteI2CCore->PHY connection:
data - data to be transmitted
addr - slave address
len_tx - number of bytes to transmit
len_rx - number of bytes to receive
"""
i2c_core2phy_layout = [
("data", 32),
("addr", 7),
("len_tx", 3),
("len_rx", 3),
("recover", 1)
]
"""
Stream layout for PHY->LiteI2CCore connection
data - received data
nack - NACK signal
unfinished_tx - another tx transfer is expected
unfinished_rx - another rx transfer is expected
"""
i2c_phy2core_layout = [
("data", 32),
("nack", 1),
("unfinished_tx", 1),
("unfinished_rx", 1)
]
# Helpers ------------------------------------------------------------------------------------------
class ResyncReg(Module):
def __init__(self, src, dst, clock_domain):
if clock_domain == "sys":
self.comb += dst.eq(src)
else:
self.specials += MultiReg(src, dst, clock_domain)

View File

@ -0,0 +1,98 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2015 Florent Kermarrec <florent@enjoy-digital.fr>
# Copyright (c) 2020 Antmicro <www.antmicro.com>
# Copyright from LiteSPI file added above
#
# Copyright (c) 2024 Fin Maaß <f.maass@vogl-electronic.com>
# SPDX-License-Identifier: BSD-2-Clause
from collections import OrderedDict
from migen import *
from migen.genlib.roundrobin import RoundRobin
from litex.soc.cores.litei2c.common import *
from litex.soc.interconnect import stream
class LiteI2CMasterPort:
def __init__(self):
self.source = stream.Endpoint(i2c_core2phy_layout)
self.sink = stream.Endpoint(i2c_phy2core_layout)
class LiteI2CSlavePort:
def __init__(self):
self.source = stream.Endpoint(i2c_phy2core_layout)
self.sink = stream.Endpoint(i2c_core2phy_layout)
class LiteI2CCrossbar(Module):
def __init__(self, cd):
self.cd = cd
self.users = []
self.master = LiteI2CMasterPort()
if cd != "sys":
rx_cdc = stream.AsyncFIFO(i2c_phy2core_layout, 32, buffered=True)
tx_cdc = stream.AsyncFIFO(i2c_core2phy_layout, 32, buffered=True)
self.submodules.rx_cdc = ClockDomainsRenamer({"write": cd, "read": "sys"})(rx_cdc)
self.submodules.tx_cdc = ClockDomainsRenamer({"write": "sys", "read": cd})(tx_cdc)
self.comb += [
self.rx_cdc.source.connect(self.master.sink),
self.master.source.connect(self.tx_cdc.sink),
]
self.enable = Signal()
self.user_enable = []
self.user_request = []
def get_port(self, enable, request = None):
user_port = LiteI2CSlavePort()
internal_port = LiteI2CSlavePort()
tx_stream = user_port.sink
self.comb += tx_stream.connect(internal_port.sink)
rx_stream = internal_port.source
self.comb += rx_stream.connect(user_port.source)
if request is None:
request = Signal()
self.comb += request.eq(enable)
self.users.append(internal_port)
self.user_enable.append(self.enable.eq(enable))
self.user_request.append(request)
return user_port
def do_finalize(self):
self.submodules.rr = RoundRobin(len(self.users))
# TX
self.submodules.tx_mux = tx_mux = stream.Multiplexer(i2c_core2phy_layout, len(self.users))
# RX
self.submodules.rx_demux = rx_demux = stream.Demultiplexer(i2c_phy2core_layout, len(self.users))
for i, user in enumerate(self.users):
self.comb += [
user.sink.connect(getattr(tx_mux, f"sink{i}")),
getattr(rx_demux, f"source{i}").connect(user.source),
]
self.comb += [
self.rr.request.eq(Cat(self.user_request)),
self.tx_mux.source.connect(self.master.source),
self.tx_mux.sel.eq(self.rr.grant),
self.master.sink.connect(self.rx_demux.sink),
self.rx_demux.sel.eq(self.rr.grant),
Case(self.rr.grant, dict(enumerate(self.user_enable))),
]

View File

@ -0,0 +1,488 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2024 Fin Maaß <f.maass@vogl-electronic.com>
# SPDX-License-Identifier: BSD-2-Clause
from migen import *
from litex.soc.cores.litei2c.common import *
from litex.soc.cores.litei2c.clkgen import LiteI2CClkGen
from litex.soc.interconnect import stream
from litex.soc.interconnect.csr import *
from litex.build.io import SDRTristate
from litex.soc.integration.doc import AutoDoc
# LiteI2C PHY Core ---------------------------------------------------------------------------------
class LiteI2CPHYCore(Module, AutoCSR, AutoDoc):
"""LiteI2C PHY instantiator
The ``LiteI2CPHYCore`` class provides a generic PHY that can be connected to the ``LiteI2C``.
Parameters
----------
pads : Object
I2C pads description.
clock_domain : str
The clock domain for the ``LiteI2CPHYCore``.
sys_clk_freq : int
Frequency of the system clock.
Attributes
----------
source : Endpoint(i2c_phy2core_layout), out
Data stream.
sink : Endpoint(i2c_core2phy_layout), in
Control stream.
enable : Signal(), in
Flash enable signal.
speed_mode : CSRStorage
Register which holds a clock divisor value applied to clkgen.
"""
def __init__(self, pads, clock_domain, sys_clk_freq):
self.source = source = stream.Endpoint(i2c_phy2core_layout)
self.sink = sink = stream.Endpoint(i2c_core2phy_layout)
self.enable = enable = Signal()
self._i2c_speed_mode = i2c_speed_mode = Signal(2)
self.speed_mode = speed_mode = CSRStorage(2, reset=0)
# # #
# Resynchronize CSR Clk Divisor to LiteI2C Clk Domain.
self.submodules += ResyncReg(speed_mode.storage, i2c_speed_mode, clock_domain)
# Clock Generator.
self.submodules.clkgen = clkgen = LiteI2CClkGen(pads, i2c_speed_mode, sys_clk_freq)
nack = Signal(reset_less=True)
# SDA
self.sda_o = sda_o = Signal()
self.sda_i = sda_i = Signal()
self.sda_oe = sda_oe = Signal()
self.specials += SDRTristate(
io = pads.sda,
o = Signal(), # I2C uses Pull-ups, only drive low.
oe = sda_oe & ~sda_o, # Drive when oe and sda is low.
i = sda_i,
)
bytes_send = Signal(3, reset_less=True)
bytes_recv = Signal(3, reset_less=True)
tx_done = Signal(reset_less=True)
# Data Shift Registers.
sr_addr = Signal(7, reset_less=True)
sr_cnt = Signal(8, reset_less=True)
sr_out_load = Signal()
sr_out_shift = Signal()
sr_out = Signal(len(sink.data), reset_less=True)
sr_out_en = Signal()
sr_in_shift = Signal()
sr_in = Signal(len(sink.data), reset_less=True)
len_tx_capped = Signal(3)
# Data Out Generation/Load/Shift.
self.comb += [
If(sr_out_en,
sda_oe.eq(1),
sda_o.eq(sr_out[-1:]),
),
If(sink.len_tx > 4,
len_tx_capped.eq(4),
).Else(
len_tx_capped.eq(sink.len_tx),
)
]
self.sync += If(sr_out_load,
sr_out.eq(sink.data << (len(sink.data) - len_tx_capped * 8)),
)
self.sync += If(sr_out_shift, sr_out.eq(Cat(Signal(1), sr_out)))
# Data In Shift.
self.sync += If(sr_in_shift, sr_in.eq(Cat(sda_i, sr_in)))
# FSM.
self.submodules.fsm = fsm = FSM(reset_state="WAIT-DATA")
fsm.act("WAIT-DATA",
NextValue(nack, 0),
NextValue(tx_done, 0),
# Wait for CS and a CMD from the Core.
If(enable & sink.valid,
# Start XFER.
NextState("START"),
),
)
fsm.act("START",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(0),
NextValue(sr_addr, sink.addr),
NextValue(sr_cnt, 0),
If(clkgen.tx,
If(sink.recover,
NextState("RECOVER-1"),
).Else(
NextState("ADDR"),
),
),
)
fsm.act("ADDR",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(sr_addr[-1]),
If(clkgen.tx,
If(sr_cnt == 6,
NextState("ADDR-RW"),
).Else(
NextValue(sr_addr, sr_addr << 1),
NextValue(sr_cnt, sr_cnt + 1),
),
),
)
fsm.act("ADDR-RW",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
If((sink.len_tx > 0) & ~tx_done,
sda_o.eq(0),
).Elif(sink.len_rx > 0,
sda_o.eq(1),
).Else(
sda_o.eq(0),
),
If(clkgen.tx,
NextState("ADDR-ACK"),
)
)
fsm.act("ADDR-ACK",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(0),
If(clkgen.rx,
If(sda_i,
NextState("NACK-ERROR"),
).Else(
If((sink.len_tx > 0) & ~tx_done,
NextState("PRE-TX"),
).Elif(sink.len_rx > 0,
NextState("PRE-RX"),
).Else(
NextState("STOP-PRE"),
)
),
)
)
fsm.act("PRE-TX",
clkgen.en.eq(1),
NextValue(sr_cnt, 0),
NextValue(bytes_send, 0),
sr_out_load.eq(1),
If(clkgen.tx,
NextState("TX"),
),
)
fsm.act("TX",
# Generate Clk.
clkgen.en.eq(1),
sr_out_en.eq(1),
# Data Out Shift.
If(clkgen.tx,
If(sr_cnt == 7,
NextValue(sr_cnt, 0),
NextState("TX-ACK"),
NextValue(bytes_send, bytes_send + 1),
).Else(
NextValue(sr_cnt, sr_cnt + 1),
sr_out_shift.eq(1),
),
),
)
fsm.act("TX-ACK",
# Generate Clk.
clkgen.en.eq(1),
sr_out_en.eq(0),
sda_oe.eq(0),
If(clkgen.rx,
If(sda_i,
NextState("NACK-ERROR"),
).Else(
If((bytes_send == 4) & (sink.len_tx > 4),
NextState("TX-PRE-WAIT"),
).Elif(bytes_send < sink.len_tx,
NextState("TX-BEFORE-NEXT"),
).Else(
NextValue(tx_done, 1),
If(sink.len_rx > 0,
NextState("REPEATED-START-1"),
).Else(
NextState("STOP-PRE"),
),
),
),
),
)
fsm.act
fsm.act("TX-BEFORE-NEXT",
# Generate Clk.
clkgen.en.eq(1),
If(clkgen.tx,
sr_out_shift.eq(1),
NextState("TX"),
),
)
fsm.act("TX-PRE-WAIT",
# Generate Clk.
clkgen.en.eq(1),
sink.ready.eq(1),
If(clkgen.tx,
NextState("TX-WAIT-SEND-STATUS"),
),
)
fsm.act("TX-WAIT-SEND-STATUS",
# Generate Clk.
clkgen.en.eq(0),
clkgen.keep_low.eq(1),
source.unfinished_tx.eq(1),
source.valid.eq(1),
source.last.eq(1),
If(source.ready,
NextState("TX-WAIT"),
)
)
fsm.act("TX-WAIT",
# Generate Clk.
clkgen.en.eq(0),
clkgen.keep_low.eq(1),
NextValue(tx_done, 0),
If(enable & sink.valid,
NextState("PRE-TX"),
),
)
fsm.act("NACK-ERROR",
clkgen.en.eq(1),
NextValue(nack, 1),
If(clkgen.tx,
NextState("STOP"),
),
)
fsm.act("REPEATED-START-1",
# Generate Clk.
clkgen.en.eq(1),
If(clkgen.tx,
NextState("REPEATED-START-2"),
),
)
fsm.act("REPEATED-START-2",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(1),
If(clkgen.rx,
NextState("START"),
),
)
fsm.act("PRE-RX",
clkgen.en.eq(1),
NextValue(sr_cnt, 0),
NextValue(bytes_recv, 0),
NextValue(sr_in, 0),
If(clkgen.tx,
NextState("RX"),
),
)
fsm.act("RX",
# Generate Clk.
clkgen.en.eq(1),
If(clkgen.rx,
NextValue(sr_cnt, sr_cnt + 1),
sr_in_shift.eq(1),
If(sr_cnt == 7,
NextValue(sr_cnt, 0),
NextValue(bytes_recv, bytes_recv + 1),
NextState("RX-PRE-ACK"),
),
),
)
fsm.act("RX-PRE-ACK",
# Generate Clk.
clkgen.en.eq(1),
If(clkgen.tx,
If(bytes_recv < sink.len_rx,
NextState("RX-ACK"),
).Else(
NextState("RX-NACK"),
),
),
)
fsm.act("RX-ACK",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(0),
If(clkgen.tx,
NextValue(sr_cnt, 0),
If(bytes_recv == 4,
sink.ready.eq(1),
NextState("RX-WAIT-SEND-STATUS"),
).Else(
NextState("RX"),
),
),
)
fsm.act("RX-NACK",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(1),
If(clkgen.tx,
NextState("STOP"),
),
)
fsm.act("RX-WAIT-SEND-STATUS",
# Generate Clk.
clkgen.en.eq(0),
clkgen.keep_low.eq(1),
source.data.eq(sr_in),
source.unfinished_rx.eq(1),
source.valid.eq(1),
source.last.eq(1),
If(source.ready,
NextState("RX-WAIT"),
)
)
fsm.act("RX-WAIT",
# Generate Clk.
clkgen.en.eq(0),
clkgen.keep_low.eq(1),
If(enable & sink.valid,
NextState("PRE-RX"),
),
)
fsm.act("STOP-PRE",
# Generate Clk.
clkgen.en.eq(1),
If(clkgen.tx,
NextState("STOP"),
),
)
fsm.act("STOP",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
If(clkgen.rx,
sda_o.eq(1),
NextState("XFER-END"),
).Else(
sda_o.eq(0),
)
)
fsm.act("XFER-END",
# Accept CMD.
sink.ready.eq(1),
sda_oe.eq(1),
sda_o.eq(1),
# Send Status/Data to Core.
NextState("SEND-STATUS-DATA"),
)
fsm.act("SEND-STATUS-DATA",
# Send Data In to Core and return to WAIT when accepted.
sda_oe.eq(1),
sda_o.eq(1),
source.data.eq(sr_in),
source.nack.eq(nack),
source.valid.eq(1),
source.last.eq(1),
If(source.ready,
NextState("WAIT-DATA"),
)
)
fsm.act("RECOVER-1",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(1),
If(sr_cnt < 9,
If(clkgen.tx,
NextValue(sr_cnt, sr_cnt + 1),
),
).Elif(clkgen.rx,
NextState("RECOVER-2"),
),
)
fsm.act("RECOVER-2",
# Generate Clk.
clkgen.en.eq(1),
sda_oe.eq(1),
sda_o.eq(0),
If(clkgen.tx,
NextState("STOP"),
),
)

View File

@ -0,0 +1,88 @@
#
# This file is part of LiteI2C
#
# Copyright (c) 2020 Antmicro <www.antmicro.com>
# SPDX-License-Identifier: BSD-2-Clause
from migen import *
from litex.soc.interconnect import stream
from litex.soc.interconnect.csr import *
from litex.soc.cores.litei2c.common import *
class LiteI2CMaster(Module, AutoCSR):
"""Generic LiteI2C Master
The ``LiteI2CMaster`` class provides a generic I2C master that can be controlled using CSRs.
Parameters
----------
fifo_depth : int
Depth of the internal TX/RX FIFO.
Attributes
----------
source : Endpoint(i2c_phy2core_layout), out
Data stream.
sink : Endpoint(i2c_core2phy_layout), in
Control stream.
enable : Signal(), out
Enable signal.
"""
def __init__(self, tx_fifo_depth=1, rx_fifo_depth=1):
self.sink = stream.Endpoint(i2c_phy2core_layout)
self.source = stream.Endpoint(i2c_core2phy_layout)
self.enable = Signal()
self._enable = CSRStorage()
self._settings = CSRStorage(fields=[
CSRField("len_tx", size=3, offset=0, description="I2C tx Xfer length (in bytes). Set to a value greater then 4 to anounce more data has to be transmitted."),
CSRField("len_rx", size=3, offset=8, description="I2C rx Xfer length (in bytes). Set to a value greater then 4 to anounce more data has to be received."),
CSRField("recover", size=1, offset=16, description="I2C recover bus. If set, the I2C bus will be recovered."),
], description="I2C transfer settings")
self._addr = CSRStorage(self.source.addr.nbits)
self._rxtx = CSR(self.source.data.nbits)
self._status = CSRStatus(fields=[
CSRField("tx_ready", size=1, offset=0, description="TX FIFO is not full."),
CSRField("rx_ready", size=1, offset=1, description="RX FIFO is not empty."),
CSRField("nack", size=1, offset=8, description="Error on transfer." ),
CSRField("tx_unfinished", size=1, offset=16, description="Another tx transfer is expected."),
CSRField("rx_unfinished", size=1, offset=17, description="Another rx transfer is expected.")
])
# FIFOs.
tx_fifo = stream.SyncFIFO(i2c_core2phy_layout, depth=tx_fifo_depth)
rx_fifo = stream.SyncFIFO(i2c_phy2core_layout, depth=rx_fifo_depth)
self.submodules += tx_fifo, rx_fifo
self.comb += self.sink.connect(rx_fifo.sink)
self.comb += tx_fifo.source.connect(self.source)
# I2C Enable.
self.comb += self.enable.eq(self._enable.storage)
# I2C TX.
self.comb += [
tx_fifo.sink.valid.eq(self._rxtx.re),
self._status.fields.tx_ready.eq(tx_fifo.sink.ready),
tx_fifo.sink.data.eq(self._rxtx.r),
tx_fifo.sink.addr.eq(self._addr.storage),
tx_fifo.sink.len_tx.eq(self._settings.fields.len_tx),
tx_fifo.sink.len_rx.eq(self._settings.fields.len_rx),
tx_fifo.sink.recover.eq(self._settings.fields.recover),
tx_fifo.sink.last.eq(1),
]
# I2C RX.
self.comb += [
rx_fifo.source.ready.eq(self._rxtx.we),
self._status.fields.rx_ready.eq(rx_fifo.source.valid),
self._status.fields.nack.eq(rx_fifo.source.nack),
self._status.fields.tx_unfinished.eq(rx_fifo.source.unfinished_tx),
self._status.fields.rx_unfinished.eq(rx_fifo.source.unfinished_rx),
self._rxtx.w.eq(rx_fifo.source.data),
]

View File

@ -2043,6 +2043,18 @@ class LiteXSoC(SoC):
add_ip_address_constants(self, "REMOTEIP", ethmac_remote_ip)
add_mac_address_constants(self, "MACADDR", ethmac_address)
# Add I2C Master -------------------------------------------------------------------------------
def add_i2c_master(self, name="i2cmaster", pads=None, **kwargs):
# Imports.
from litex.soc.cores.litei2c import LiteI2C
# Core.
self.check_if_exists(name)
if pads is None:
pads = self.platform.request(name)
i2c = LiteI2C(self.sys_clk_freq, pads=pads, **kwargs)
self.add_module(name=name, module=i2c)
# Add SPI Master --------------------------------------------------------------------------------
def add_spi_master(self, name="spimaster", pads=None, data_width=8, spi_clk_freq=1e6, with_clk_divider=True, **kwargs):
# Imports.