f4pga: cleanup and style

Signed-off-by: Unai Martinez-Corral <umartinezcorral@antmicro.com>
This commit is contained in:
Unai Martinez-Corral 2022-03-04 05:13:42 +01:00
parent 26fb1d63b0
commit 636da72d32
19 changed files with 779 additions and 617 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
*.pyc
*.sw*
/f4pga/build/

View File

@ -1,31 +1,28 @@
#!/usr/bin/env python3
"""
sfbuild - Symbiflow Build System
F4PGA Build System
This tool allows for building FPGA targets (such as bitstreams) for any supported
platform with just one simple command and a project file.
This tool allows for building FPGA targets (such as bitstreams) for any supported platform with just one simple command
and a project file.
The idea is that sfbuild wraps all the tools needed by different platforms in
"modules", which define inputs/outputs and various parameters. This allows
sfbuild to resolve dependencies for any target provided that a "flow definition"
file exists for such target. The flow defeinition file list modules available for
that platform and may tweak some settings of those modules.
The idea is that F4PGA wraps all the tools needed by different platforms in "modules", which define inputs/outputs and
various parameters.
This allows F4PGA to resolve dependencies for any target provided that a "flow definition" file exists for such target.
The flow defeinition file list modules available for that platform and may tweak some settings of those modules.
A basic example of using sfbuild:
$ sfbuild build --platform arty_35 -t bitstream
A basic example of using F4PGA:
This will make sfbuild attempt to create a bitstream for arty_35 platform.
flow.json is a flow configuration file, which should be created for a project
that uses sfbuild. Iontains project-specific definitions needed within the flow,
such as list of source code files.
$ f4pga build --platform arty_35 -t bitstream
This will make F4PGA attempt to create a bitstream for arty_35 platform.
``flow.json`` is a flow configuration file, which should be created for a project that uses F4PGA.
Contains project-specific definitions needed within the flow, such as list of source code files.
"""
from pathlib import Path
from argparse import Namespace
import os
from sys import argv as sys_argv
from os import environ
import json
from json import load as json_load, loads as json_loads
from typing import Iterable
from colorama import Fore, Style
@ -34,11 +31,11 @@ from f4pga.common import (
fatal,
scan_modules,
set_verbosity_level,
sfprint
sfprint,
sub as common_sub
)
from f4pga.module import *
from f4pga.cache import SymbiCache
import f4pga.ugly as ugly
from f4pga.flow_config import (
ProjectFlowConfig,
FlowConfig,
@ -54,10 +51,10 @@ from f4pga.argparser import setup_argparser, get_cli_flow_config
SYMBICACHEPATH = '.symbicache'
binpath = os.path.realpath(os.path.join(os.path.dirname(os.path.realpath(os.sys.argv[0])), '..'))
binpath = str(Path(sys_argv[0]).resolve().parent.parent)
mypath = str(Path(__file__).resolve().parent)
share_dir_path = os.path.realpath(f"{environ.get('F4PGA_INSTALL_DIR', '/usr/local')}/xc7/install/share/symbiflow")
share_dir_path = str(Path(f"{environ.get('F4PGA_INSTALL_DIR', '/usr/local')}/xc7/install/share/symbiflow").resolve())
class DependencyNotProducedException(Exception):
dep_name: str
@ -86,8 +83,7 @@ def req_exists(r):
""" Checks whether a dependency exists on a drive. """
if type(r) is str:
if not os.path.isfile(r) and not os.path.islink(r) \
and not os.path.isdir(r):
if not Path(r).is_file() and not Path(r).is_symlink() and not Path(r).is_dir():
return False
elif type(r) is list:
return not (False in map(req_exists, r))
@ -471,9 +467,27 @@ def setup_resolution_env():
r_env = ResolutionEnv({
'shareDir': share_dir_path,
'binDir': os.path.realpath(os.path.join(share_dir_path, '../../bin'))
'binDir': str((Path(share_dir_path) / '../../bin').resolve())
})
r_env.add_values(ugly.generate_values())
def _noisy_warnings():
"""
Emit some noisy warnings.
"""
environ['OUR_NOISY_WARNINGS'] = 'noisy_warnings.log'
return 'noisy_warnings.log'
def _generate_values():
"""
Generate initial values, available in configs.
"""
return {
'prjxray_db': common_sub('prjxray-config').decode().replace('\n', ''),
'python3': common_sub('which', 'python3').decode().replace('\n', ''),
'noisyWarnings': _noisy_warnings()
}
r_env.add_values(_generate_values())
return r_env
def open_project_flow_config(path: str) -> ProjectFlowConfig:
@ -509,7 +523,7 @@ def get_platform_name_for_part(part_name: str):
differ only in a type of package they use.
"""
with (Path(mypath) / 'part_db.json').open('r') as rfptr:
return json.load(rfptr).get(part_name.upper())
return json_load(rfptr).get(part_name.upper())
def cmd_build(args: Namespace):
""" sfbuild's `build` command implementation """
@ -535,7 +549,7 @@ def cmd_build(args: Namespace):
fatal(-1, 'No configuration was provided. Use `--flow`, `--platform` or '
'`--part` to configure flow..')
platform_path = os.path.join(mypath, 'platforms', platform + '.json')
platform_path = str(Path(mypath) / f'platforms/{platform}.json')
platform_def = None
try:
with open(platform_path) as platform_file:
@ -550,7 +564,7 @@ def cmd_build(args: Namespace):
sfprint(2, 'Scanning modules...')
scan_modules(mypath)
flow_definition_dict = json.loads(platform_def)
flow_definition_dict = json_loads(platform_def)
flow_def = FlowDefinition(flow_definition_dict, r_env)
flow_cfg = FlowConfig(project_flow_cfg, flow_def, platform)

View File

@ -1,69 +1,158 @@
from argparse import ArgumentParser, Namespace
import re
from re import finditer as re_finditer
def _add_flow_arg(parser: ArgumentParser):
parser.add_argument('-f', '--flow', metavar='flow_path', type=str,
help='Path to flow definition file')
parser.add_argument(
'-f',
'--flow',
metavar='flow_path',
type=str,
help='Path to flow definition file'
)
def _setup_build_parser(parser: ArgumentParser):
_add_flow_arg(parser)
parser.add_argument('-t', '--target', metavar='target_name', type=str,
help='Perform stages necessary to acquire target')
parser.add_argument('--platform', metavar='platform_name',
help='Target platform_name')
parser.add_argument('-P', '--pretend', action='store_true',
help='Show dependency resolution without executing flow')
parser.add_argument('-i', '--info', action='store_true',
help='Display info about available targets')
parser.add_argument('-c', '--nocache', action='store_true',
help='Ignore caching and rebuild everything up to the '
'target.')
parser.add_argument('-S', '--stageinfo', nargs=1, metavar='stage_name',
help='Display info about stage')
parser.add_argument('-r', '--requirements', action='store_true',
help='Display info about project\'s requirements.')
parser.add_argument('-p', '--part', metavar='part_name',
help='Name of the target chip')
parser.add_argument('--dep', '-D', action='append', default=[])
parser.add_argument('--val', '-V', action='append', default=[])
parser.add_argument(
'-t',
'--target',
metavar='target_name',
type=str,
help='Perform stages necessary to acquire target'
)
parser.add_argument(
'--platform',
metavar='platform_name',
help='Target platform_name'
)
parser.add_argument(
'-P',
'--pretend',
action='store_true',
help='Show dependency resolution without executing flow'
)
parser.add_argument(
'-i',
'--info',
action='store_true',
help='Display info about available targets'
)
parser.add_argument(
'-c',
'--nocache',
action='store_true',
help='Ignore caching and rebuild everything up to the target.'
)
parser.add_argument(
'-S',
'--stageinfo',
nargs=1,
metavar='stage_name',
help='Display info about stage'
)
parser.add_argument(
'-r',
'--requirements',
action='store_true',
help='Display info about project\'s requirements.'
)
parser.add_argument(
'-p',
'--part',
metavar='part_name',
help='Name of the target chip'
)
parser.add_argument(
'--dep',
'-D',
action='append',
default=[]
)
parser.add_argument(
'--val',
'-V',
action='append',
default=[]
)
# Currently unsupported
parser.add_argument('-M', '--moduleinfo', nargs=1,
parser.add_argument(
'-M',
'--moduleinfo',
nargs=1,
metavar='module_name_or_path',
help='Display info about module. Requires `-p` option '
'in case of module name')
parser.add_argument('-T', '--take_explicit_paths', nargs='+',
metavar='<name=path, ...>', type=str,
help='Specify stage inputs explicitely. This might be '
'required if some files got renamed or deleted and '
'symbiflow is unable to deduce the flow that lead '
'to dependencies required by the requested stage')
help='Display info about module. Requires `-p` option in case of module name'
)
parser.add_argument(
'-T',
'--take_explicit_paths',
nargs='+',
metavar='<name=path, ...>',
type=str,
help='Specify stage inputs explicitely. This might be required if some files got renamed or deleted and '
'symbiflow is unable to deduce the flow that lead to dependencies required by the requested stage'
)
def _setup_show_dep_parser(parser: ArgumentParser):
parser.add_argument('-p', '--platform', metavar='platform_name', type=str,
help='Name of the platform (use to display '
'platform-specific values.')
parser.add_argument('-s', '--stage', metavar='stage_name', type=str,
help='Name of the stage (use if you want to set the '
'value only for that stage). Requires `-p`.')
parser.add_argument(
'-p',
'--platform',
metavar='platform_name',
type=str,
help='Name of the platform (use to display platform-specific values.'
)
parser.add_argument(
'-s',
'--stage',
metavar='stage_name',
type=str,
help='Name of the stage (use if you want to set the value only for that stage). Requires `-p`.'
)
_add_flow_arg(parser)
# Set up argument parser for the program. Pretty self-explanatory.
def setup_argparser():
"""
Set up argument parser for the program.
"""
parser = ArgumentParser(description='SymbiFlow Build System')
parser.add_argument('-v', '--verbose', action='count', default=0)
parser.add_argument('-s', '--silent', action='store_true')
parser.add_argument(
'-v',
'--verbose',
action='count',
default=0
)
parser.add_argument(
'-s',
'--silent',
action='store_true'
)
subparsers = parser.add_subparsers(dest='command')
build = subparsers.add_parser('build')
_setup_build_parser(build)
show_dep = subparsers.add_parser('showd',
description='Show the value(s) assigned to a '
'dependency')
_setup_build_parser(subparsers.add_parser('build'))
show_dep = subparsers.add_parser('showd', description='Show the value(s) assigned to a dependency')
_setup_show_dep_parser(show_dep)
return parser
def _parse_depval(depvalstr: str):
"""
Parse a dependency or value definition in form of:
@ -94,6 +183,7 @@ def _parse_depval(depvalstr: str):
return d
def _unescaped_matches(regexp: str, s: str, escape_chr='\\'):
"""
Find all occurences of a pattern in a string that contains escape sequences.
@ -109,8 +199,7 @@ def _unescaped_matches(regexp: str, s: str, escape_chr='\\'):
offsets = []
offset = 0
for sl in s.split(escape_chr):
l = len(sl)
if l <= 1:
if len(sl) <= 1:
continue
noescape = sl[(1 if offset != 0 else 0):]
for _ in noescape:
@ -118,7 +207,7 @@ def _unescaped_matches(regexp: str, s: str, escape_chr='\\'):
offset += 2
noescapes += noescape
iter = re.finditer(regexp, noescapes)
iter = re_finditer(regexp, noescapes)
for m in iter:
start = m.start()
@ -127,10 +216,13 @@ def _unescaped_matches(regexp: str, s: str, escape_chr='\\'):
off2 = end + offsets[end]
yield off1, off2
def _unescaped_separated(regexp: str, s: str, escape_chr='\\'):
""" Yields substrings of a string that contains escape sequences. """
last_end = 0;
def _unescaped_separated(regexp: str, s: str, escape_chr='\\'):
"""
Yields substrings of a string that contains escape sequences.
"""
last_end = 0
for start, end in _unescaped_matches(regexp, s, escape_chr=escape_chr):
yield s[last_end:start]
last_end = end
@ -139,6 +231,7 @@ def _unescaped_separated(regexp: str, s: str, escape_chr='\\'):
else:
yield ''
def _parse_cli_value(s: str):
"""
Parse a value/dependency passed to CLI
@ -207,6 +300,7 @@ def _parse_cli_value(s: str):
# String
return s.replace('\\', '')
def get_cli_flow_config(args: Namespace, platform: str):
def create_defdict():
return {

View File

@ -1,19 +1,13 @@
import os
import zlib
import json
from pathlib import Path
from zlib import adler32 as zlib_adler32
from json import dump as json_dump, load as json_load, JSONDecodeError
def _get_file_hash(path: str):
with open(path, 'rb') as f:
b = f.read()
return str(zlib.adler32(b))
class SymbiCache:
"""
`SymbiCache` is used to track changes among dependencies and keep
the status of the files on a persistent storage.
`SymbiCache` is used to track changes among dependencies and keep the status of the files on a persistent storage.
Files which are tracked get their checksums calculated and stored in a file.
If file's checksum differs from the one saved in a file, that means, the file
has changed.
If file's checksum differs from the one saved in a file, that means, the file has changed.
"""
hashes: 'dict[str, dict[str, str]]'
@ -21,8 +15,9 @@ class SymbiCache:
cachefile_path: str
def __init__(self, cachefile_path):
""" `chachefile_path` - path to a file used for persistent storage of
checksums. """
"""
`chachefile_path` - path to a file used for persistent storage of checksums.
"""
self.status = {}
self.cachefile_path = cachefile_path
@ -47,42 +42,38 @@ class SymbiCache:
self.status[path] = {}
self.status[path][consumer] = status
def _get_last_hash(self, path: str, consumer: str):
last_hashes = self.hashes.get(path)
if last_hashes is None:
return None
return last_hashes.get(consumer)
def update(self, path: str, consumer: str):
""" Add/remove a file to.from the tracked files, update checksum
if necessary and calculate status.
""" Add/remove a file to.from the tracked files, update checksum if necessary and calculate status.
Multiple hashes are stored per file, one for each consumer module.
"__target" is used as a convention for a "fake" consumer in case the file
is requested as a target and not used by a module within the active flow.
"__target" is used as a convention for a "fake" consumer in case the file is requested as a target and not used
by a module within the active flow.
"""
isdir = os.path.isdir(path)
if not (os.path.isfile(path) or os.path.islink(path) or isdir):
isdir = Path(path).is_dir()
if not (Path(path).is_file() or Path(path).is_symlink() or isdir):
self._try_pop_consumer(path, consumer)
return True
hash = 0 # Directories always get '0' hash.
if not isdir:
hash = _get_file_hash(path)
last_hash = self._get_last_hash(path, consumer)
with Path(path).open('rb') as rfptr:
hash = str(zlib_adler32(rfptr.read()))
last_hashes = self.hashes.get(path)
last_hash = None if last_hashes is None else last_hashes.get(consumer)
if hash != last_hash:
self._try_push_consumer_status(path, consumer, 'changed')
self._try_push_consumer_hash(path, consumer, hash)
return True
else:
self._try_push_consumer_status(path, consumer, 'same')
return False
def get_status(self, path: str, consumer: str):
""" Get status for a file with a given path.
returns 'untracked' if the file is not tracked or hasn't been
treated with `update` procedure before calling `get_status`. """
returns 'untracked' if the file is not tracked or hasn't been treated with `update` procedure before calling
`get_status`.
"""
statuses = self.status.get(path)
if not statuses:
return 'untracked'
@ -95,21 +86,18 @@ class SymbiCache:
"""Loads cache's state from the persistent storage"""
try:
with open(self.cachefile_path, 'r') as f:
b = f.read()
self.hashes = json.loads(b)
except json.JSONDecodeError as jerr:
print('WARNING: .symbicache is corrupted! '
'This will cause flow to re-execute from the beggining.')
with Path(self.cachefile_path).open('r') as rfptr:
self.hashes = json_load(rfptr)
except JSONDecodeError as jerr:
print("""WARNING: .symbicache is corrupted!
This will cause flow to re-execute from the beggining.""")
self.hashes = {}
except FileNotFoundError:
print('Couldn\'t open .symbicache cache file. '
'This will cause flow to re-execute from the beggining.')
print("""Couldn\'t open .symbicache cache file.
This will cause flow to re-execute from the beggining.""")
self.hashes = {}
def save(self):
"""Saves cache's state to the persistent storage"""
with open(self.cachefile_path, 'w') as f:
b = json.dumps(self.hashes, indent=4)
f.write(b)
"""Saves cache's state to the persistent storage."""
with Path(self.cachefile_path).open('w') as wfptr:
json_dump(str(self.hashes), wfptr, indent=4)

View File

@ -1,9 +1,11 @@
from pathlib import Path
from os import environ, listdir as os_listdir
from sys import argv as sys_argv
from argparse import Namespace
import subprocess
import os
import shutil
import sys
import re
from shutil import move as sh_mv
from subprocess import run
from re import match as re_match, finditer as re_finditer
def decompose_depname(name: str):
spec = 'req'
@ -16,6 +18,7 @@ def decompose_depname(name: str):
name = name[:len(name) - 1]
return name, spec
def with_qualifier(name: str, q: str) -> str:
if q == 'req':
return decompose_depname(name)[0]
@ -24,25 +27,33 @@ def with_qualifier(name: str, q: str) -> str:
if q == 'demand':
return decompose_depname(name)[0] + '!'
_sfbuild_module_collection_name_to_path = {}
def scan_modules(mypath: str):
global _sfbuild_module_collection_name_to_path
sfbuild_home = mypath
sfbuild_home_dirs = os.listdir(sfbuild_home)
sfbuild_home_dirs = os_listdir(sfbuild_home)
sfbuild_module_dirs = \
[dir for dir in sfbuild_home_dirs if re.match('.*_modules$', dir)]
_sfbuild_module_collection_name_to_path = \
dict([(re.match('(.*)_modules$', moddir).groups()[0],
os.path.join(sfbuild_home, moddir))
for moddir in sfbuild_module_dirs])
[dir for dir in sfbuild_home_dirs if re_match('.*_modules$', dir)]
_sfbuild_module_collection_name_to_path = dict([
(
re_match('(.*)_modules$', moddir).groups()[0],
str(Path(sfbuild_home) / moddir)
)
for moddir in sfbuild_module_dirs
])
"""Resolves module location from modulestr"""
def resolve_modstr(modstr: str):
"""
Resolves module location from modulestr.
"""
sl = modstr.split(':')
if len(sl) > 2:
raise Exception('Incorrect module sysntax. '
'Expected one \':\' or one \'::\'')
raise Exception('Incorrect module sysntax. Expected one \':\' or one \'::\'')
if len(sl) < 2:
return modstr
collection_name = sl[0]
@ -51,14 +62,13 @@ def resolve_modstr(modstr: str):
col_path = _sfbuild_module_collection_name_to_path.get(collection_name)
if not col_path:
fatal(-1, f'Module collection {collection_name} does not exist')
return os.path.join(col_path, module_filename)
return str(Path(col_path) / module_filename)
def deep(fun):
"""
Create a recursive string transform function for 'str | list | dict',
i.e a dependency
Create a recursive string transform function for 'str | list | dict', i.e a dependency.
"""
def d(paths, *args, **kwargs):
if type(paths) is str:
return fun(paths)
@ -66,18 +76,13 @@ def deep(fun):
return [d(p) for p in paths];
elif type(paths) is dict:
return dict([(k, d(p)) for k, p in paths.items()])
return d
def file_noext(path: str):
""" Return a file without it's extenstion"""
m = re.match('(.*)\\.[^.]*$', path)
if m:
path = m.groups()[0]
return path
class VprArgs:
""" Represents argument list for VPR (Versatile Place and Route) """
"""
Represents argument list for VPR (Versatile Place and Route).
"""
arch_dir: str
arch_def: str
@ -91,13 +96,13 @@ class VprArgs:
def __init__(self, share: str, eblif, values: Namespace,
sdc_file: 'str | None' = None,
vpr_extra_opts: 'list | None' = None):
self.arch_dir = os.path.join(share, 'arch')
self.arch_dir = str(Path(share) / 'arch')
self.arch_def = values.arch_def
self.lookahead = values.rr_graph_lookahead_bin
self.rr_graph = values.rr_graph_real_bin
self.place_delay = values.vpr_place_delay
self.device_name = values.vpr_grid_layout_name
self.eblif = os.path.realpath(eblif)
self.eblif = str(Path(eblif).resolve())
if values.vpr_options is not None:
self.optional = options_dict_to_list(values.vpr_options)
else:
@ -107,13 +112,17 @@ class VprArgs:
if sdc_file is not None:
self.optional += ['--sdc_file', sdc_file]
class SubprocessException(Exception):
return_code: int
def sub(*args, env=None, cwd=None):
""" Execute subroutine """
out = subprocess.run(args, capture_output=True, env=env, cwd=cwd)
def sub(*args, env=None, cwd=None):
"""
Execute subroutine.
"""
out = run(args, capture_output=True, env=env, cwd=cwd)
if out.returncode != 0:
print(f'[ERROR]: {args[0]} non-zero return code.\n'
f'stderr:\n{out.stderr.decode()}\n\n'
@ -121,8 +130,11 @@ def sub(*args, env=None, cwd=None):
exit(out.returncode)
return out.stdout
def vpr(mode: str, vprargs: VprArgs, cwd=None):
""" Execute `vpr` """
"""
Execute `vpr`.
"""
modeargs = []
if mode == 'pack':
@ -132,15 +144,17 @@ def vpr(mode: str, vprargs: VprArgs, cwd=None):
elif mode == 'route':
modeargs = ['--route']
return sub(*(['vpr',
return sub(*([
'vpr',
vprargs.arch_def,
vprargs.eblif,
'--device', vprargs.device_name,
'--read_rr_graph', vprargs.rr_graph,
'--read_router_lookahead', vprargs.lookahead,
'--read_placement_delay_lookup', vprargs.place_delay] +
modeargs + vprargs.optional),
cwd=cwd)
'--read_placement_delay_lookup', vprargs.place_delay
] + modeargs + vprargs.optional), cwd=cwd)
_vpr_specific_values = [
'arch_def',
@ -150,10 +164,13 @@ _vpr_specific_values = [
'vpr_grid_layout_name',
'vpr_options?'
]
def vpr_specific_values():
global _vpr_specific_values
return _vpr_specific_values
def options_dict_to_list(opt_dict: dict):
"""
Converts a dictionary of named options for CLI program to a list.
@ -167,36 +184,44 @@ def options_dict_to_list(opt_dict: dict):
opts.append(str(val))
return opts
def noisy_warnings(device):
""" Emit some noisy warnings """
os.environ['OUR_NOISY_WARNINGS'] = 'noisy_warnings-' + device + '_pack.log'
"""
Emit some noisy warnings.
"""
environ['OUR_NOISY_WARNINGS'] = f'noisy_warnings-{device}_pack.log'
def my_path():
""" Get current PWD """
mypath = os.path.realpath(sys.argv[0])
return os.path.dirname(mypath)
"""
Get current PWD.
"""
return str(Path(sys_argv[0]).resolve().parent)
def save_vpr_log(filename, build_dir=''):
""" Save VPR logic (moves the default output file into a desired path) """
shutil.move(os.path.join(build_dir, 'vpr_stdout.log'), filename)
"""
Save VPR logic (moves the default output file into a desired path).
"""
sh_mv(str(Path(build_dir) / 'vpr_stdout.log'), filename)
def fatal(code, message):
"""
Print a message informing about an error that has occured and terminate program
with a given return code.
Print a message informing about an error that has occured and terminate program with a given return code.
"""
raise(Exception(f'[FATAL ERROR]: {message}'))
exit(code)
class ResolutionEnv:
"""
ResolutionEnv is used to hold onto mappings for variables used in flow and
perform text substitutions using those variables.
Variables can be referred in any "resolvable" string using the following
syntax: 'Some static text ${variable_name}'. The '${variable_name}' part
will be replaced by the value associated with name 'variable_name', is such
mapping exists.
ResolutionEnv is used to hold onto mappings for variables used in flow and perform text substitutions using those
variables.
Variables can be referred in any "resolvable" string using the following syntax: 'Some static text ${variable_name}'.
The '${variable_name}' part will be replaced by the value associated with name 'variable_name', is such mapping
exists.
values: dict
"""
@ -209,15 +234,14 @@ class ResolutionEnv:
def resolve(self, s, final=False):
"""
Perform resolution on `s`.
`s` can be a `str`, a `dict` with arbitrary keys and resolvable values,
or a `list` of resolvable values.
`s` can be a `str`, a `dict` with arbitrary keys and resolvable values, or a `list` of resolvable values.
final=True - resolve any unknown variables into ''
This is a hack and probably should be removed in the future
"""
if type(s) is str:
match_list = list(re.finditer('\$\{([^${}]*)\}', s))
# Assumption: re.finditer finds matches in a left-to-right order
match_list = list(re_finditer('\$\{([^${}]*)\}', s))
# Assumption: re_finditer finds matches in a left-to-right order
match_list.reverse()
for match in match_list:
match_str = match.group(1)
@ -242,24 +266,30 @@ class ResolutionEnv:
return s
def add_values(self, values: dict):
""" Add mappings from `values`"""
"""
Add mappings from `values`.
"""
for k, v in values.items():
self.values[k] = self.resolve(v)
verbosity_level = 0
def sfprint(verbosity: int, *args):
""" Print with regards to currently set verbosity level """
def sfprint(verbosity: int, *args):
"""
Print with regards to currently set verbosity level.
"""
global verbosity_level
if verbosity <= verbosity_level:
print(*args)
def set_verbosity_level(level: int):
global verbosity_level
verbosity_level = level
def get_verbosity_level() -> int:
global verbosity_level
return verbosity_level

View File

@ -1 +0,0 @@
# This is only to make pydoc recognize this catalogue as a package

View File

@ -1,39 +1,20 @@
#!/usr/bin/python3
from pathlib import Path
from shutil import move as sh_mv
# Symbiflow Stage Module
from f4pga.common import vpr_specific_values, VprArgs, get_verbosity_level, sub
from f4pga.module import Module, ModuleContext
# ----------------------------------------------------------------------------- #
import os
from f4pga.common import *
from f4pga.module import *
# ----------------------------------------------------------------------------- #
def concat_fasm(fasm: str, fasm_extra: str, output: str):
fasm_data = None
fasm_extra_data = None
with open(fasm, 'r') as fasm_file, open(fasm_extra, 'r') as fasm_extra_file:
fasm_data = fasm_file.read()
fasm_extra_data = fasm_extra_file.read()
data = fasm_data + '\n' + fasm_extra_data
with open(output, 'w') as output_file:
output_file.write(data)
def fasm_output_path(build_dir: str, top: str):
return f'{build_dir}/{top}.fasm'
class FasmModule(Module):
def map_io(self, ctx: ModuleContext):
build_dir = os.path.dirname(ctx.takes.eblif)
build_dir = str(Path(ctx.takes.eblif).parent)
return {
'fasm': fasm_output_path(build_dir, ctx.values.top)
'fasm': f'{(Path(build_dir)/ctx.values.top)!s}.fasm'
}
def execute(self, ctx: ModuleContext):
build_dir = os.path.dirname(ctx.takes.eblif)
build_dir = str(Path(ctx.takes.eblif).parent)
vprargs = VprArgs(ctx.share, ctx.takes.eblif, ctx.values)
@ -43,10 +24,14 @@ class FasmModule(Module):
if ctx.takes.sdc:
optional += ['--sdc', ctx.takes.sdc]
s = ['genfasm', vprargs.arch_def,
os.path.realpath(ctx.takes.eblif),
'--device', vprargs.device_name,
'--read_rr_graph', vprargs.rr_graph
s = [
'genfasm',
vprargs.arch_def,
str(Path(ctx.takes.eblif).resolve()),
'--device',
vprargs.device_name,
'--read_rr_graph',
vprargs.rr_graph
] + vprargs.optional
if get_verbosity_level() >= 2:
@ -56,13 +41,17 @@ class FasmModule(Module):
sub(*s, cwd=build_dir)
default_fasm_output_name = fasm_output_path(build_dir, ctx.values.top)
default_fasm_output_name = f'{(Path(build_dir)/ctx.values.top)!s}.fasm'
if default_fasm_output_name != ctx.outputs.fasm:
shutil.move(default_fasm_output_name, ctx.outputs.fasm)
sh_mv(default_fasm_output_name, ctx.outputs.fasm)
if ctx.takes.fasm_extra:
yield 'Appending extra FASM...'
concat_fasm(ctx.outputs.fasm, ctx.takes.fasm_extra, ctx.outputs.fasm)
with \
open(ctx.outputs.fasm, 'r') as fasm_file, \
open(ctx.takes.fasm_extra, 'r') as fasm_extra_file, \
open(ctx.outputs.fasm, 'w') as wfptr:
wfptr.write(f"{fasm_file.read()}\n{fasm_extra_file.read()}")
else:
yield 'No extra FASM to append'

View File

@ -1,7 +1,3 @@
#!/usr/bin/python3
# Symbiflow Stage Module
"""
This module is intended for wrapping simple scripts without rewriting them as
an sfbuild module. This is mostly to maintain compatibility with workflows
@ -12,53 +8,42 @@ Accepted module parameters:
* `script` (string, mandatory): Path to the script to be executed
* `interpreter` (string, optional): Interpreter for the script
* `cwd` (string, optional): Current Working Directory for the script
* `outputs` (dict[string -> dict[string -> string]],
mandatory):
A dict with output descriptions (dicts). Keys name output dependencies.
* `mode` (string, mandatory): "file" or "stdout". Describes how the output is
grabbed from the script.
* `file` (string, required if `mode` is "file"): Name of the file generated by the
script.
* `target` (string, required): Default name of the file of the generated
dependency. You can use all values available durng map_io stage. Each input
dependency alsogets two extra values associated with it:
`:dependency_name[noext]`, which contains the path to the dependency the
extension with anything after last "." removed and `:dependency_name[dir]` which
contains directory paths of the dependency. This is useful for deriving an output
name from the input.
* `outputs` (dict[string -> dict[string -> string]], mandatory):
A dict with output descriptions (dicts).
Keys name output dependencies.
* `mode` (string, mandatory): "file" or "stdout".
Describes how the output is grabbed from the script.
* `file` (string, required if `mode` is "file"): Name of the file generated by the script.
* `target` (string, required): Default name of the file of the generated dependency.
You can use all values available durng map_io stage.
Each input dependency alsogets two extra values associated with it:
`:dependency_name[noext]`, which contains the path to the dependency the extension with anything after last "."
removed and `:dependency_name[dir]` which contains directory paths of the dependency.
This is useful for deriving an output name from the input.
* `meta` (string, optional): Description of the output dependency.
* `inputs` (dict[string -> string | bool], mandatory):
A dict with input descriptions. Key is either a name of a named argument or a
position of unnamed argument prefaced with "#" (eg. "#1"). Positions are indexed
from 1, as it's a convention that 0th argument is the path of the executed program.
Values are strings that can contains references to variables to be resolved
after the project flow configuration is loaded (that means they can reference
values and dependencies which are to be set by the user). All of modules inputs
will be determined by the references used. Thus dependency and value definitions
are implicit. If the value of the resolved string is empty and is associated with a
named argument, the argument in question will be skipped entirely. This allows
using optional dependencies. To use a named argument as a flag instead, set it to
`true`.
A dict with input descriptions.
Key is either a name of a named argument or a position of unnamed argument prefaced with "#" (eg. "#1").
Positions are indexed from 1, as it's a convention that 0th argument is the path of the executed program.
Values are strings that can contains references to variables to be resolved after the project flow configuration is
loaded (that means they can reference values and dependencies which are to be set by the user).
All of modules inputs will be determined by the references used.
Thus dependency and value definitions are implicit.
If the value of the resolved string is empty and is associated with a named argument, the argument in question will be
skipped entirely.
This allows using optional dependencies.
To use a named argument as a flag instead, set it to `true`.
"""
# TODO: `environment` input kind
# ----------------------------------------------------------------------------- #
from pathlib import Path
from shutil import move as sh_mv
from re import match as re_match, finditer as re_finditer
import os
import shutil
import re
from f4pga.common import decompose_depname, deep, get_verbosity_level, sub
from f4pga.module import Module, ModuleContext
from f4pga.common import *
from f4pga.module import *
# ----------------------------------------------------------------------------- #
def _generate_stage_name(params):
stage_name = params.get('stage_name')
if stage_name is None:
stage_name = '<unknown>'
return f'{stage_name}-generic'
def _get_param(params, name: str):
param = params.get(name)
@ -67,6 +52,7 @@ def _get_param(params, name: str):
f'missing `{name}` field')
return param
def _parse_param_def(param_def: str):
if param_def[0] == '#':
return 'positional', int(param_def[1:])
@ -74,8 +60,6 @@ def _parse_param_def(param_def: str):
return 'environmental', param_def[1:]
return 'named', param_def
_file_noext_deep = deep(file_noext)
_realdirpath_deep = deep(lambda p: os.path.realpath(os.path.dirname(p)))
class InputReferences:
dependencies: 'set[str]'
@ -89,48 +73,34 @@ class InputReferences:
self.dependencies = set()
self.values = set()
def _get_input_references(input: str) -> InputReferences:
refs = InputReferences()
if type(input) is not str:
return refs
matches = re.finditer('\$\{([^${}]*)\}', input)
for match in matches:
for match in re_finditer('\$\{([^${}]*)\}', input):
match_str = match.group(1)
if match_str[0] == ':':
if len(match_str) < 2:
raise Exception('Dependency name must be at least 1 character '
'long')
dep_name = re.match('([^\\[\\]]*)', match_str[1:]).group(1)
refs.dependencies.add(dep_name)
else:
if match_str[0] != ':':
refs.values.add(match_str)
continue
if len(match_str) < 2:
raise Exception('Dependency name must be at least 1 character long')
refs.dependencies.add(re_match('([^\\[\\]]*)', match_str[1:]).group(1))
return refs
def _make_noop1():
def noop(_):
return
return noop
def _tailcall1(self, fun):
def newself(arg, self=self, fun=fun):
fun(arg)
self(arg)
return newself
def _add_extra_values_to_env(ctx: ModuleContext):
takes = dict(vars(ctx.takes).items())
for take_name, take_path in takes.items():
if take_path is None:
continue
attr_name = f':{take_name}[noext]'
ctx.r_env.values[attr_name] = _file_noext_deep(take_path)
attr_name = f':{take_name}[dir]'
dirname = _realdirpath_deep(take_path)
ctx.r_env.values[attr_name] = dirname
def _make_noop1():
def noop(_):
return
return noop
class GenericScriptWrapperModule(Module):
script_path: str
@ -139,8 +109,15 @@ class GenericScriptWrapperModule(Module):
interpreter: 'None | str'
cwd: 'None | str'
@staticmethod
def _add_extra_values_to_env(ctx: ModuleContext):
for take_name, take_path in vars(ctx.takes).items():
if take_path is not None:
ctx.r_env.values[f':{take_name}[noext]'] = deep(lambda p: str(Path(p).with_suffix('')))(take_path)
ctx.r_env.values[f':{take_name}[dir]'] = deep(lambda p: str(Path(p).parent.resolve()))(take_path)
def map_io(self, ctx: ModuleContext):
_add_extra_values_to_env(ctx)
self._add_extra_values_to_env(ctx)
outputs = {}
for dep, _, out_path in self.file_outputs:
@ -155,7 +132,7 @@ class GenericScriptWrapperModule(Module):
return outputs
def execute(self, ctx: ModuleContext):
_add_extra_values_to_env(ctx)
self._add_extra_values_to_env(ctx)
cwd = ctx.r_env.resolve(self.cwd)
@ -187,7 +164,7 @@ class GenericScriptWrapperModule(Module):
file = ctx.r_env.resolve(file, final=True)
target = ctx.r_env.resolve(target, final=True)
if target != file:
shutil.move(file, target)
sh_mv(file, target)
def _init_outputs(self, output_defs: 'dict[str, dict[str, str]]'):
self.stdout_target = None
@ -294,7 +271,8 @@ class GenericScriptWrapperModule(Module):
self.values.append(val)
def __init__(self, params):
self.name = _generate_stage_name(params)
stage_name = params.get('stage_name')
self.name = f"{'<unknown>' if stage_name is None else stage_name}-generic"
self.no_of_phases = 2
self.script_path = params.get('script')
self.interpreter = params.get('interpreter')

View File

@ -1,7 +1,3 @@
#!/usr/bin/python3
# Symbiflow Stage Module
"""
Rename (ie. change) dependencies and values of a module. This module wraps another,
module whoose name is specified in `params.module` and changes the names of the
@ -25,13 +21,10 @@ Accepted module parameters:
"""
# ----------------------------------------------------------------------------- #
from f4pga.common import *
from f4pga.module import *
from f4pga.module import Module, ModuleContext
from f4pga.module_runner import get_module
# ----------------------------------------------------------------------------- #
def _switch_keys(d: 'dict[str, ]', renames: 'dict[str, str]') -> 'dict[str, ]':
newd = {}
@ -43,6 +36,7 @@ def _switch_keys(d: 'dict[str, ]', renames: 'dict[str, str]') -> 'dict[str, ]':
newd[k] = v
return newd
def _switchback_attrs(d: Namespace, renames: 'dict[str, str]') -> SimpleNamespace:
newn = SimpleNamespace()
for k, v in vars(d).items():
@ -54,6 +48,7 @@ def _switchback_attrs(d: Namespace, renames: 'dict[str, str]') -> SimpleNamespac
setattr(newn, k, v)
return newn
def _switch_entries(l: 'list[str]', renames: 'dict[str, str]') -> 'list[str]':
newl = []
for e in l:
@ -65,12 +60,11 @@ def _switch_entries(l: 'list[str]', renames: 'dict[str, str]') -> 'list[str]':
newl.append(r if r is not None else e)
return newl
def _generate_stage_name(name: str):
return f'{name}-io_renamed'
def _or_empty_dict(d: 'dict | None'):
return d if d is not None else {}
class IORenameModule(Module):
module: Module
rename_takes: 'dict[str, str]'
@ -102,7 +96,7 @@ class IORenameModule(Module):
self.rename_values = _or_empty_dict(params.get("rename_values"))
self.module = module
self.name = _generate_stage_name(module.name)
self.name = f'{module.name}-io_renamed'
self.no_of_phases = module.no_of_phases
self.takes = _switch_entries(module.takes, self.rename_takes)
self.produces = _switch_entries(module.produces, self.rename_produces)

View File

@ -1,21 +1,14 @@
#!/usr/bin/python3
"""
This module is used as a helper in a abuild chain to automate creating build directiores.
It's currenty the only parametric module, meaning it can take user-provided input at an early stage in order to
determine its take/produces I/O.
This allows other repesenting configurable directories, such as a build directory as dependencies and by doing so, allow
the dependency algorithm to lazily create the directories if they become necessary.
"""
# Symbiflow Stage Module
from pathlib import Path
from f4pga.module import Module, ModuleContext
""" This module is used as a helper in a abuild chain to automate creating build
directiores. It' currenty the only parametric module, meaning it can take
user-provided input at an early stage in order todetermine its take/produces
I/O. This allows other repesenting configurable directories, such as a build
directory as dependencies and by doing so, allow the dependency algorithm to
lazily create the directories if they become necessary. """
# ----------------------------------------------------------------------------- #
import os
from f4pga.common import *
from f4pga.module import *
# ----------------------------------------------------------------------------- #
class MkDirsModule(Module):
deps_to_produce: 'dict[str, str]'
@ -27,7 +20,7 @@ class MkDirsModule(Module):
outputs = vars(ctx.outputs)
for _, path in outputs.items():
yield f'Creating directory {path}...'
os.makedirs(path, exist_ok=True)
Path(path).mkdir(parents=True, exist_ok=True)
def __init__(self, params):
self.name = 'mkdirs'

View File

@ -1,54 +1,54 @@
#!/usr/bin/python3
from pathlib import Path
from os import remove as os_remove
from shutil import move as sh_mv
# Symbiflow Stage Module
# ----------------------------------------------------------------------------- #
import os
import re
from f4pga.common import *
from f4pga.module import *
from f4pga.module import Module, ModuleContext
# ----------------------------------------------------------------------------- #
DEFAULT_TIMING_RPT = 'pre_pack.report_timing.setup.rpt'
DEFAULT_UTIL_RPT = 'packing_pin_util.rpt'
class PackModule(Module):
def map_io(self, ctx: ModuleContext):
p = file_noext(ctx.takes.eblif)
build_dir = os.path.dirname(p)
epath = Path(ctx.takes.eblif)
build_dir = epath.parent
return {
'net': p + '.net',
'util_rpt': os.path.join(build_dir, DEFAULT_UTIL_RPT),
'timing_rpt': os.path.join(build_dir, DEFAULT_TIMING_RPT)
'net': str(epath.with_suffix('.net')),
'util_rpt': str(build_dir / DEFAULT_UTIL_RPT),
'timing_rpt': str(build_dir / DEFAULT_TIMING_RPT)
}
def execute(self, ctx: ModuleContext):
vpr_args = VprArgs(ctx.share, ctx.takes.eblif, ctx.values,
sdc_file=ctx.takes.sdc)
build_dir = os.path.dirname(ctx.outputs.net)
noisy_warnings(ctx.values.device)
build_dir = Path(ctx.outputs.net).parent
yield 'Packing with VPR...'
vpr('pack', vpr_args, cwd=build_dir)
vpr(
'pack',
VprArgs(
ctx.share,
ctx.takes.eblif,
ctx.values,
sdc_file=ctx.takes.sdc
),
cwd=str(build_dir)
)
og_log = os.path.join(build_dir, 'vpr_stdout.log')
og_log = str(build_dir / 'vpr_stdout.log')
yield 'Moving/deleting files...'
if ctx.outputs.pack_log:
shutil.move(og_log, ctx.outputs.pack_log)
sh_mv(og_log, ctx.outputs.pack_log)
else:
os.remove(og_log)
os_remove(og_log)
if ctx.outputs.timing_rpt:
shutil.move(os.path.join(build_dir, DEFAULT_TIMING_RPT),
ctx.outputs.timing_rpt)
sh_mv(str(build_dir / DEFAULT_TIMING_RPT), ctx.outputs.timing_rpt)
if ctx.outputs.util_rpt:
shutil.move(os.path.join(build_dir, DEFAULT_UTIL_RPT),
ctx.outputs.util_rpt)
sh_mv(str(build_dir / DEFAULT_UTIL_RPT), ctx.outputs.util_rpt)
def __init__(self, _):
self.name = 'pack'

View File

@ -1,34 +1,28 @@
#!/usr/bin/python3
# Symbiflow Stage Module
# ----------------------------------------------------------------------------- #
from pathlib import Path
import os
from shutil import move as sh_mv
from re import match as re_match
from f4pga.common import *
from f4pga.module import *
from f4pga.module import Module, ModuleContext
# ----------------------------------------------------------------------------- #
def default_output_name(place_constraints):
p = place_constraints
m = re.match('(.*)\\.[^.]*$', place_constraints)
m = re_match('(.*)\\.[^.]*$', place_constraints)
if m:
p = m.groups()[0] + '.place'
else:
p += '.place'
return p
return m.groups()[0] + '.place'
return f'{p}.place'
def place_constraints_file(ctx: ModuleContext):
dummy =- False
p = ctx.takes.place_constraints
if not p:
if p:
return p, False
p = ctx.takes.io_place
if not p:
dummy = True
p = file_noext(ctx.takes.eblif) + '.place'
if p:
return p, False
return f'{Path(ctx.takes.eblif).stem}.place', True
return p, dummy
class PlaceModule(Module):
def map_io(self, ctx: ModuleContext):
@ -45,7 +39,7 @@ class PlaceModule(Module):
with open(place_constraints, 'wb') as f:
f.write(b'')
build_dir = os.path.dirname(ctx.takes.eblif)
build_dir = str(Path(ctx.takes.eblif).parent)
vpr_options = ['--fix_clusters', place_constraints]
@ -63,7 +57,7 @@ class PlaceModule(Module):
# the ones in flow configuration.
if ctx.is_output_explicit('place'):
output_file = default_output_name(place_constraints)
shutil.move(output_file, ctx.outputs.place)
sh_mv(output_file, ctx.outputs.place)
yield 'Saving log...'
save_vpr_log('place.log', build_dir=build_dir)

View File

@ -1,24 +1,17 @@
#!/usr/bin/python3
# Symbiflow Stage Module
# ----------------------------------------------------------------------------- #
import os
from pathlib import Path
from f4pga.common import *
from f4pga.module import *
from f4pga.module import Module, ModuleContext
# ----------------------------------------------------------------------------- #
class PlaceConstraintsModule(Module):
def map_io(self, ctx: ModuleContext):
return {
'place_constraints': file_noext(ctx.takes.net) + '.preplace'
'place_constraints': f'{Path(ctx.takes.net).stem!s}.preplace'
}
def execute(self, ctx: ModuleContext):
arch_dir = os.path.join(ctx.share, 'arch')
arch_def = os.path.join(arch_dir, ctx.values.device, 'arch.timing.xml')
arch_dir = str(Path(ctx.share) / 'arch')
arch_def = str(Path(arch_dir) / ctx.values.device / 'arch.timing.xml')
database = sub('prjxray-config').decode().replace('\n', '')

View File

@ -1,41 +1,41 @@
#!/usr/bin/python3
from pathlib import Path
from shutil import move as sh_mv
# Symbiflow Stage Module
# ----------------------------------------------------------------------------- #
import os
import shutil
from f4pga.common import *
from f4pga.module import *
from f4pga.module import Module, ModuleContext
# ----------------------------------------------------------------------------- #
def route_place_file(eblif: str):
return file_noext(eblif) + '.route'
def route_place_file(ctx: ModuleContext):
return str(Path(ctx.takes.eblif).with_suffix('.route'))
class RouteModule(Module):
def map_io(self, ctx: ModuleContext):
return {
'route': route_place_file(ctx.takes.eblif)
'route': route_place_file(ctx)
}
def execute(self, ctx: ModuleContext):
build_dir = os.path.dirname(ctx.takes.eblif)
build_dir = str(Path(ctx.takes.eblif).parent)
vpr_options = []
if ctx.values.vpr_options:
vpr_options = options_dict_to_list(ctx.values.vpr_options)
vprargs = VprArgs(ctx.share, ctx.takes.eblif, ctx.values,
sdc_file=ctx.takes.sdc)
yield 'Routing with VPR...'
vpr('route', vprargs, cwd=build_dir)
vpr(
'route',
VprArgs(
ctx.share,
ctx.takes.eblif,
ctx.values,
sdc_file=ctx.takes.sdc
),
cwd=build_dir
)
if ctx.is_output_explicit('route'):
shutil.move(route_place_file(ctx.takes.eblif), ctx.outputs.route)
sh_mv(route_place_file(ctx), ctx.outputs.route)
yield 'Saving log...'
save_vpr_log('route.log', build_dir=build_dir)

View File

@ -1,17 +1,12 @@
#!/usr/bin/python3
# Symbiflow Stage Module
# ----------------------------------------------------------------------------- #
import os
from f4pga.common import *
from f4pga.module import *
from f4pga.module import Module, ModuleContext
# ----------------------------------------------------------------------------- #
# Setup environmental variables for YOSYS TCL scripts
def yosys_setup_tcl_env(tcl_env_def):
"""
Setup environmental variables for YOSYS TCL scripts.
"""
env = {}
for key, value in tcl_env_def.items():
if value is None:
@ -22,6 +17,7 @@ def yosys_setup_tcl_env(tcl_env_def):
env[key] = v
return env
def yosys_synth(tcl, tcl_env, verilog_files=[], read_verilog_args=None, log=None):
# Set up environment for TCL weirdness
optional = []
@ -41,19 +37,15 @@ def yosys_synth(tcl, tcl_env, verilog_files=[], read_verilog_args=None, log=None
verilog_files = []
# Execute YOSYS command
return sub(*(['yosys', '-p', tcl] + optional + verilog_files),
env=env)
return sub(*(['yosys', '-p', tcl] + optional + verilog_files), env=env)
def yosys_conv(tcl, tcl_env, synth_json):
# Set up environment for TCL weirdness
env = os.environ.copy()
env.update(tcl_env)
return sub('yosys', '-p', f'read_json {synth_json}; tcl {tcl}', env=env)
# Execute YOSYS command
return sub('yosys', '-p', 'read_json ' + synth_json + '; tcl ' + tcl,
env=env)
# ----------------------------------------------------------------------------- #
class SynthModule(Module):
extra_products: 'list[str]'

View File

@ -1,22 +1,21 @@
import os
import json
from f4pga.common import file_noext, ResolutionEnv, deep
from f4pga.stage import Stage
from pathlib import Path
from copy import copy
from os import listdir as os_listdir
from json import dump as json_dump, load as json_load
from f4pga.common import ResolutionEnv, deep
from f4pga.stage import Stage
_realpath_deep = deep(os.path.realpath)
def open_flow_cfg(path: str) -> dict:
flow_cfg_json: str
with open(path, 'r') as flow_cfg_file:
flow_cfg_json = flow_cfg_file.read()
return json.loads(flow_cfg_json)
with Path(path).open('r') as rfptr:
return json_load(rfptr)
def save_flow_cfg(flow: dict, path: str):
flow_cfg_json = json.dumps(flow, indent=4)
with open(path, 'w') as flow_cfg_file:
flow_cfg_file.write(flow_cfg_json)
with Path(path).open('w') as wfptr:
json_dump(flow, wfptr, indent=4)
def _get_lazy_dict(parent: dict, name: str):
d = parent.get(name)
@ -25,69 +24,96 @@ def _get_lazy_dict(parent: dict, name: str):
parent[name] = d
return d
def _get_ov_dict(dname: str, flow: dict,
platform: 'str | None' = None, stage: 'str | None' = None):
d: dict
if platform:
def _get_ov_dict(
dname: str,
flow: dict,
platform: 'str | None' = None,
stage: 'str | None' = None
):
if not platform:
return _get_lazy_dict(flow, dname)
platform_dict: dict = flow[platform]
if stage:
stage_dict: dict = _get_lazy_dict(platform_dict, stage)
d = _get_lazy_dict(stage_dict, dname)
else:
d = _get_lazy_dict(platform_dict, dname)
else:
d = _get_lazy_dict(flow, dname)
return _get_lazy_dict(stage_dict, dname)
return _get_lazy_dict(platform_dict, dname)
return d
def _get_dep_dict(flow: dict,
platform: 'str | None' = None, stage: 'str | None' = None):
def _get_dep_dict(
flow: dict,
platform: 'str | None' = None,
stage: 'str | None' = None
):
return _get_ov_dict('dependencies', flow, platform, stage)
def _get_vals_dict(flow: dict,
platform: 'str | None' = None, stage: 'str | None' = None):
def _get_vals_dict(
flow: dict,
platform: 'str | None' = None,
stage: 'str | None' = None
):
return _get_ov_dict('values', flow, platform, stage)
def _add_ov(ov_dict_getter, failstr_constr, flow_cfg: dict, name: str,
values: list, platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
d = ov_dict_getter(flow_cfg, platform, stage)
def _add_ov(
ov_dict_getter,
failstr_constr,
flow_cfg: dict,
name: str,
values: list,
platform: 'str | None' = None,
stage: 'str | None' = None
) -> bool:
d = ov_dict_getter(flow_cfg, platform, stage)
deps = d.get(name)
if type(deps) is list:
deps += values
elif deps is None:
return True
if deps is None:
d[name] = values
else:
return True
print(failstr_constr(name))
return False
return True
def _rm_ov_by_values(ov_dict_getter, notset_str_constr, notlist_str_constr,
flow: dict, name: str, vals: list,
def _rm_ov_by_values(
ov_dict_getter,
notset_str_constr,
notlist_str_constr,
flow: dict,
name: str,
vals: list,
platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
values_to_remove = set(vals)
stage: 'str | None' = None
) -> bool:
d = ov_dict_getter(flow, platform, stage)
vallist: list = d.get(name)
if type(vallist) is list:
d[name] = [val for val in vallist if val not in values_to_remove]
elif type(vallist) is None:
d[name] = [val for val in vallist if val not in set(vals)]
return True
if type(vallist) is None:
print(notset_str_constr(name))
return False
else:
print(notlist_str_constr(name))
return False
return True
def _rm_ov_by_idx(ov_dict_getter, notset_str_constr, notlist_str_constr,
flow: dict, name: str, idcs: list,
def _rm_ov_by_idx(
ov_dict_getter,
notset_str_constr,
notlist_str_constr,
flow: dict,
name: str,
idcs: list,
platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
stage: 'str | None' = None
) -> bool:
idcs.sort(reverse=True)
if len(idcs) == 0:
@ -103,17 +129,22 @@ def _rm_ov_by_idx(ov_dict_getter, notset_str_constr, notlist_str_constr,
for idx in idcs:
vallist.pop(idx)
elif vallist is None:
return True
if vallist is None:
print(notset_str_constr(name))
return False
else:
print(notlist_str_constr(name))
return False
return True
def _get_ovs_raw(dict_name: str, flow_cfg,
platform: 'str | None', stage: 'str | None'):
def _get_ovs_raw(
dict_name: str,
flow_cfg,
platform: 'str | None',
stage: 'str | None'
):
vals = flow_cfg.get(dict_name)
if vals is None:
vals = {}
@ -128,48 +159,105 @@ def _get_ovs_raw(dict_name: str, flow_cfg,
return vals
def _remove_dependencies_by_values(flow: dict, name: str, deps: list,
def _remove_dependencies_by_values(
flow: dict,
name: str,
deps: list,
platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
stage: 'str | None' = None
) -> bool:
def notset_str_constr(dname):
return f'Dependency `{dname}` is not set. Nothing to remove.'
def notlist_str_constr(dname):
return f'Dependency `{dname}` is not a list! Use unsetd instead.'
return _rm_ov_by_values(_get_dep_dict, notset_str_constr, notlist_str_constr,
flow, name, deps, platform, stage)
return _rm_ov_by_values(
_get_dep_dict,
notset_str_constr,
notlist_str_constr,
flow,
name,
deps,
platform,
stage
)
def _remove_dependencies_by_idx(flow: dict, name: str, idcs: list,
def _remove_dependencies_by_idx(
flow: dict,
name: str,
idcs: list,
platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
stage: 'str | None' = None
) -> bool:
def notset_str_constr(dname):
return f'Dependency `{dname}` is not set. Nothing to remove.'
def notlist_str_constr(dname):
return f'Dependency `{dname}` is not a list! Use unsetd instead.'
return _rm_ov_by_idx(_get_dep_dict, notset_str_constr, notlist_str_constr,
flow, name, idcs, platform, stage)
return _rm_ov_by_idx(
_get_dep_dict,
notset_str_constr,
notlist_str_constr,
flow,
name,
idcs,
platform,
stage
)
def _remove_values_by_values(flow: dict, name: str, deps: list,
def _remove_values_by_values(
flow: dict,
name: str,
deps: list,
platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
stage: 'str | None' = None
) -> bool:
def notset_str_constr(vname):
return f'Value `{vname}` is not set. Nothing to remove.'
def notlist_str_constr(vname):
return f'Value `{vname}` is not a list! Use unsetv instead.'
return _rm_ov_by_values(_get_vals_dict, notset_str_constr, notlist_str_constr,
flow, name, deps, platform, stage)
return _rm_ov_by_values(
_get_vals_dict,
notset_str_constr,
notlist_str_constr,
flow,
name,
deps,
platform,
stage
)
def _remove_values_by_idx(flow: dict, name: str, idcs: list,
def _remove_values_by_idx(
flow: dict,
name: str,
idcs: list,
platform: 'str | None' = None,
stage: 'str | None' = None) -> bool:
stage: 'str | None' = None
) -> bool:
def notset_str_constr(dname):
return f'Dependency `{dname}` is not set. Nothing to remove.'
def notlist_str_constr(dname):
return f'Dependency `{dname}` is not a list! Use unsetv instead.'
return _rm_ov_by_idx(_get_vals_dict, notset_str_constr, notlist_str_constr,
flow, name, idcs, platform, stage)
return _rm_ov_by_idx(
_get_vals_dict,
notset_str_constr,
notlist_str_constr,
flow,
name,
idcs,
platform,
stage
)
def unset_dependency(flow: dict, name: str,
platform: 'str | None', stage: 'str | None'):
def unset_dependency(
flow: dict,
name: str,
platform: 'str | None',
stage: 'str | None'
):
d = _get_dep_dict(flow, platform, stage)
if d.get(name) is None:
print(f'Dependency `{name}` is not set!')
@ -177,22 +265,26 @@ def unset_dependency(flow: dict, name: str,
d.pop(name)
return True
def verify_platform_name(platform: str, mypath: str):
for plat_def_filename in os.listdir(os.path.join(mypath, 'platforms')):
platform_name = file_noext(plat_def_filename)
for plat_def_filename in os_listdir(str(Path(mypath) / 'platforms')):
platform_name = str(Path(plat_def_filename).stem)
if platform == platform_name:
return True
return False
def verify_stage(platform: str, stage: str, mypath: str):
# TODO: Verify stage
return True
def _is_kword(w: str):
return \
(w == 'dependencies') | (w == 'values') | \
(w == 'default_platform') | (w == 'default_target')
class FlowDefinition:
# stage name -> module path mapping
stages: 'dict[str, Stage]'
@ -225,6 +317,7 @@ class FlowDefinition:
r_env.add_values(stage.value_overrides)
return r_env
class ProjectFlowConfig:
flow_cfg: dict
# r_env: ResolutionEnv
@ -275,20 +368,26 @@ class ProjectFlowConfig:
return r_env
""" Get dependencies without value resolution applied """
def get_dependencies_raw(self, platform: 'str | None' = None):
"""
Get dependencies without value resolution applied.
"""
return _get_ovs_raw('dependencies', self.flow_cfg, platform, None)
""" Get values without value resolution applied """
def get_values_raw(self, platform: 'str | None' = None,
stage: 'str | None' = None):
def get_values_raw(
self,
platform: 'str | None' = None,
stage: 'str | None' = None
):
"""
Get values without value resolution applied.
"""
return _get_ovs_raw('values', self.flow_cfg, platform, stage)
def get_stage_value_overrides(self, platform: str, stage: str):
stage_cfg = self.flow_cfg[platform].get(stage)
if stage_cfg is None:
return {}
stage_vals_ovds = stage_cfg.get('values')
if stage_vals_ovds is None:
return {}
@ -317,8 +416,7 @@ class FlowConfig:
raw_project_deps = project_config.get_dependencies_raw(platform)
self.dependencies_explicit = \
_realpath_deep(self.r_env.resolve(raw_project_deps))
self.dependencies_explicit = deep(lambda p: str(Path(p).resolve()))(self.r_env.resolve(raw_project_deps))
for stage_name, stage in platform_def.stages.items():
project_val_ovds = \
@ -349,12 +447,9 @@ class FlowConfigException(Exception):
def __str__(self) -> str:
return f'Error in config `{self.path}: {self.message}'
def open_project_flow_cfg(path: str) -> ProjectFlowConfig:
cfg = ProjectFlowConfig(path)
flow_cfg_json: str
with open(path, 'r') as flow_cfg_file:
flow_cfg_json = flow_cfg_file.read()
cfg.flow_cfg = json.loads(flow_cfg_json)
with Path(path).open('r') as rfptr:
cfg.flow_cfg = json_load(rfptr)
return cfg

View File

@ -1,17 +1,22 @@
# Here are the things necessary to write a symbiflow Module
"""
Here are the things necessary to write an F4PGA Module.
"""
import abc
from types import SimpleNamespace
from f4pga.common import *
from colorama import Fore, Style
from abc import abstractmethod
from f4pga.common import (
decompose_depname,
ResolutionEnv
)
class Module:
"""
A `Module` is a wrapper for whatever tool is used in a flow.
Modules can request dependencies, values and are guranteed to have all the
required ones present when entering `exec` mode.
They also have to specify what dependencies they produce and create the files
for these dependencies.
Modules can request dependencies, values and are guranteed to have all the required ones present when entering
`exec` mode.
They also have to specify what dependencies they produce and create the files for these dependencies.
"""
no_of_phases: int
@ -21,16 +26,16 @@ class Module:
values: 'list[str]'
prod_meta: 'dict[str, str]'
@abc.abstractmethod
@abstractmethod
def execute(self, ctx):
"""
Executes module. Use yield to print a message informing about current
execution phase.
Executes module.
Use yield to print a message informing about current execution phase.
`ctx` is `ModuleContext`.
"""
pass
@abc.abstractmethod
@abstractmethod
def map_io(self, ctx) -> 'dict[str, ]':
"""
Returns paths for outputs derived from given inputs.
@ -44,48 +49,50 @@ class Module:
self.name = '<BASE STAGE>'
self.prod_meta = {}
class ModuleContext:
"""
A class for object holding mappings for dependencies and values as well as
other information needed during modules execution.
A class for object holding mappings for dependencies and values as well as other information needed during modules
execution.
"""
share: str # Absolute path to Symbiflow's share directory
bin: str # Absolute path to Symbiflow's bin directory
takes: SimpleNamespace # Maps symbolic dependency names to relative
# paths.
produces: SimpleNamespace # Contains mappings for explicitely specified
# dependencies. Useful mostly for checking for
# on-demand optional outputs (such as logs)
# with `is_output_explicit` method.
takes: SimpleNamespace # Maps symbolic dependency names to relative paths.
produces: SimpleNamespace # Contains mappings for explicitely specified dependencies.
# Useful mostly for checking for on-demand optional outputs (such as logs) with
# `is_output_explicit` method.
outputs: SimpleNamespace # Contains mappings for all available outputs.
values: SimpleNamespace # Contains all available requested values.
r_env: ResolutionEnv # `ResolutionEnvironmet` object holding mappings
# for current scope.
r_env: ResolutionEnv # `ResolutionEnvironmet` object holding mappings for current scope.
module_name: str # Name of the module.
def is_output_explicit(self, name: str):
""" True if user has explicitely specified output's path. """
o = getattr(self.produces, name)
return o is not None
"""
True if user has explicitely specified output's path.
"""
return getattr(self.produces, name) is not None
def _getreqmaybe(self, obj, deps: 'list[str]', deps_cfg: 'dict[str, ]'):
"""
Add attribute for a dependency or panic if a required dependency has not
been given to the module on its input.
Add attribute for a dependency or panic if a required dependency has not been given to the module on its input.
"""
for name in deps:
name, spec = decompose_depname(name)
value = deps_cfg.get(name)
if value is None and spec == 'req':
fatal(-1, f'Dependency `{name}` is required by module '
f'`{self.module_name}` but wasn\'t provided')
fatal(-1, f'Dependency `{name}` is required by module `{self.module_name}` but wasn\'t provided')
setattr(obj, name, self.r_env.resolve(value))
# `config` should be a dictionary given as modules input.
def __init__(self, module: Module, config: 'dict[str, ]',
r_env: ResolutionEnv, share: str, bin: str):
def __init__(
self,
module: Module,
config: 'dict[str, ]',
r_env: ResolutionEnv,
share: str,
bin: str
):
self.module_name = module.name
self.takes = SimpleNamespace()
self.produces = SimpleNamespace()
@ -122,6 +129,7 @@ class ModuleContext:
return mycopy
class ModuleRuntimeException(Exception):
info: str
@ -131,14 +139,15 @@ class ModuleRuntimeException(Exception):
def __str___(self):
return self.info
def get_mod_metadata(module: Module):
""" Get descriptions for produced dependencies. """
def get_mod_metadata(module: Module):
"""
Get descriptions for produced dependencies.
"""
meta = {}
has_meta = hasattr(module, 'prod_meta')
for prod in module.produces:
prod = prod.replace('?', '')
prod = prod.replace('!', '')
prod = prod.replace('?', '').replace('!', '')
if not has_meta:
meta[prod] = '<no descritption>'
continue

View File

@ -1,14 +1,16 @@
""" Dynamically import and run sfbuild modules """
"""
Dynamically import and run F4PGA modules.
"""
from contextlib import contextmanager
import importlib
import importlib.util
import os
import importlib.util as importlib_util
from pathlib import Path
from colorama import Style
from f4pga.module import Module, ModuleContext, get_mod_metadata
from f4pga.common import ResolutionEnv, deep, sfprint
from colorama import Fore, Style
_realpath_deep = deep(os.path.realpath)
@contextmanager
def _add_to_sys_path(path: str):
@ -20,17 +22,20 @@ def _add_to_sys_path(path: str):
finally:
sys.path = old_syspath
def import_module_from_path(path: str):
absolute_path = os.path.realpath(path)
absolute_path = str(Path(path).resolve())
with _add_to_sys_path(path):
spec = importlib.util.spec_from_file_location(absolute_path, absolute_path)
module = importlib.util.module_from_spec(spec)
spec = importlib_util.spec_from_file_location(absolute_path, absolute_path)
module = importlib_util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
# Once imported a module will be added to that dict to avaid re-importing it
preloaded_modules = {}
def get_module(path: str):
global preloaded_modules
@ -41,10 +46,10 @@ def get_module(path: str):
mod = import_module_from_path(path)
preloaded_modules[path] = mod
# All sfbuild modules should expose a `ModuleClass` type/alias which is a
# class implementing a Module interface
# All F4PGA modules should expose a `ModuleClass` type/alias which is a class implementing a Module interface
return mod.ModuleClass
class ModRunCtx:
share: str
bin: str
@ -58,6 +63,7 @@ class ModRunCtx:
def make_r_env(self):
return ResolutionEnv(self.config['values'])
class ModuleFailException(Exception):
module: str
mode: str
@ -69,8 +75,11 @@ class ModuleFailException(Exception):
self.e = e
def __str__(self) -> str:
return f'ModuleFailException:\n Module `{self.module}` failed ' \
f'MODE: \'{self.mode}\'\n\nException `{type(self.e)}`: {self.e}'
return f"""ModuleFailException:
Module `{self.module}` failed MODE: \'{self.mode}\'
Exception `{type(self.e)}`: {self.e}
"""
def module_io(module: Module):
return {
@ -80,32 +89,41 @@ def module_io(module: Module):
'meta': get_mod_metadata(module)
}
def module_map(module: Module, ctx: ModRunCtx):
try:
mod_ctx = ModuleContext(module, ctx.config, ctx.make_r_env(), ctx.share,
ctx.bin)
mod_ctx = ModuleContext(
module,
ctx.config,
ctx.make_r_env(),
ctx.share,
ctx.bin
)
except Exception as e:
raise ModuleFailException(module.name, 'map', e)
return _realpath_deep(vars(mod_ctx.outputs))
return deep(lambda p: str(Path(p).resolve()))(vars(mod_ctx.outputs))
def module_exec(module: Module, ctx: ModRunCtx):
try:
mod_ctx = ModuleContext(module, ctx.config, ctx.make_r_env(), ctx.share,
ctx.bin)
mod_ctx = ModuleContext(
module,
ctx.config,
ctx.make_r_env(),
ctx.share,
ctx.bin
)
except Exception as e:
raise ModuleFailException(module.name, 'exec', e)
sfprint(1, 'Executing module '
f'`{Style.BRIGHT + module.name + Style.RESET_ALL}`:')
sfprint(1, f'Executing module `{Style.BRIGHT + module.name + Style.RESET_ALL}`:')
current_phase = 1
try:
for phase_msg in module.execute(mod_ctx):
sfprint(1, f' {Style.BRIGHT}[{current_phase}/{module.no_of_phases}]'
f'{Style.RESET_ALL}: {phase_msg}')
sfprint(1, f' {Style.BRIGHT}[{current_phase}/{module.no_of_phases}] {Style.RESET_ALL}: {phase_msg}')
current_phase += 1
except Exception as e:
raise ModuleFailException(module.name, 'exec', e)
sfprint(1, f'Module `{Style.BRIGHT + module.name + Style.RESET_ALL}` '
'has finished its work!')
sfprint(1, f'Module `{Style.BRIGHT + module.name + Style.RESET_ALL}` has finished its work!')

View File

@ -1,19 +0,0 @@
""" The "ugly" module is dedicated for some *ugly* workarounds """
import os
from f4pga.common import sub as common_sub
def noisy_warnings():
""" Emit some noisy warnings """
os.environ['OUR_NOISY_WARNINGS'] = 'noisy_warnings.log'
return 'noisy_warnings.log'
def generate_values():
""" Generate initial values, available in configs """
return{
'prjxray_db': common_sub('prjxray-config').decode().replace('\n', ''),
'python3': common_sub('which', 'python3').decode().replace('\n', ''),
'noisyWarnings': noisy_warnings()
}