from lib.sata.common import *
from lib.sata.link import SATALink
from lib.sata.transport import SATATransport
from lib.sata.command import SATACommand

from lib.sata.test.hdd import *
from lib.sata.test.common import *


class CommandTXPacket(list):
    """Command request packet: a list of data words plus command fields.

    The list contents are the payload words; the attributes carry the
    command flags (`write`, `read`, `identify`), the target `sector` and
    the sector `count`. `ongoing` and `done` are progress flags that start
    out False.
    """

    def __init__(self, write=0, read=0, identify=0, sector=0, count=0,
                 data=None):
        super().__init__()
        self.ongoing = False
        self.done = False
        self.write = write
        self.read = read
        self.identify = identify
        self.sector = sector
        self.count = count
        # `None` sentinel instead of a shared mutable `[]` default.
        if data is not None:
            self.extend(data)


class CommandStreamer(PacketStreamer):
    """Packet streamer that also drives the command fields of its source."""

    def __init__(self):
        PacketStreamer.__init__(self, command_tx_description(32),
                                CommandTXPacket)

    def do_simulation(self, selfp):
        """Run the base streamer step, then copy the current packet's
        command fields onto the source endpoint."""
        PacketStreamer.do_simulation(self, selfp)
        selfp.source.write = self.packet.write
        selfp.source.read = self.packet.read
        selfp.source.identify = self.packet.identify
        selfp.source.sector = self.packet.sector
        selfp.source.count = self.packet.count


class CommandRXPacket(list):
    """Command response packet: received data words plus status flags."""

    def __init__(self):
        self.ongoing = False
        self.done = False
        self.write = 0
        self.read = 0
        self.identify = 0
        self.success = 0
        self.failed = 0


class CommandLogger(PacketLogger):
    """Packet logger that records command responses as `CommandRXPacket`."""

    def __init__(self):
        PacketLogger.__init__(self, command_rx_description(32),
                              CommandRXPacket)

    def do_simulation(self, selfp):
        """Acknowledge the sink and accumulate incoming words.

        A strobe with `sop` starts a fresh packet and captures the response
        flags; any other strobe appends a data word to the current packet;
        a strobe with `eop` marks the current packet as done.
        """
        sink = selfp.sink
        sink.ack = 1  # always ready to accept data
        if sink.stb == 1 and sink.sop == 1:
            self.packet = CommandRXPacket()
            self.packet.write = sink.write
            self.packet.read = sink.read
            self.packet.identify = sink.identify
            # Fixed: was `sucess`, which left `success` stuck at 0.
            self.packet.success = sink.success
            self.packet.failed = sink.failed
            self.packet.append(sink.data)
        elif sink.stb:
            self.packet.append(sink.data)
        if sink.stb == 1 and sink.eop == 1:
            self.packet.done = True


class TB(Module):
    """Testbench wiring a command streamer and logger around the SATA
    link / transport / command stack, backed by the `HDD` model."""

    def __init__(self):
        self.hdd = HDD(
            link_debug=False, link_random_level=50,
            transport_debug=False, transport_loopback=False,
            hdd_debug=True)
        self.link = SATALink(self.hdd.phy)
        self.transport = SATATransport(self.link)
        self.command = SATACommand(self.transport)

        self.streamer = CommandStreamer()
        self.streamer_randomizer = Randomizer(command_tx_description(32),
                                              level=50)

        self.logger = CommandLogger()
        self.logger_randomizer = Randomizer(command_rx_description(32),
                                            level=50)

        # streamer -> randomizer -> command -> randomizer -> logger
        self.pipeline = Pipeline(
            self.streamer,
            self.streamer_randomizer,
            self.command,
            self.logger_randomizer,
            self.logger
        )

    def gen_simulation(self, selfp):
        """Write two sectors' worth of dwords at sector 2, read them back,
        print the comparison result, then send an identify command."""
        hdd = self.hdd
        # NOTE(review): presumably reserves backing storage in the HDD
        # model; confirm the meaning of the (0, 64) arguments.
        hdd.malloc(0, 64)

        write_data = list(range(hdd.sectors2dwords(2)))
        write_len = hdd.dwords2sectors(len(write_data))
        write_packet = CommandTXPacket(write=1, sector=2, count=write_len,
                                       data=write_data)
        yield from self.streamer.send(write_packet)
        yield from self.logger.receive()

        read_packet = CommandTXPacket(read=1, sector=2, count=write_len)
        yield from self.streamer.send(read_packet)
        yield from self.logger.receive()
        read_data = self.logger.packet

        # check results
        shift, length, errors = check(write_data, read_data)
        print("shift " + str(shift) + " / length " + str(length)
              + " / errors " + str(errors))

        identify_packet = CommandTXPacket(identify=1)
        yield from self.streamer.send(identify_packet)


if __name__ == "__main__":
    run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)