soc/tools/litex_term: continue cleanup
This commit is contained in:
parent
ed2e623994
commit
247ecc5d8a
|
@ -32,20 +32,19 @@ else:
|
|||
|
||||
|
||||
sfl_magic_len = 14
|
||||
sfl_magic_req = bytes("sL5DdSMmkekro\n", "utf-8")
|
||||
sfl_magic_ack = bytes("z6IHG7cYDID6o\n", "utf-8")
|
||||
sfl_magic_req = b"sL5DdSMmkekro\n"
|
||||
sfl_magic_ack = b"z6IHG7cYDID6o\n"
|
||||
|
||||
# General commands
|
||||
sfl_cmd_abort = 0x00
|
||||
sfl_cmd_load = 0x01
|
||||
sfl_cmd_jump = 0x02
|
||||
|
||||
sfl_cmd_abort = b"\x00"
|
||||
sfl_cmd_load = b"\x01"
|
||||
sfl_cmd_jump = b"\x02"
|
||||
|
||||
# Replies
|
||||
sfl_ack_success = 'K'
|
||||
sfl_ack_crcerror = 'C'
|
||||
sfl_ack_unknown = 'U'
|
||||
sfl_ack_error = 'E'
|
||||
sfl_ack_success = b"K"
|
||||
sfl_ack_crcerror = b"C"
|
||||
sfl_ack_unknown = b"U"
|
||||
sfl_ack_error = b"E"
|
||||
|
||||
|
||||
crc16_table = [
|
||||
|
@ -93,30 +92,18 @@ def crc16(l):
|
|||
|
||||
class SFLFrame:
|
||||
def __init__(self):
|
||||
self.length = None
|
||||
self.cmd = None
|
||||
self.payload = []
|
||||
self.crc = None
|
||||
self.raw = []
|
||||
self.cmd = bytes()
|
||||
self.payload = bytes()
|
||||
|
||||
def compute_crc(self):
|
||||
crc_data = []
|
||||
crc_data.append(self.cmd)
|
||||
for d in self.payload:
|
||||
crc_data.append(d)
|
||||
self.crc = crc16(crc_data)
|
||||
return self.crc
|
||||
return crc16(self.cmd + self.payload)
|
||||
|
||||
def encode(self):
|
||||
self.raw = []
|
||||
self.raw.append(self.length)
|
||||
self.compute_crc()
|
||||
for d in self.crc.to_bytes(2, "big"):
|
||||
self.raw.append(d)
|
||||
self.raw.append(self.cmd)
|
||||
for d in self.payload:
|
||||
self.raw.append(d)
|
||||
|
||||
packet = bytes([len(self.payload)])
|
||||
packet += self.compute_crc().to_bytes(2, "big")
|
||||
packet += self.cmd
|
||||
packet += self.payload
|
||||
return packet
|
||||
|
||||
class LiteXTerm:
|
||||
def __init__(self, kernel_image, kernel_address):
|
||||
|
@ -126,7 +113,7 @@ class LiteXTerm:
|
|||
self.reader_alive = False
|
||||
self.writer_alive = False
|
||||
|
||||
self.detect_magic_bytes = bytearray([0 for i in range(len(sfl_magic_req))])
|
||||
self.detect_magic_bytes = bytes(len(sfl_magic_req))
|
||||
|
||||
def open(self, port, speed):
|
||||
self.serial = serial.serial_for_url(
|
||||
|
@ -146,12 +133,11 @@ class LiteXTerm:
|
|||
self.serial.close()
|
||||
|
||||
def send_frame(self, frame):
|
||||
frame.encode()
|
||||
retry = 1
|
||||
while retry:
|
||||
self.serial.write(frame.raw)
|
||||
self.serial.write(frame.encode())
|
||||
# Get the reply from the device
|
||||
reply = self.serial.read().decode()
|
||||
reply = self.serial.read()
|
||||
if reply == sfl_ack_success:
|
||||
retry = 0
|
||||
elif reply == sfl_ack_crcerror:
|
||||
|
@ -173,12 +159,9 @@ class LiteXTerm:
|
|||
print("{}%\r".format(100*position//length), end="")
|
||||
frame = SFLFrame()
|
||||
frame_data = data[:251]
|
||||
frame.length = len(frame_data) + 4
|
||||
frame.cmd = sfl_cmd_load
|
||||
for d in current_address.to_bytes(4, "big"):
|
||||
frame.payload.append(d)
|
||||
for d in frame_data:
|
||||
frame.payload.append(d)
|
||||
frame.payload = current_address.to_bytes(4, "big")
|
||||
frame.payload += frame_data
|
||||
if self.send_frame(frame) == 0:
|
||||
return
|
||||
current_address += len(frame_data)
|
||||
|
@ -195,10 +178,8 @@ class LiteXTerm:
|
|||
def boot(self):
|
||||
print("[TERM] Booting the device.")
|
||||
frame = SFLFrame()
|
||||
frame.length = 4
|
||||
frame.cmd = sfl_cmd_jump
|
||||
for d in self.kernel_address.to_bytes(4, "big"):
|
||||
frame.payload.append(d)
|
||||
frame.payload = self.kernel_address.to_bytes(4, "big")
|
||||
self.send_frame(frame)
|
||||
|
||||
def detect_magic(self, data):
|
||||
|
@ -219,15 +200,15 @@ class LiteXTerm:
|
|||
def reader(self):
|
||||
try:
|
||||
while self.reader_alive:
|
||||
c = self.serial.read().decode()
|
||||
if c == '\r':
|
||||
sys.stdout.write('\n')
|
||||
c = self.serial.read()
|
||||
if c == b"\r":
|
||||
sys.stdout.write(b"\n")
|
||||
else:
|
||||
sys.stdout.write(c)
|
||||
sys.stdout.write(c.decode())
|
||||
sys.stdout.flush()
|
||||
|
||||
if self.kernel_image is not None:
|
||||
if self.detect_magic(bytes(c, "utf-8")):
|
||||
if self.detect_magic(c):
|
||||
self.answer_magic()
|
||||
|
||||
except serial.SerialException:
|
||||
|
|
Loading…
Reference in New Issue