import random from copy import deepcopy from litex.gen import * from litex.soc.interconnect.stream import Sink, Source from liteeth.common import * def print_with_prefix(s, prefix=""): if not isinstance(s, str): s = s.__repr__() s = s.split("\n") for l in s: print(prefix + l) def seed_to_data(seed, random=True): if random: return (seed * 0x31415979 + 1) & 0xffffffff else: return seed def split_bytes(v, n, endianness="big"): r = [] r_bytes = v.to_bytes(n, byteorder=endianness) for byte in r_bytes: r.append(int(byte)) return r def merge_bytes(b, endianness="big"): return int.from_bytes(bytes(b), endianness) def get_field_data(field, datas): v = merge_bytes(datas[field.byte:field.byte+math.ceil(field.width/8)]) return (v >> field.offset) & (2**field.width-1) def comp(p1, p2): r = True for x, y in zip(p1, p2): if x != y: r = False return r def check(p1, p2): p1 = deepcopy(p1) p2 = deepcopy(p2) if isinstance(p1, int): return 0, 1, int(p1 != p2) else: if len(p1) >= len(p2): ref, res = p1, p2 else: ref, res = p2, p1 shift = 0 while((ref[0] != res[0]) and (len(res) > 1)): res.pop(0) shift += 1 length = min(len(ref), len(res)) errors = 0 for i in range(length): if ref.pop(0) != res.pop(0): errors += 1 return shift, length, errors def randn(max_n): return random.randint(0, max_n-1) class Packet(list): def __init__(self, init=[]): self.ongoing = False self.done = False for data in init: self.append(data) class PacketStreamer(Module): def __init__(self, description, last_be=None): self.source = Source(description) self.last_be = last_be # # # self.packets = [] self.packet = Packet() self.packet.done = True def send(self, packet): packet = deepcopy(packet) self.packets.append(packet) return packet def send_blocking(self, packet): packet = self.send(packet) while not packet.done: yield def do_simulation(self, selfp): if len(self.packets) and self.packet.done: self.packet = self.packets.pop(0) if not self.packet.ongoing and not self.packet.done: selfp.source.stb = 1 if self.source.description.packetized: selfp.source.sop = 1 selfp.source.data = self.packet.pop(0) self.packet.ongoing = True elif selfp.source.stb == 1 and selfp.source.ack == 1: if self.source.description.packetized: selfp.source.sop = 0 if len(self.packet) == 1: selfp.source.eop = 1 if self.last_be is not None: selfp.source.last_be = self.last_be else: selfp.source.eop = 0 if self.last_be is not None: selfp.source.last_be = 0 if len(self.packet) > 0: selfp.source.stb = 1 selfp.source.data = self.packet.pop(0) else: self.packet.done = True selfp.source.stb = 0 class PacketLogger(Module): def __init__(self, description): self.sink = Sink(description) # # # self.packet = Packet() def receive(self): self.packet.done = False while not self.packet.done: yield def do_simulation(self, selfp): selfp.sink.ack = 1 if selfp.sink.stb: if self.sink.description.packetized: if selfp.sink.sop: self.packet = Packet() self.packet.append(selfp.sink.data) else: self.packet.append(selfp.sink.data) if selfp.sink.eop: self.packet.done = True else: self.packet.append(selfp.sink.data) class AckRandomizer(Module): def __init__(self, description, level=0): self.level = level self.sink = Sink(description) self.source = Source(description) self.run = Signal() self.comb += \ If(self.run, Record.connect(self.sink, self.source) ).Else( self.source.stb.eq(0), self.sink.ack.eq(0), ) def do_simulation(self, selfp): n = randn(100) if n < self.level: selfp.run = 0 else: selfp.run = 1