mirror of
https://github.com/enjoy-digital/litex.git
synced 2025-01-04 09:52:26 -05:00
70 lines
2.4 KiB
Python
70 lines
2.4 KiB
Python
# This file is Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
|
|
# This file is Copyright (c) 2016 Tim 'mithro' Ansell <mithro@mithis.com>
|
|
# License: BSD
|
|
|
|
import socket
|
|
|
|
from litex.tools.remote.etherbone import EtherbonePacket, EtherboneRecord
|
|
from litex.tools.remote.etherbone import EtherboneReads, EtherboneWrites
|
|
from litex.tools.remote.etherbone import EtherboneIPC
|
|
from litex.tools.remote.csr_builder import CSRBuilder
|
|
|
|
|
|
class RemoteClient(EtherboneIPC, CSRBuilder):
    """Client that exchanges Etherbone packets with a server over a TCP socket.

    The connection is not opened by the constructor: call ``open()`` before
    ``read()`` / ``write()`` and ``close()`` when done. Packet transport is
    delegated to ``send_packet`` / ``receive_packet`` (presumably provided by
    ``EtherboneIPC`` -- confirm against that class).
    """

    def __init__(self, host="localhost", port=1234, csr_csv="csr.csv", csr_data_width=None, debug=False):
        """Store the connection settings and optionally build the CSR map.

        Args:
            host: Server host name or address used by ``open()``.
            port: Server TCP port used by ``open()``.
            csr_csv: CSR description file handed to ``CSRBuilder``; pass
                ``None`` to skip the CSR setup entirely.
            csr_data_width: Forwarded to ``CSRBuilder``; must be given
                explicitly when ``csr_csv`` is ``None``.
            debug: When true, ``read()`` and ``write()`` print every word
                transferred.

        Raises:
            AssertionError: If both ``csr_csv`` and ``csr_data_width`` are
                ``None``.
        """
        if csr_csv is not None:
            # The client passes itself as the second argument -- presumably
            # the object CSRBuilder uses to perform bus accesses (TODO confirm).
            CSRBuilder.__init__(self, self, csr_csv, csr_data_width)
        else:
            assert csr_data_width is not None, \
                "csr_data_width is required when csr_csv is None"
        self.host = host
        self.port = port
        self.debug = debug

    def open(self):
        """Connect to ``(host, port)``; do nothing if already connected.

        Both the connection attempt and subsequent socket operations use a
        5 second timeout.
        """
        if hasattr(self, "socket"):
            return
        self.socket = socket.create_connection((self.host, self.port), 5.0)
        self.socket.settimeout(5.0)

    def close(self):
        """Close the connection; do nothing if it is not open."""
        if not hasattr(self, "socket"):
            return
        try:
            self.socket.close()
        finally:
            # Always forget the socket, even if close() raised: a stale
            # attribute would turn every later open() into a silent no-op.
            del self.socket

    def read(self, addr, length=None):
        """Read one or several words starting at ``addr``.

        Args:
            addr: Address of the first word; following words are requested
                at ``addr + 4``, ``addr + 8``, ...
            length: Number of words to read. ``None`` reads a single word.

        Returns:
            The first value when ``length`` is ``None``; otherwise the whole
            sequence returned by the response record's ``get_datas()``.
        """
        length_int = 1 if length is None else length

        # prepare packet
        record = EtherboneRecord()
        record.reads = EtherboneReads(addrs=[addr + 4*j for j in range(length_int)])
        record.rcount = len(record.reads)

        # send packet
        packet = EtherbonePacket()
        packet.records = [record]
        packet.encode()
        self.send_packet(self.socket, packet[:])

        # receive response: the read values come back in the `writes` section
        # of the last record of the reply.
        packet = EtherbonePacket(self.receive_packet(self.socket))
        packet.decode()
        datas = packet.records.pop().writes.get_datas()
        if self.debug:
            for i, data in enumerate(datas):
                print("read {:08x} @ {:08x}".format(data, addr + 4*i))
        return datas[0] if length is None else datas

    def write(self, addr, datas):
        """Write one or several words starting at ``addr``.

        Args:
            addr: Base address of the write.
            datas: A single value, or a list/tuple of values. In debug output
                the i-th value is reported at ``addr + 4*i``.
        """
        # Accept tuples as well as lists; anything else is a single word.
        # Copy so the record never aliases the caller's sequence.
        datas = list(datas) if isinstance(datas, (list, tuple)) else [datas]

        record = EtherboneRecord()
        record.writes = EtherboneWrites(base_addr=addr, datas=list(datas))
        record.wcount = len(record.writes)

        packet = EtherbonePacket()
        packet.records = [record]
        packet.encode()
        # NOTE(review): read() sends `packet[:]` while this sends the packet
        # object itself -- confirm send_packet() treats both the same.
        self.send_packet(self.socket, packet)

        if self.debug:
            for i, data in enumerate(datas):
                print("write {:08x} @ {:08x}".format(data, addr + 4*i))
|