mirror of
https://github.com/enjoy-digital/litex.git
synced 2025-01-04 09:52:26 -05:00
99 lines
3.5 KiB
Python
99 lines
3.5 KiB
Python
import math
|
|
|
|
from misoclib.mem.litesata.common import *
|
|
from misoclib.mem.litesata.test.common import *
|
|
|
|
from misoclib.mem.litesata.test.model.phy import *
|
|
from misoclib.mem.litesata.test.model.link import *
|
|
from misoclib.mem.litesata.test.model.transport import *
|
|
from misoclib.mem.litesata.test.model.command import *
|
|
|
|
|
|
def print_hdd(s, n=None):
    """Print message ``s`` behind an ``[HDD<n>]: `` prefix.

    When ``n`` is ``None`` the prefix carries no index (``[HDD]: ``).
    """
    suffix = str(n) if n is not None else ""
    prefix = "[HDD{}]: ".format(suffix)
    print_with_prefix(s, prefix)
|
|
|
|
|
|
class HDDMemRegion:
    """Zero-initialised backing store for a range of sectors.

    ``data`` holds one list entry per 4 bytes of storage, i.e.
    ``count*sector_size//4`` entries, all initially 0.
    """

    def __init__(self, base, count, sector_size):
        # One entry per 4-byte word covered by the region.
        n_dwords = count*sector_size//4
        self.base, self.count = base, count
        self.data = [0] * n_dwords
|
|
|
|
|
|
class HDD(Module):
    """Simulation model of a hard drive stacked on PHY/link/transport/command layers.

    The instance registers itself with its command layer through
    ``set_hdd``; the ``*_callback`` methods return the list of FIS packets
    to send back (presumably invoked by the command layer -- confirm against
    ``CommandLayer``).

    Storage must be allocated with ``malloc`` before ``read``/``write`` are
    used: ``self.mem`` is ``None`` until then.
    """

    def __init__(self, n=None,
            link_debug=False, link_random_level=0,
            transport_debug=False, transport_loopback=False,
            hdd_debug=False,
            ):
        """Build the layer stack and reset the transfer state.

        ``n`` is only used to tag debug output; ``hdd_debug`` enables that
        output. The remaining arguments are forwarded unchanged to
        ``LinkLayer`` and ``TransportLayer``.
        """
        self.n = n
        self.submodules.phy = PHYLayer()
        self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
        self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
        self.submodules.command = CommandLayer(self.transport)

        self.command.set_hdd(self)

        self.debug = hdd_debug
        self.mem = None
        self.wr_sector = 0
        self.wr_end_sector = 0
        self.rd_sector = 0
        # read_dma_callback uses ``rd_end_sector``; it was previously left
        # undefined here because of a misspelling.
        self.rd_end_sector = 0
        # Misspelled legacy name, kept in case external code reads it.
        self.rx_end_sector = 0

    @staticmethod
    def _sector_span(first, count):
        """Format a sector range as "N" for one sector, "N to M" otherwise."""
        if count == 1:
            return "{}".format(first)
        return "{s} to {e}".format(s=first, e=first+count-1)

    def malloc(self, sector, count):
        """Replace ``self.mem`` with a zeroed region of ``count`` sectors based at ``sector``."""
        if self.debug:
            s = "Allocating {n} sectors: {s} to {e}".format(n=count, s=sector, e=sector+count-1)
            s += " ({} KB)".format(count*logical_sector_size//1024)
            print_hdd(s, self.n)
        self.mem = HDDMemRegion(sector, count, logical_sector_size)

    def write(self, sector, data):
        """Store the dwords of ``data`` starting at ``sector``.

        NOTE(review): the offset is computed from ``sector`` alone;
        ``self.mem.base`` is not subtracted, so addressing is only
        consistent for a region allocated at sector 0 -- confirm intent.
        """
        if self.debug:
            n = math.ceil(dwords2sectors(len(data)))
            print_hdd("Writing sector " + self._sector_span(sector, n), self.n)
        # Loop-invariant: dword index of the first sector written.
        offset = sectors2dwords(sector)
        mem_data = self.mem.data
        # Element-wise store (not slice assignment) so that running past
        # the end of the region still raises IndexError.
        for i, dword in enumerate(data):
            mem_data[offset+i] = dword

    def read(self, sector, count):
        """Return the dwords of ``count`` sectors starting at ``sector`` as a new list.

        NOTE(review): as in ``write``, ``self.mem.base`` is not applied.
        """
        if self.debug:
            print_hdd("Reading sector " + self._sector_span(sector, count), self.n)
        offset = sectors2dwords(sector)
        mem_data = self.mem.data
        # Indexing (not slicing) keeps the IndexError on out-of-range reads.
        return [mem_data[offset+i] for i in range(sectors2dwords(count))]

    def write_dma_callback(self, fis):
        """Latch the target sector range of a DMA write and answer with DMA activate."""
        self.wr_sector = fis.lba_lsb + (fis.lba_msb << 32)
        self.wr_end_sector = self.wr_sector + fis.count
        return [FIS_DMA_ACTIVATE_D2H()]

    def read_dma_callback(self, fis):
        """Return the data FISes for a DMA read, followed by a register D2H FIS.

        The requested range is split so that each data FIS carries at most
        ``(fis_max_dwords*4)//logical_sector_size`` sectors.
        """
        self.rd_sector = fis.lba_lsb + (fis.lba_msb << 32)
        self.rd_end_sector = self.rd_sector + fis.count
        max_sectors = (fis_max_dwords*4)//logical_sector_size
        packets = []
        # ``<`` rather than ``!=`` so a non-positive count cannot spin forever.
        while self.rd_sector < self.rd_end_sector:
            count = min(self.rd_end_sector-self.rd_sector, max_sectors)
            # Leading 0 dword precedes the payload (presumably the FIS
            # header slot; data_callback skips the first dword likewise).
            packet = [0] + self.read(self.rd_sector, count)
            packets.append(FIS_DATA(packet, direction="D2H"))
            self.rd_sector += count
        packets.append(FIS_REG_D2H())
        return packets

    def data_callback(self, fis):
        """Store the payload of an incoming data FIS and return the next response.

        Returns a register D2H FIS once the write pointer reaches the end of
        the range latched by ``write_dma_callback``, otherwise another DMA
        activate.
        """
        # First dword of the packet is not payload.
        payload = fis.packet[1:]
        self.write(self.wr_sector, payload)
        self.wr_sector += dwords2sectors(len(payload))
        if self.wr_sector == self.wr_end_sector:
            return [FIS_REG_D2H()]
        return [FIS_DMA_ACTIVATE_D2H()]
|