diff --git a/litex/soc/cores/i2c.py b/litex/soc/cores/i2c.py new file mode 100644 index 000000000..702b35c22 --- /dev/null +++ b/litex/soc/cores/i2c.py @@ -0,0 +1,291 @@ +# +# This file is part of MiSoC and has been adapted/modified for Litex. +# +# Copyright 2007-2023 / M-Labs Ltd +# Copyright 2012-2015 / Enjoy-Digital +# Copyright from Misoc LICENCE file added above +# +# Copyright 2023 Andrew Dennison +# +# SPDX-License-Identifier: BSD-2-Clause + +from migen import * +from litex.gen import * +from litex.soc.interconnect import wishbone +from litex.soc.interconnect.csr_eventmanager import * + +# I2C----------------------------------------------------------------------------------------------- + +__all__ = [ + "I2CMaster", + "I2C_XFER_ADDR", "I2C_CONFIG_ADDR", + "I2C_ACK", "I2C_READ", "I2C_WRITE", "I2C_STOP", "I2C_START", "I2C_IDLE", +] + + +class I2CClockGen(LiteXModule): + def __init__(self, width): + self.load = Signal(width) + self.clk2x = Signal() + + cnt = Signal.like(self.load) + self.comb += [ + self.clk2x.eq(cnt == 0), + ] + self.sync += [ + If(self.clk2x, + cnt.eq(self.load), + ).Else( + cnt.eq(cnt - 1), + ), + ] + + +class I2CMasterMachine(LiteXModule): + def __init__(self, clock_width): + self.scl_o = Signal(reset=1) + self.sda_o = Signal(reset=1) + self.sda_i = Signal() + + self.cg = CEInserter()(I2CClockGen(clock_width)) + self.idle = Signal() + self.start = Signal() + self.stop = Signal() + self.write = Signal() + self.read = Signal() + self.ack = Signal() + self.data = Signal(8) + + ### + + busy = Signal() + bits = Signal(4) + + fsm = CEInserter()(FSM("IDLE")) + self.fsm = fsm + + fsm.act("IDLE", + # Valid combinations (lowest to highest priority): + # stop: lowest priority + # read (& optional stop with automatic NACK) + # write (& optional stop) + # start (indicates start or restart) + # start & write (& optional stop) + # start & write & read (& optional stop) + # lowest priority + # *** TODO: support compound commands with I2CMaster *** + If(self.stop & ~self.scl_o, + # stop is only valid after an ACK + NextState("STOP0"), + ), + If(self.read, + # post decrement so read first bit and shift in 7 + NextValue(bits, 8-1), + NextState("READ0"), + ), + If(self.write, + NextValue(bits, 8), + NextState("WRITE0"), + ), + # start could be requesting a restart + If(self.start, + NextState("RESTART0"), + ), + # highest priority: start only if scl is high + If(self.start & self.scl_o, + NextState("START0"), + ), + ) + + fsm.act("START0", + # Always entered with scl_o = 1 + NextValue(self.sda_o, 0), + NextState("IDLE")) + + fsm.act("RESTART0", + # Only entered from IDLE with scl_o = 0 + NextValue(self.sda_o, 1), + NextState("RESTART1")) + fsm.act("RESTART1", + NextValue(self.scl_o, 1), + NextState("START0")) + + fsm.act("STOP0", + # Only entered from IDLE with scl_o = 0 + NextValue(self.sda_o, 0), + NextState("STOP1")) + fsm.act("STOP1", + NextValue(self.scl_o, 1), + NextState("STOP2")) + fsm.act("STOP2", + NextValue(self.sda_o, 1), + NextState("IDLE")) + + fsm.act("WRITE0", + NextValue(self.scl_o, 0), + If(bits == 0, + NextValue(self.sda_o, 1), + NextState("READACK0"), + ).Else( + NextValue(self.sda_o, self.data[7]), + NextState("WRITE1"), + ) + ) + fsm.act("WRITE1", + NextValue(self.scl_o, 1), + NextValue(self.data[1:], self.data[:-1]), + NextValue(bits, bits - 1), + NextState("WRITE0"), + ) + fsm.act("READACK0", + NextValue(self.scl_o, 1), + NextState("READACK1"), + ) + fsm.act("READACK1", + # ACK => IDLE always with scl_o = 0 + NextValue(self.scl_o, 0), + NextValue(self.ack, ~self.sda_i), + NextState("IDLE") + ) + + fsm.act("READ0", + # ACK => IDLE => READ0 always with scl_o = 0 + NextValue(self.scl_o, 1), + NextState("READ1"), + ) + fsm.act("READ1", + NextValue(self.data[0], self.sda_i), + NextValue(self.scl_o, 0), + If(bits == 0, + NextValue(self.sda_o, ~self.ack), + NextState("WRITEACK0"), + ).Else( + #NextValue(self.sda_o, 1), must already be high + NextState("READ2"), + ) + ) + fsm.act("READ2", + NextValue(self.scl_o, 1), + NextValue(self.data[1:], self.data[:-1]), + NextValue(bits, bits - 1), + NextState("READ1"), + ) + fsm.act("WRITEACK0", + NextValue(self.scl_o, 1), + NextState("WRITEACK1"), + ) + fsm.act("WRITEACK1", + # ACK => IDLE always with scl_o = 0 + NextValue(self.scl_o, 0), + NextValue(self.sda_o, 1), + NextState("IDLE") + ) + + run = Signal() + self.comb += [ + run.eq(self.start | self.stop | self.write | self.read), + self.idle.eq(~run & fsm.ongoing("IDLE")), + self.cg.ce.eq(~self.idle), + fsm.ce.eq(run | self.cg.clk2x), + ] + +# Registers: +# config = Record([ +# ("div", 20), +# ]) +# xfer = Record([ +# ("data", 8), +# ("ack", 1), +# ("read", 1), +# ("write", 1), +# ("start", 1), +# ("stop", 1), +# ("idle", 1), +# ]) +class I2CMaster(LiteXModule): + def __init__(self, pads, bus=None): + if bus is None: + bus = wishbone.Interface(data_width=32) + self.bus = bus + + ### + + # Wishbone + self.i2c = i2c = I2CMasterMachine( + clock_width=20) + + self.sync += [ + # read + If(bus.adr[0], + bus.dat_r.eq(i2c.cg.load), + ).Else( + bus.dat_r.eq(Cat(i2c.data, i2c.ack, C(0, 4), i2c.idle)), + ), + + # write + i2c.read.eq(0), + i2c.write.eq(0), + i2c.start.eq(0), + i2c.stop.eq(0), + + bus.ack.eq(0), + If(bus.cyc & bus.stb & ~bus.ack, + bus.ack.eq(1), + If(bus.we, + If(bus.adr[0], + i2c.cg.load.eq(bus.dat_w), + ).Else( + i2c.data.eq(bus.dat_w[0:8]), + i2c.ack.eq(bus.dat_w[8]), + i2c.read.eq(bus.dat_w[9]), + i2c.write.eq(bus.dat_w[10]), + i2c.start.eq(bus.dat_w[11]), + i2c.stop.eq(bus.dat_w[12]), + ) + ) + ) + ] + + # I/O + self.scl_t = TSTriple() + self.scl_tristate = self.scl_t.get_tristate(pads.scl) + self.comb += [ + self.scl_t.oe.eq(~i2c.scl_o), + self.scl_t.o.eq(0), + ] + + self.sda_t = TSTriple() + self.sda_tristate = self.sda_t.get_tristate(pads.sda) + + self.scl_i_n = Signal() # previous scl_t.i + self.sda_oe_n = Signal() # previous sda_t.oe + self.sync += [ + self.scl_i_n.eq(self.scl_t.i), + self.sda_oe_n.eq(self.sda_t.oe), + ] + + self.comb += [ + self.sda_t.oe.eq(self.sda_oe_n), + # only change SDA when SCL is stable + If(self.scl_i_n == i2c.scl_o, + self.sda_t.oe.eq(~i2c.sda_o), + ), + self.sda_t.o.eq(0), + i2c.sda_i.eq(self.sda_t.i), + ] + + # Event Manager. + self.ev = EventManager() + self.ev.idle = EventSourceProcess(edge="rising") + self.ev.finalize() + self.comb += self.ev.idle.trigger.eq(i2c.idle) + +I2C_XFER_ADDR, I2C_CONFIG_ADDR = range(2) +( + I2C_ACK, + I2C_READ, + I2C_WRITE, + I2C_START, + I2C_STOP, + I2C_IDLE, +) = (1 << i for i in range(8, 14)) diff --git a/test/test_i2c.py b/test/test_i2c.py new file mode 100755 index 000000000..37f72abda --- /dev/null +++ b/test/test_i2c.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +# +# This file is part of MiSoC and has been adapted/modified for Litex. +# +# Copyright 2007-2023 / M-Labs Ltd +# Copyright 2012-2015 / Enjoy-Digital +# Copyright from Misoc LICENCE file added above +# +# Copyright 2023 Andrew Dennison +# +# SPDX-License-Identifier: BSD-2-Clause + +import unittest + +from migen import * +from migen.fhdl.specials import Tristate + +from litex.soc.cores.i2c import * + + +class _MockPads: + def __init__(self): + self.scl = Signal() + self.sda = Signal() + + +class _MockTristateImpl(Module): + def __init__(self, t): + t.i_mock = Signal(reset=True) + self.comb += [ + If(t.oe, + t.target.eq(t.o), + t.i.eq(t.o), + ).Else( + t.target.eq(t.i_mock), + t.i.eq(t.i_mock), + ), + ] + + +class _MockTristate: + """A mock `Tristate` for simulation + + This simulation ensures the TriState input (_i) tracks the output (_o) when output enable + (_oe) = 1. A new i_mock `Signal` is added - this can be written to in the simulation to represent + input from the external device. + + Example usage: + + class TestMyModule(unittest.TestCase): + def test_mymodule(self): + dut = MyModule() + io = Signal() + dut.io_t = TSTriple() + self.io_tristate = self.io_t.get_tristate(io) + + dut.comb += [ + dut.io_t.oe.eq(signal_for_oe), + dut.io_t.o.eq(signal_for_o), + signal_for_i.eq(dut.io_t.i), + ] + + def generator() + yield dut.io_tristate.i_mock.eq(some_value) + if (yield dut.io_t.oe): + self.assertEqual((yield dut.scl_t.i), (yield dut.io_t.o)) + else: + self.assertEqual((yield dut.scl_t.i), some_value) + + """ + + @staticmethod + def lower(t): + return _MockTristateImpl(t) + + +class TestI2C(unittest.TestCase): + def test_i2c(self): + pads = _MockPads() + dut = I2CMaster(pads) + + def check_trans(scl, sda, msg=""): + scl, sda = int(scl), int(sda) + scl_init, sda_init = (yield dut.scl_t.i), (yield dut.sda_t.i) + timeout = 0 + while True: + scl_now, sda_now = (yield dut.scl_t.i), (yield dut.sda_t.i) + if scl_now == scl and sda_now == sda: + return + timeout += 1 + self.assertLess(timeout, 20, + f"\n*** {msg} timeout. Waiting for: " + + f"scl:{scl_now} checking:{scl_init}=>{scl} " + + f"sda:{sda_now} checking:{sda_init}=>{sda} ***" + ) + yield + + def wait_idle(do=lambda: ()): + timeout = 0 + while True: + timeout += 1 + self.assertLess(timeout, 20) + idle = ((yield from dut.bus.read(I2C_XFER_ADDR)) & I2C_IDLE) != 0 + if idle: + return + yield + + def write_bit(value): + # print(f"write_bit:{value}") + yield from check_trans(scl=False, sda=value) + yield from check_trans(scl=True, sda=value) + + def write_ack(value): + # print(f"write_ack:{value}") + yield from check_trans(scl=False, sda=not value) + yield from check_trans(scl=True, sda=not value) + yield from wait_idle() + + def read_bit(value): + print(f"read_bit:{value}") + yield dut.sda_tristate.i_mock.eq(value) + yield from check_trans(scl=True, sda=value) + yield from check_trans(scl=False, sda=value) + yield dut.sda_tristate.i_mock.eq(True) + + def read_ack(value): + #print(f"read_ack:{value}") + yield from check_trans(scl=False, sda=True) + yield dut.sda_tristate.i_mock.eq(not value) + yield from check_trans(scl=True, sda=not value) + yield from wait_idle() + yield dut.sda_tristate.i_mock.eq(True) + ack = ((yield from dut.bus.read(I2C_XFER_ADDR)) & I2C_ACK) != 0 + self.assertEqual(ack, value) + + def i2c_restart(): + yield from check_trans(scl=False, sda=True, msg="checking restart precondition") + yield from dut.bus.write(I2C_XFER_ADDR, I2C_START) + yield from check_trans(scl=False, sda=True, msg="checking restart0") + yield from check_trans(scl=True, sda=True, msg="checking restart1") + yield from check_trans(scl=True, sda=False, msg="checking start0") + yield from wait_idle() + + def i2c_start(): + yield from check_trans(scl=True, sda=True, msg="checking start precondition") + yield from dut.bus.write(I2C_XFER_ADDR, I2C_START) + yield from check_trans(scl=True, sda=False, msg="checking start0") + yield from wait_idle() + + def i2c_stop(): + yield from check_trans(scl=False, sda=True, msg="checking stop after read or write") + yield from dut.bus.write(I2C_XFER_ADDR, I2C_STOP) + yield from check_trans(scl=False, sda=False, msg="checking STOP0") + yield from check_trans(scl=True, sda=False, msg="checking STOP1") + yield from check_trans(scl=True, sda=True, msg="checking STOP2") + yield from wait_idle() + + def i2c_write(value, ack=True): + value = int(value) + test_bin = "{0:08b}".format(value) + # print(f"I2C_WRITE | {hex(value)}:0x{test_bin}") + yield from dut.bus.write(I2C_XFER_ADDR, I2C_WRITE | value) + for i in list(test_bin): + yield from write_bit(int(i)) + yield from read_ack(True) + + def i2c_read(value, ack=True): + value = int(value) + test_bin = "{0:08b}".format(value) + print(f"I2C_READ | {hex(value)}:0x{test_bin}") + yield from dut.bus.write(I2C_XFER_ADDR, I2C_READ | (I2C_ACK if ack else 0)) + for i in list(test_bin): + yield from read_bit(int(i)) + yield dut.sda_tristate.i_mock.eq(True) + data = (yield from dut.bus.read(I2C_XFER_ADDR)) & 0xFF + self.assertEqual(data, value) + yield from write_ack(ack) + + def check(): + yield from dut.bus.write(I2C_CONFIG_ADDR, 4) + data = (yield from dut.bus.read(I2C_CONFIG_ADDR)) & 0xFF + self.assertEqual(data, 4) + + print("write 1 byte 0x18 to address 0x41") + yield from i2c_start() + yield from i2c_write(0x41 << 1 | 0) + yield from i2c_write(0x18, ack=False) + yield from i2c_stop() + + print("read 1 byte from address 0x41") + yield from i2c_start() + yield from i2c_write(0x41 << 1 | 1) + yield from i2c_read(0x18, ack=False) + + print("write 2 bytes 0x10 0x00 to address 0x11") + yield from i2c_restart() + yield from i2c_write(0x11 << 1 | 0) + yield from i2c_write(0x10, ack=True) + yield from i2c_write(0x00, ack=False) + yield from i2c_stop() + + print("read 1 byte from address 0x11") + yield from i2c_start() + yield from i2c_write(0x11 << 1 | 1) + yield from i2c_read(0x81, ack=False) + + print("read 2 bytes from address 0x55") + yield from i2c_restart() + yield from i2c_write(0x55 << 1 | 1) + yield from i2c_read(0xDE, ack=True) + yield from i2c_read(0xAD, ack=False) + yield from i2c_stop() + + clocks = { + "sys": 10, + "async": (10, 3), + } + generators = { + "sys": [ + check(), + ], + } + run_simulation(dut, generators, clocks, special_overrides={Tristate: _MockTristate}, vcd_name="i2c.vcd") + +if __name__ == "__main__": + unittest.main()