tools/remote/comms: base CommXY on CSRBuilder to allow using Comms directly in python scripts.

This way, user scripts can be use RemoteClient (communicating with the Server that has
already been opened on the right interface) or directly use CommXY in the scripts.

Using RemoteClient is more generic but can be slower (due to the Etherbone encoding between
the client and server). On fixed configuration using CommXY directly can then be faster
and also avoid manual opening of the server.
This commit is contained in:
Florent Kermarrec 2020-11-25 15:05:28 +01:00
parent 2c3687983c
commit 3d2574a488
5 changed files with 70 additions and 48 deletions

View File

@ -8,8 +8,13 @@ import os
import ctypes
import mmap
class CommPCIe:
def __init__(self, bar, debug=False):
from litex.tools.remote.csr_builder import CSRBuilder
# CommPCIe -----------------------------------------------------------------------------------------
class CommPCIe(CSRBuilder):
def __init__(self, bar, csr_csr=None, debug=False):
CSRBuilder.__init__(self, comm=self, csr_csv=csr_csv)
self.bar = bar
self.debug = debug

View File

@ -1,23 +1,29 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
# Copyright (c) 2015-2020 Florent Kermarrec <florent@enjoy-digital.fr>
# SPDX-License-Identifier: BSD-2-Clause
import serial
import struct
from litex.tools.remote.csr_builder import CSRBuilder
# Constants ----------------------------------------------------------------------------------------
CMD_WRITE_BURST_INCR = 0x01
CMD_READ_BURST_INCR = 0x02
CMD_WRITE_BURST_FIXED = 0x03
CMD_READ_BURST_FIXED = 0x04
class CommUART:
def __init__(self, port, baudrate=115200, debug=False):
self.port = port
# CommUART -----------------------------------------------------------------------------------------
class CommUART(CSRBuilder):
def __init__(self, port, baudrate=115200, csr_csv=None, debug=False):
CSRBuilder.__init__(self, comm=self, csr_csv=csr_csv)
self.port = serial.serial_for_url(port, baudrate)
self.baudrate = str(baudrate)
self.debug = debug
self.port = serial.serial_for_url(port, baudrate)
self.debug = debug
def open(self):
if hasattr(self, "port"):
@ -50,9 +56,9 @@ class CommUART:
def read(self, addr, length=None, burst="incr"):
self._flush()
data = []
data = []
length_int = 1 if length is None else length
cmd = {
cmd = {
"incr" : CMD_READ_BURST_INCR,
"fixed": CMD_READ_BURST_FIXED,
}[burst]
@ -69,7 +75,7 @@ class CommUART:
def write(self, addr, data, burst="incr"):
self._flush()
data = data if isinstance(data, list) else [data]
data = data if isinstance(data, list) else [data]
length = len(data)
offset = 0
while length:

View File

@ -10,12 +10,16 @@ import socket
from litex.tools.remote.etherbone import EtherbonePacket, EtherboneRecord
from litex.tools.remote.etherbone import EtherboneReads, EtherboneWrites
from litex.tools.remote.csr_builder import CSRBuilder
class CommUDP:
def __init__(self, server="192.168.1.50", port=1234, debug=False):
# CommUDP ------------------------------------------------------------------------------------------
class CommUDP(CSRBuilder):
def __init__(self, server="192.168.1.50", port=1234, csr_csv=None, debug=False):
CSRBuilder.__init__(self, comm=self, csr_csv=csr_csv)
self.server = server
self.port = port
self.debug = debug
self.port = port
self.debug = debug
def open(self):
if hasattr(self, "socket"):

View File

@ -7,6 +7,8 @@
import usb.core
import time
from litex.tools.remote.csr_builder import CSRBuilder
# Wishbone USB Protocol Bridge
# ============================
#
@ -57,13 +59,16 @@ import time
# we only support 32-bit reads and writes, this is always 4. On big endian
# USB, this has the value {04, 00}.
class CommUSB:
def __init__(self, vid=None, pid=None, max_retries=10, debug=False):
self.vid = vid
self.pid = pid
self.debug = debug
# CommUSB ------------------------------------------------------------------------------------------
class CommUSB(CSRBuilder):
def __init__(self, vid=None, pid=None, max_retries=10, csr_csr=None, debug=False):
CSRBuilder.__init__(self, comm=self, csr_csv=csr_csv)
self.vid = vid
self.pid = pid
self.debug = debug
self.max_retries = max_retries
self.MAX_RECURSION_COUNT = 5
self.max_recursion_count = 5
def open(self):
if hasattr(self, "dev"):
@ -123,12 +128,12 @@ class CommUSB:
print("Access Denied. Maybe try using sudo?")
self.close()
self.open()
if depth < self.MAX_RECURSION_COUNT:
if depth < self.max_recursion_count:
return self.usb_read(addr, depth+1)
except TypeError:
self.close()
self.open()
if depth < self.MAX_RECURSION_COUNT:
if depth < self.max_recursion_count:
return self.usb_read(addr, depth+1)
def write(self, addr, data):
@ -154,5 +159,5 @@ class CommUSB:
print("Access Denied. Maybe try using sudo?")
self.close()
self.open()
if depth < self.MAX_RECURSION_COUNT:
if depth < self.max_recursion_count:
return self.usb_write(addr, value, depth+1)

View File

@ -1,12 +1,13 @@
#
# This file is part of LiteX.
#
# Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
# Copyright (c) 2015-2020 Florent Kermarrec <florent@enjoy-digital.fr>
# Copyright (c) 2016 Tim 'mithro' Ansell <mithro@mithis.com>
# SPDX-License-Identifier: BSD-2-Clause
import csv
# CSR Elements -------------------------------------------------------------------------------------
class CSRElements:
def __init__(self, d):
@ -23,16 +24,15 @@ class CSRElements:
pass
raise AttributeError("No such element " + attr)
class CSRRegister:
def __init__(self, readfn, writefn, name, addr, length, data_width, mode):
self.readfn = readfn
self.writefn = writefn
self.name = name
self.addr = addr
self.length = length
self.readfn = readfn
self.writefn = writefn
self.name = name
self.addr = addr
self.length = length
self.data_width = data_width
self.mode = mode
self.mode = mode
def read(self):
if self.mode not in ["rw", "ro"]:
@ -55,32 +55,34 @@ class CSRRegister:
datas.append((value >> ((self.length-1-i)*self.data_width)) & (2**self.data_width-1))
self.writefn(self.addr, datas)
class CSRMemoryRegion:
def __init__(self, base, size, type):
self.base = base
self.size = size
self.type = type
# CSR Builder --------------------------------------------------------------------------------------
class CSRBuilder:
def __init__(self, comm, csr_csv, csr_data_width=None):
self.items = self.get_csr_items(csr_csv)
self.constants = self.build_constants()
if csr_csv is not None:
self.items = self.get_csr_items(csr_csv)
self.constants = self.build_constants()
# Load csr_data_width from the constants, otherwise it must be provided
constant_csr_data_width = self.constants.d.get("config_csr_data_width", None)
if csr_data_width is None:
csr_data_width = constant_csr_data_width
if csr_data_width is None:
raise KeyError("csr_data_width not found in constants, please provide!")
if csr_data_width != constant_csr_data_width:
raise KeyError("csr_data_width of {} provided but {} found in constants".format(
csr_data_width, constant_csr_data_width))
# Load csr_data_width from the constants, otherwise it must be provided
constant_csr_data_width = self.constants.d.get("config_csr_data_width", None)
if csr_data_width is None:
csr_data_width = constant_csr_data_width
if csr_data_width is None:
raise KeyError("csr_data_width not found in constants, please provide!")
if csr_data_width != constant_csr_data_width:
raise KeyError("csr_data_width of {} provided but {} found in constants".format(
csr_data_width, constant_csr_data_width))
self.csr_data_width = csr_data_width
self.bases = self.build_bases()
self.regs = self.build_registers(comm.read, comm.write)
self.mems = self.build_memories()
self.csr_data_width = csr_data_width
self.bases = self.build_bases()
self.regs = self.build_registers(comm.read, comm.write)
self.mems = self.build_memories()
@staticmethod
def get_csr_items(csr_csv):