import operator from collections import defaultdict from migen.fhdl.structure import * from migen.fhdl.structure import _Operator, _Assign, _Fragment from migen.fhdl.tools import list_inputs __all__ = ["Simulator"] class ClockState: def __init__(self, period, times_before_tick): self.period = period self.times_before_tick = times_before_tick class TimeManager: def __init__(self, description): self.clocks = dict() for k, v in description.items(): if not isinstance(v, tuple): v = v, 0 self.clocks[k] = ClockState(v[0], v[0] - v[1]) def tick(self): r = set() dt = min(cs.times_before_tick for cs in self.clocks.values()) for k, cs in self.clocks.items(): if cs.times_before_tick == dt: r.add(k) cs.times_before_tick -= dt if not cs.times_before_tick: cs.times_before_tick += cs.period return r str2op = { "~": operator.invert, "+": operator.add, "-": operator.sub, "*": operator.mul, ">>>": operator.rshift, "<<<": operator.lshift, "&": operator.and_, "^": operator.xor, "|": operator.or_, "<": operator.lt, "<=": operator.le, "==": operator.eq, "!=": operator.ne, ">": operator.gt, ">=": operator.ge, } class Evaluator: def __init__(self): self.signal_values = dict() self.modifications = dict() def commit(self): r = set() for k, v in self.modifications.items(): if k not in self.signal_values or self.signal_values[k] != v: self.signal_values[k] = v r.add(k) self.modifications.clear() return r def eval(self, node): if isinstance(node, (int, bool)): return node elif isinstance(node, Signal): try: return self.signal_values[node] except KeyError: return node.reset elif isinstance(node, _Operator): operands = [self.eval(o) for o in node.operands] if node.op == "-": if len(operands) == 1: return -operands[0] else: return operands[0] - operands[1] else: return str2op[node.op](*operands) else: # TODO: Cat, Slice, Array, ClockSignal, ResetSignal, Memory raise NotImplementedError def assign(self, signal, value): value = value & (2**signal.nbits - 1) if signal.signed and (value & 2**(signal.nbits - 1)): value -= 2**signal.nbits self.modifications[signal] = value def execute(self, statements): for s in statements: if isinstance(s, _Assign): value = self.eval(s.r) if isinstance(s.l, Signal): self.assign(s.l, value) else: # TODO: Cat, Slice, Array, ClockSignal, ResetSignal, Memory raise NotImplementedError elif isinstance(s, If): if self.eval(s.cond): self.execute(s.t) else: self.execute(s.f) else: # TODO: Case raise NotImplementedError # TODO: instances via Iverilog/VPI # TODO: VCD output class Simulator: def __init__(self, fragment_or_module, generators, clocks={"sys": 100}): if isinstance(fragment_or_module, _Fragment): self.fragment = fragment_or_module else: self.fragment = fragment_or_module.get_fragment() if not isinstance(generators, dict): generators = {"sys": generators} self.generators = dict() for k, v in generators.items(): if isinstance(v, list): self.generators[k] = v else: self.generators[k] = [v] # TODO: insert_resets self.time = TimeManager(clocks) self.evaluator = Evaluator() self.comb_dependent_statements = defaultdict(list) for statement in self.fragment.comb: for signal in list_inputs(statement): self.comb_dependent_statements[signal].append(statement) def _commit_and_comb_propagate(self): modified = self.evaluator.commit() while modified: for signal in modified: self.evaluator.execute(self.comb_dependent_statements[signal]) modified = self.evaluator.commit() def _eval_nested_lists(self, x): if isinstance(x, list): return [self._eval_nested_lists(e) for e in x] elif isinstance(x, Signal): return self.evaluator.eval(x) else: raise ValueError def _process_generators(self, cd): exhausted = [] for generator in self.generators[cd]: reply = None while True: try: request = generator.send(reply) if request is None: break # next cycle elif isinstance(request, tuple): self.evaluator.assign(*request) else: reply = self._eval_nested_lists(request) except StopIteration: exhausted.append(generator) break for generator in exhausted: self.generators[cd].remove(generator) def _continue_simulation(self): # TODO: passive generators return any(self.generators.values()) def run(self): self.evaluator.execute(self.fragment.comb) self._commit_and_comb_propagate() while True: cds = self.time.tick() for cd in cds: if cd in self.fragment.sync: self.evaluator.execute(self.fragment.sync[cd]) if cd in self.generators: self._process_generators(cd) self._commit_and_comb_propagate() if not self._continue_simulation(): break