mirror of
https://github.com/enjoy-digital/litex.git
synced 2025-01-04 09:52:26 -05:00
Merge pull request #2018 from motec-research/add_i2c_master
Add i2c master
This commit is contained in:
commit
4301293b21
2 changed files with 517 additions and 0 deletions
291
litex/soc/cores/i2c.py
Normal file
291
litex/soc/cores/i2c.py
Normal file
|
@ -0,0 +1,291 @@
|
|||
#
|
||||
# This file is part of MiSoC and has been adapted/modified for Litex.
|
||||
#
|
||||
# Copyright 2007-2023 / M-Labs Ltd
|
||||
# Copyright 2012-2015 / Enjoy-Digital
|
||||
# Copyright from Misoc LICENCE file added above
|
||||
#
|
||||
# Copyright 2023 Andrew Dennison <andrew@motec.com.au>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
from migen import *
|
||||
from litex.gen import *
|
||||
from litex.soc.interconnect import wishbone
|
||||
from litex.soc.interconnect.csr_eventmanager import *
|
||||
|
||||
# I2C-----------------------------------------------------------------------------------------------
|
||||
|
||||
__all__ = [
|
||||
"I2CMaster",
|
||||
"I2C_XFER_ADDR", "I2C_CONFIG_ADDR",
|
||||
"I2C_ACK", "I2C_READ", "I2C_WRITE", "I2C_STOP", "I2C_START", "I2C_IDLE",
|
||||
]
|
||||
|
||||
|
||||
class I2CClockGen(LiteXModule):
|
||||
def __init__(self, width):
|
||||
self.load = Signal(width)
|
||||
self.clk2x = Signal()
|
||||
|
||||
cnt = Signal.like(self.load)
|
||||
self.comb += [
|
||||
self.clk2x.eq(cnt == 0),
|
||||
]
|
||||
self.sync += [
|
||||
If(self.clk2x,
|
||||
cnt.eq(self.load),
|
||||
).Else(
|
||||
cnt.eq(cnt - 1),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class I2CMasterMachine(LiteXModule):
|
||||
def __init__(self, clock_width):
|
||||
self.scl_o = Signal(reset=1)
|
||||
self.sda_o = Signal(reset=1)
|
||||
self.sda_i = Signal()
|
||||
|
||||
self.cg = CEInserter()(I2CClockGen(clock_width))
|
||||
self.idle = Signal()
|
||||
self.start = Signal()
|
||||
self.stop = Signal()
|
||||
self.write = Signal()
|
||||
self.read = Signal()
|
||||
self.ack = Signal()
|
||||
self.data = Signal(8)
|
||||
|
||||
###
|
||||
|
||||
busy = Signal()
|
||||
bits = Signal(4)
|
||||
|
||||
fsm = CEInserter()(FSM("IDLE"))
|
||||
self.fsm = fsm
|
||||
|
||||
fsm.act("IDLE",
|
||||
# Valid combinations (lowest to highest priority):
|
||||
# stop: lowest priority
|
||||
# read (& optional stop with automatic NACK)
|
||||
# write (& optional stop)
|
||||
# start (indicates start or restart)
|
||||
# start & write (& optional stop)
|
||||
# start & write & read (& optional stop)
|
||||
# lowest priority
|
||||
# *** TODO: support compound commands with I2CMaster ***
|
||||
If(self.stop & ~self.scl_o,
|
||||
# stop is only valid after an ACK
|
||||
NextState("STOP0"),
|
||||
),
|
||||
If(self.read,
|
||||
# post decrement so read first bit and shift in 7
|
||||
NextValue(bits, 8-1),
|
||||
NextState("READ0"),
|
||||
),
|
||||
If(self.write,
|
||||
NextValue(bits, 8),
|
||||
NextState("WRITE0"),
|
||||
),
|
||||
# start could be requesting a restart
|
||||
If(self.start,
|
||||
NextState("RESTART0"),
|
||||
),
|
||||
# highest priority: start only if scl is high
|
||||
If(self.start & self.scl_o,
|
||||
NextState("START0"),
|
||||
),
|
||||
)
|
||||
|
||||
fsm.act("START0",
|
||||
# Always entered with scl_o = 1
|
||||
NextValue(self.sda_o, 0),
|
||||
NextState("IDLE"))
|
||||
|
||||
fsm.act("RESTART0",
|
||||
# Only entered from IDLE with scl_o = 0
|
||||
NextValue(self.sda_o, 1),
|
||||
NextState("RESTART1"))
|
||||
fsm.act("RESTART1",
|
||||
NextValue(self.scl_o, 1),
|
||||
NextState("START0"))
|
||||
|
||||
fsm.act("STOP0",
|
||||
# Only entered from IDLE with scl_o = 0
|
||||
NextValue(self.sda_o, 0),
|
||||
NextState("STOP1"))
|
||||
fsm.act("STOP1",
|
||||
NextValue(self.scl_o, 1),
|
||||
NextState("STOP2"))
|
||||
fsm.act("STOP2",
|
||||
NextValue(self.sda_o, 1),
|
||||
NextState("IDLE"))
|
||||
|
||||
fsm.act("WRITE0",
|
||||
NextValue(self.scl_o, 0),
|
||||
If(bits == 0,
|
||||
NextValue(self.sda_o, 1),
|
||||
NextState("READACK0"),
|
||||
).Else(
|
||||
NextValue(self.sda_o, self.data[7]),
|
||||
NextState("WRITE1"),
|
||||
)
|
||||
)
|
||||
fsm.act("WRITE1",
|
||||
NextValue(self.scl_o, 1),
|
||||
NextValue(self.data[1:], self.data[:-1]),
|
||||
NextValue(bits, bits - 1),
|
||||
NextState("WRITE0"),
|
||||
)
|
||||
fsm.act("READACK0",
|
||||
NextValue(self.scl_o, 1),
|
||||
NextState("READACK1"),
|
||||
)
|
||||
fsm.act("READACK1",
|
||||
# ACK => IDLE always with scl_o = 0
|
||||
NextValue(self.scl_o, 0),
|
||||
NextValue(self.ack, ~self.sda_i),
|
||||
NextState("IDLE")
|
||||
)
|
||||
|
||||
fsm.act("READ0",
|
||||
# ACK => IDLE => READ0 always with scl_o = 0
|
||||
NextValue(self.scl_o, 1),
|
||||
NextState("READ1"),
|
||||
)
|
||||
fsm.act("READ1",
|
||||
NextValue(self.data[0], self.sda_i),
|
||||
NextValue(self.scl_o, 0),
|
||||
If(bits == 0,
|
||||
NextValue(self.sda_o, ~self.ack),
|
||||
NextState("WRITEACK0"),
|
||||
).Else(
|
||||
#NextValue(self.sda_o, 1), must already be high
|
||||
NextState("READ2"),
|
||||
)
|
||||
)
|
||||
fsm.act("READ2",
|
||||
NextValue(self.scl_o, 1),
|
||||
NextValue(self.data[1:], self.data[:-1]),
|
||||
NextValue(bits, bits - 1),
|
||||
NextState("READ1"),
|
||||
)
|
||||
fsm.act("WRITEACK0",
|
||||
NextValue(self.scl_o, 1),
|
||||
NextState("WRITEACK1"),
|
||||
)
|
||||
fsm.act("WRITEACK1",
|
||||
# ACK => IDLE always with scl_o = 0
|
||||
NextValue(self.scl_o, 0),
|
||||
NextValue(self.sda_o, 1),
|
||||
NextState("IDLE")
|
||||
)
|
||||
|
||||
run = Signal()
|
||||
self.comb += [
|
||||
run.eq(self.start | self.stop | self.write | self.read),
|
||||
self.idle.eq(~run & fsm.ongoing("IDLE")),
|
||||
self.cg.ce.eq(~self.idle),
|
||||
fsm.ce.eq(run | self.cg.clk2x),
|
||||
]
|
||||
|
||||
# Registers:
|
||||
# config = Record([
|
||||
# ("div", 20),
|
||||
# ])
|
||||
# xfer = Record([
|
||||
# ("data", 8),
|
||||
# ("ack", 1),
|
||||
# ("read", 1),
|
||||
# ("write", 1),
|
||||
# ("start", 1),
|
||||
# ("stop", 1),
|
||||
# ("idle", 1),
|
||||
# ])
|
||||
class I2CMaster(LiteXModule):
|
||||
def __init__(self, pads, bus=None):
|
||||
if bus is None:
|
||||
bus = wishbone.Interface(data_width=32)
|
||||
self.bus = bus
|
||||
|
||||
###
|
||||
|
||||
# Wishbone
|
||||
self.i2c = i2c = I2CMasterMachine(
|
||||
clock_width=20)
|
||||
|
||||
self.sync += [
|
||||
# read
|
||||
If(bus.adr[0],
|
||||
bus.dat_r.eq(i2c.cg.load),
|
||||
).Else(
|
||||
bus.dat_r.eq(Cat(i2c.data, i2c.ack, C(0, 4), i2c.idle)),
|
||||
),
|
||||
|
||||
# write
|
||||
i2c.read.eq(0),
|
||||
i2c.write.eq(0),
|
||||
i2c.start.eq(0),
|
||||
i2c.stop.eq(0),
|
||||
|
||||
bus.ack.eq(0),
|
||||
If(bus.cyc & bus.stb & ~bus.ack,
|
||||
bus.ack.eq(1),
|
||||
If(bus.we,
|
||||
If(bus.adr[0],
|
||||
i2c.cg.load.eq(bus.dat_w),
|
||||
).Else(
|
||||
i2c.data.eq(bus.dat_w[0:8]),
|
||||
i2c.ack.eq(bus.dat_w[8]),
|
||||
i2c.read.eq(bus.dat_w[9]),
|
||||
i2c.write.eq(bus.dat_w[10]),
|
||||
i2c.start.eq(bus.dat_w[11]),
|
||||
i2c.stop.eq(bus.dat_w[12]),
|
||||
)
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
# I/O
|
||||
self.scl_t = TSTriple()
|
||||
self.scl_tristate = self.scl_t.get_tristate(pads.scl)
|
||||
self.comb += [
|
||||
self.scl_t.oe.eq(~i2c.scl_o),
|
||||
self.scl_t.o.eq(0),
|
||||
]
|
||||
|
||||
self.sda_t = TSTriple()
|
||||
self.sda_tristate = self.sda_t.get_tristate(pads.sda)
|
||||
|
||||
self.scl_i_n = Signal() # previous scl_t.i
|
||||
self.sda_oe_n = Signal() # previous sda_t.oe
|
||||
self.sync += [
|
||||
self.scl_i_n.eq(self.scl_t.i),
|
||||
self.sda_oe_n.eq(self.sda_t.oe),
|
||||
]
|
||||
|
||||
self.comb += [
|
||||
self.sda_t.oe.eq(self.sda_oe_n),
|
||||
# only change SDA when SCL is stable
|
||||
If(self.scl_i_n == i2c.scl_o,
|
||||
self.sda_t.oe.eq(~i2c.sda_o),
|
||||
),
|
||||
self.sda_t.o.eq(0),
|
||||
i2c.sda_i.eq(self.sda_t.i),
|
||||
]
|
||||
|
||||
# Event Manager.
|
||||
self.ev = EventManager()
|
||||
self.ev.idle = EventSourceProcess(edge="rising")
|
||||
self.ev.finalize()
|
||||
self.comb += self.ev.idle.trigger.eq(i2c.idle)
|
||||
|
||||
I2C_XFER_ADDR, I2C_CONFIG_ADDR = range(2)
|
||||
(
|
||||
I2C_ACK,
|
||||
I2C_READ,
|
||||
I2C_WRITE,
|
||||
I2C_START,
|
||||
I2C_STOP,
|
||||
I2C_IDLE,
|
||||
) = (1 << i for i in range(8, 14))
|
226
test/test_i2c.py
Executable file
226
test/test_i2c.py
Executable file
|
@ -0,0 +1,226 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# This file is part of MiSoC and has been adapted/modified for Litex.
|
||||
#
|
||||
# Copyright 2007-2023 / M-Labs Ltd
|
||||
# Copyright 2012-2015 / Enjoy-Digital
|
||||
# Copyright from Misoc LICENCE file added above
|
||||
#
|
||||
# Copyright 2023 Andrew Dennison <andrew@motec.com.au>
|
||||
#
|
||||
# SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
import unittest
|
||||
|
||||
from migen import *
|
||||
from migen.fhdl.specials import Tristate
|
||||
|
||||
from litex.soc.cores.i2c import *
|
||||
|
||||
|
||||
class _MockPads:
|
||||
def __init__(self):
|
||||
self.scl = Signal()
|
||||
self.sda = Signal()
|
||||
|
||||
|
||||
class _MockTristateImpl(Module):
|
||||
def __init__(self, t):
|
||||
t.i_mock = Signal(reset=True)
|
||||
self.comb += [
|
||||
If(t.oe,
|
||||
t.target.eq(t.o),
|
||||
t.i.eq(t.o),
|
||||
).Else(
|
||||
t.target.eq(t.i_mock),
|
||||
t.i.eq(t.i_mock),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class _MockTristate:
|
||||
"""A mock `Tristate` for simulation
|
||||
|
||||
This simulation ensures the TriState input (_i) tracks the output (_o) when output enable
|
||||
(_oe) = 1. A new i_mock `Signal` is added - this can be written to in the simulation to represent
|
||||
input from the external device.
|
||||
|
||||
Example usage:
|
||||
|
||||
class TestMyModule(unittest.TestCase):
|
||||
def test_mymodule(self):
|
||||
dut = MyModule()
|
||||
io = Signal()
|
||||
dut.io_t = TSTriple()
|
||||
self.io_tristate = self.io_t.get_tristate(io)
|
||||
|
||||
dut.comb += [
|
||||
dut.io_t.oe.eq(signal_for_oe),
|
||||
dut.io_t.o.eq(signal_for_o),
|
||||
signal_for_i.eq(dut.io_t.i),
|
||||
]
|
||||
|
||||
def generator()
|
||||
yield dut.io_tristate.i_mock.eq(some_value)
|
||||
if (yield dut.io_t.oe):
|
||||
self.assertEqual((yield dut.scl_t.i), (yield dut.io_t.o))
|
||||
else:
|
||||
self.assertEqual((yield dut.scl_t.i), some_value)
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def lower(t):
|
||||
return _MockTristateImpl(t)
|
||||
|
||||
|
||||
class TestI2C(unittest.TestCase):
|
||||
def test_i2c(self):
|
||||
pads = _MockPads()
|
||||
dut = I2CMaster(pads)
|
||||
|
||||
def check_trans(scl, sda, msg=""):
|
||||
scl, sda = int(scl), int(sda)
|
||||
scl_init, sda_init = (yield dut.scl_t.i), (yield dut.sda_t.i)
|
||||
timeout = 0
|
||||
while True:
|
||||
scl_now, sda_now = (yield dut.scl_t.i), (yield dut.sda_t.i)
|
||||
if scl_now == scl and sda_now == sda:
|
||||
return
|
||||
timeout += 1
|
||||
self.assertLess(timeout, 20,
|
||||
f"\n*** {msg} timeout. Waiting for: " +
|
||||
f"scl:{scl_now} checking:{scl_init}=>{scl} " +
|
||||
f"sda:{sda_now} checking:{sda_init}=>{sda} ***"
|
||||
)
|
||||
yield
|
||||
|
||||
def wait_idle(do=lambda: ()):
|
||||
timeout = 0
|
||||
while True:
|
||||
timeout += 1
|
||||
self.assertLess(timeout, 20)
|
||||
idle = ((yield from dut.bus.read(I2C_XFER_ADDR)) & I2C_IDLE) != 0
|
||||
if idle:
|
||||
return
|
||||
yield
|
||||
|
||||
def write_bit(value):
|
||||
# print(f"write_bit:{value}")
|
||||
yield from check_trans(scl=False, sda=value)
|
||||
yield from check_trans(scl=True, sda=value)
|
||||
|
||||
def write_ack(value):
|
||||
# print(f"write_ack:{value}")
|
||||
yield from check_trans(scl=False, sda=not value)
|
||||
yield from check_trans(scl=True, sda=not value)
|
||||
yield from wait_idle()
|
||||
|
||||
def read_bit(value):
|
||||
print(f"read_bit:{value}")
|
||||
yield dut.sda_tristate.i_mock.eq(value)
|
||||
yield from check_trans(scl=True, sda=value)
|
||||
yield from check_trans(scl=False, sda=value)
|
||||
yield dut.sda_tristate.i_mock.eq(True)
|
||||
|
||||
def read_ack(value):
|
||||
#print(f"read_ack:{value}")
|
||||
yield from check_trans(scl=False, sda=True)
|
||||
yield dut.sda_tristate.i_mock.eq(not value)
|
||||
yield from check_trans(scl=True, sda=not value)
|
||||
yield from wait_idle()
|
||||
yield dut.sda_tristate.i_mock.eq(True)
|
||||
ack = ((yield from dut.bus.read(I2C_XFER_ADDR)) & I2C_ACK) != 0
|
||||
self.assertEqual(ack, value)
|
||||
|
||||
def i2c_restart():
|
||||
yield from check_trans(scl=False, sda=True, msg="checking restart precondition")
|
||||
yield from dut.bus.write(I2C_XFER_ADDR, I2C_START)
|
||||
yield from check_trans(scl=False, sda=True, msg="checking restart0")
|
||||
yield from check_trans(scl=True, sda=True, msg="checking restart1")
|
||||
yield from check_trans(scl=True, sda=False, msg="checking start0")
|
||||
yield from wait_idle()
|
||||
|
||||
def i2c_start():
|
||||
yield from check_trans(scl=True, sda=True, msg="checking start precondition")
|
||||
yield from dut.bus.write(I2C_XFER_ADDR, I2C_START)
|
||||
yield from check_trans(scl=True, sda=False, msg="checking start0")
|
||||
yield from wait_idle()
|
||||
|
||||
def i2c_stop():
|
||||
yield from check_trans(scl=False, sda=True, msg="checking stop after read or write")
|
||||
yield from dut.bus.write(I2C_XFER_ADDR, I2C_STOP)
|
||||
yield from check_trans(scl=False, sda=False, msg="checking STOP0")
|
||||
yield from check_trans(scl=True, sda=False, msg="checking STOP1")
|
||||
yield from check_trans(scl=True, sda=True, msg="checking STOP2")
|
||||
yield from wait_idle()
|
||||
|
||||
def i2c_write(value, ack=True):
|
||||
value = int(value)
|
||||
test_bin = "{0:08b}".format(value)
|
||||
# print(f"I2C_WRITE | {hex(value)}:0x{test_bin}")
|
||||
yield from dut.bus.write(I2C_XFER_ADDR, I2C_WRITE | value)
|
||||
for i in list(test_bin):
|
||||
yield from write_bit(int(i))
|
||||
yield from read_ack(True)
|
||||
|
||||
def i2c_read(value, ack=True):
|
||||
value = int(value)
|
||||
test_bin = "{0:08b}".format(value)
|
||||
print(f"I2C_READ | {hex(value)}:0x{test_bin}")
|
||||
yield from dut.bus.write(I2C_XFER_ADDR, I2C_READ | (I2C_ACK if ack else 0))
|
||||
for i in list(test_bin):
|
||||
yield from read_bit(int(i))
|
||||
yield dut.sda_tristate.i_mock.eq(True)
|
||||
data = (yield from dut.bus.read(I2C_XFER_ADDR)) & 0xFF
|
||||
self.assertEqual(data, value)
|
||||
yield from write_ack(ack)
|
||||
|
||||
def check():
|
||||
yield from dut.bus.write(I2C_CONFIG_ADDR, 4)
|
||||
data = (yield from dut.bus.read(I2C_CONFIG_ADDR)) & 0xFF
|
||||
self.assertEqual(data, 4)
|
||||
|
||||
print("write 1 byte 0x18 to address 0x41")
|
||||
yield from i2c_start()
|
||||
yield from i2c_write(0x41 << 1 | 0)
|
||||
yield from i2c_write(0x18, ack=False)
|
||||
yield from i2c_stop()
|
||||
|
||||
print("read 1 byte from address 0x41")
|
||||
yield from i2c_start()
|
||||
yield from i2c_write(0x41 << 1 | 1)
|
||||
yield from i2c_read(0x18, ack=False)
|
||||
|
||||
print("write 2 bytes 0x10 0x00 to address 0x11")
|
||||
yield from i2c_restart()
|
||||
yield from i2c_write(0x11 << 1 | 0)
|
||||
yield from i2c_write(0x10, ack=True)
|
||||
yield from i2c_write(0x00, ack=False)
|
||||
yield from i2c_stop()
|
||||
|
||||
print("read 1 byte from address 0x11")
|
||||
yield from i2c_start()
|
||||
yield from i2c_write(0x11 << 1 | 1)
|
||||
yield from i2c_read(0x81, ack=False)
|
||||
|
||||
print("read 2 bytes from address 0x55")
|
||||
yield from i2c_restart()
|
||||
yield from i2c_write(0x55 << 1 | 1)
|
||||
yield from i2c_read(0xDE, ack=True)
|
||||
yield from i2c_read(0xAD, ack=False)
|
||||
yield from i2c_stop()
|
||||
|
||||
clocks = {
|
||||
"sys": 10,
|
||||
"async": (10, 3),
|
||||
}
|
||||
generators = {
|
||||
"sys": [
|
||||
check(),
|
||||
],
|
||||
}
|
||||
run_simulation(dut, generators, clocks, special_overrides={Tristate: _MockTristate}, vcd_name="i2c.vcd")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
Reference in a new issue