#!/usr/bin/env python3 """ sfbuild - Symbiflow Build System This tool allows for building FPGA targets (such as bitstreams) for any supported platform with just one simple command and a project file. The idea is that sfbuild wraps all the tools needed by different platforms in "modules", which define inputs/outputs and various parameters. This allows sfbuild to resolve dependencies for any target provided that a "flow definition" file exists for such target. The flow defeinition file list modules available for that platform and may tweak some settings of those modules. A basic example of using sfbuild: $ sfbuild build --platform arty_35 -t bitstream This will make sfbuild attempt to create a bitstream for arty_35 platform. flow.json is a flow configuration file, which should be created for a project that uses sfbuild. Iontains project-specific definitions needed within the flow, such as list of source code files. """ from argparse import Namespace import os import json from typing import Iterable from colorama import Fore, Style from sf_common import ResolutionEnv, fatal, scan_modules, set_verbosity_level, \ sfprint from sf_module import * from sf_cache import SymbiCache import sf_ugly from sf_flow_config import ProjectFlowConfig, FlowConfig, FlowDefinition, \ open_project_flow_cfg, verify_platform_name, \ verify_stage from sf_module_runner import * from sf_module_inspector import get_module_info from sf_stage import Stage from sf_argparse import setup_argparser, get_cli_flow_config SYMBICACHEPATH = '.symbicache' mypath = os.path.realpath(os.sys.argv[0]) mypath = os.path.dirname(mypath) binpath = os.path.realpath(os.path.join(mypath, '..')) share_dir_path = os.path.realpath(os.path.join(mypath, '../../share/symbiflow')) class DependencyNotProducedException(Exception): dep_name: str provider: str def __init__(self, dep_name: str, provider: str): self.dep_name = dep_name self.provider = provider def __str__(self) -> str: return f'Stage `{self.provider}` did not produce promised ' \ f'dependency `{self.dep_name}`' def dep_value_str(dep: str): return ':' + dep def platform_stages(platform_flow, r_env): """ Iterates over all stages available in a given flow. """ stage_options = platform_flow.get('stage_options') for stage_name, modulestr in platform_flow['stages'].items(): mod_opts = stage_options.get(stage_name) if stage_options else None yield Stage(stage_name, modulestr, mod_opts, r_env) def req_exists(r): """ Checks whether a dependency exists on a drive. """ if type(r) is str: if not os.path.isfile(r) and not os.path.islink(r) \ and not os.path.isdir(r): return False elif type(r) is list: return not (False in map(req_exists, r)) else: raise Exception(f'Requirements can be currently checked only for single ' f'paths, or path lists (reason: {r})') return True def map_outputs_to_stages(stages: 'list[Stage]'): """ Associates a stage with every possible output. This is commonly refferef to as `os_map` (output-stage-map) through the code. """ os_map: 'dict[str, Stage]' = {} # Output-Stage map for stage in stages: for output in stage.produces: if not os_map.get(output.name): os_map[output.name] = stage elif os_map[output.name] != stage: raise Exception(f'Dependency `{output.name}` is generated by ' f'stage `{os_map[output.name].name}` and ' f'`{stage.name}`. Dependencies can have only one ' 'provider at most.') return os_map def filter_existing_deps(deps: 'dict[str, ]', symbicache): return [(n, p) for n, p in deps.items() \ if req_exists(p)] # and not dep_differ(p, symbicache)] def get_stage_values_override(og_values: dict, stage: Stage): values = og_values.copy() values.update(stage.value_ovds) return values def prepare_stage_io_input(stage: Stage): return { 'params': stage.params } if stage.params is not None else {} def prepare_stage_input(stage: Stage, platform_name: str, values: dict, dep_paths: 'dict[str, ]', config_paths: 'dict[str, ]'): takes = {} for take in stage.takes: paths = dep_paths.get(take.name) if paths: # Some takes may be not required takes[take.name] = paths produces = {} for prod in stage.produces: if dep_paths.get(prod.name): produces[prod.name] = dep_paths[prod.name] elif config_paths.get(prod.name): produces[prod.name] = config_paths[prod.name] stage_mod_cfg = { 'takes': takes, 'produces': produces, 'values': values, 'platform': platform_name, } return stage_mod_cfg def update_dep_statuses(paths, consumer: str, symbicache: SymbiCache): if type(paths) is str: return symbicache.update(paths, consumer) elif type(paths) is list: for p in paths: return update_dep_statuses(p, consumer, symbicache) elif type(paths) is dict: for _, p in paths.items(): return update_dep_statuses(p, consumer, symbicache) fatal(-1, 'WRONG PATHS TYPE') def dep_differ(paths, consumer: str, symbicache: SymbiCache): """ Check if a dependency differs from its last version, lack of dependency is treated as "differs" """ if type(paths) is str: s = symbicache.get_status(paths, consumer) if s == 'untracked': symbicache.update(paths, consumer) return symbicache.get_status(paths, consumer) != 'same' elif type(paths) is list: return True in [dep_differ(p, consumer, symbicache) for p in paths] elif type(paths) is dict: return True in [dep_differ(p, consumer, symbicache) \ for _, p in paths.items()] return False def dep_will_differ(target: str, paths, consumer: str, os_map: 'dict[str, Stage]', run_stages: 'set[str]', symbicache: SymbiCache): """ Check if a dependency or any of the dependencies it depends on differ from their last versions. """ provider = os_map.get(target) if provider: return (provider.name in run_stages) or \ dep_differ(paths, consumer, symbicache) return dep_differ(paths, consumer, symbicache) def _print_unreachable_stage_message(provider: Stage, take: str): sfprint(0, ' Stage ' f'`{Style.BRIGHT + provider.name + Style.RESET_ALL}` is ' 'unreachable due to unmet dependency ' f'`{Style.BRIGHT + take.name + Style.RESET_ALL}`') def config_mod_runctx(stage: Stage, platform_name: str, values: 'dict[str, ]', dep_paths: 'dict[str, str | list[str]]', config_paths: 'dict[str, str | list[str]]'): config = prepare_stage_input(stage, platform_name, values, dep_paths, config_paths) return ModRunCtx(share_dir_path, binpath, config) class Flow: """ Describes a complete, configured flow, ready for execution. """ # Dependendecy to build target: str # Values in global scope cfg: FlowConfig # dependency-producer map os_map: 'dict[str, Stage]' # Paths resolved for dependencies dep_paths: 'dict[str, str | list[str]]' # Explicit configs for dependency paths # config_paths: 'dict[str, str | list[str]]' # Stages that need to be run run_stages: 'set[str]' # Number of stages that relied on outdated version of a (checked) dependency deps_rebuilds: 'dict[str, int]' symbicache: 'SymbiCache | None' flow_cfg: FlowConfig def __init__(self, target: str, cfg: FlowConfig, symbicache: 'SymbiCache | None'): self.target = target self.os_map = map_outputs_to_stages(cfg.stages.values()) explicit_deps = cfg.get_dependency_overrides() # print(explicit_deps) self.dep_paths = dict(filter_existing_deps(explicit_deps, symbicache)) self.run_stages = set() self.symbicache = symbicache self.cfg = cfg self.deps_rebuilds = {} self._resolve_dependencies(self.target, set()) def _dep_will_differ(self, dep: str, paths, consumer: str): if not self.symbicache: # Handle --nocache mode return True return dep_will_differ(dep, paths, consumer, self.os_map, self.run_stages, self.symbicache) def _resolve_dependencies(self, dep: str, stages_checked: 'set[str]'): # Initialize the dependency status if necessary if self.deps_rebuilds.get(dep) is None: self.deps_rebuilds[dep] = 0 # Check if an explicit dependency is already resolved paths = self.dep_paths.get(dep) if paths and not self.os_map.get(dep): return # Check if a stage can provide the required dependency provider = self.os_map.get(dep) if not provider or provider.name in stages_checked: return # TODO: Check if the dependency is "on-demand" and force it in provider's # config if it is. for take in provider.takes: self._resolve_dependencies(take.name, stages_checked) # If any of the required dependencies is unavailable, then the # provider stage cannot be run take_paths = self.dep_paths.get(take.name) # Add input path to values (dirty hack) provider.value_overrides[dep_value_str(take.name)] = take_paths if not take_paths and take.spec == 'req': _print_unreachable_stage_message(provider, take) return if self._dep_will_differ(take.name, take_paths, provider.name): sfprint(2, f'{take.name} is causing rebuild for {provider.name}') self.run_stages.add(provider.name) self.deps_rebuilds[take.name] += 1 stage_values = self.cfg.get_r_env(provider.name).values modrunctx = config_mod_runctx(provider, self.cfg.platform, stage_values, self.dep_paths, self.cfg.get_dependency_overrides()) outputs = module_map(provider.module, modrunctx) stages_checked.add(provider.name) self.dep_paths.update(outputs) for _, out_paths in outputs.items(): if (out_paths is not None) and not (req_exists(out_paths)): self.run_stages.add(provider.name) # Verify module's outputs and add paths as values. outs = outputs.keys() # print(outs) for o in provider.produces: if o.name not in outs: if o.spec == 'req' or (o.spec == 'demand' and \ o.name in self.cfg.get_dependency_overrides().keys()): fatal(-1, f'Module {provider.name} did not produce a mapping ' f'for a required output `{o.name}`') else: # Remove an on-demand/optional output that is not produced # from os_map. self.os_map.pop(o.name) # Add a value for the output (dirty ack yet again) o_path = outputs.get(o.name) if o_path is not None: provider.value_overrides[dep_value_str(o.name)] = \ outputs.get(o.name) def print_resolved_dependencies(self, verbosity: int): deps = list(self.deps_rebuilds.keys()) deps.sort() for dep in deps: status = Fore.RED + '[X]' + Fore.RESET source = Fore.YELLOW + 'MISSING' + Fore.RESET paths = self.dep_paths.get(dep) if paths: exists = req_exists(paths) provider = self.os_map.get(dep) if provider and provider.name in self.run_stages: if exists: status = Fore.YELLOW + '[R]' + Fore.RESET else: status = Fore.YELLOW + '[S]' + Fore.RESET source = f'{Fore.BLUE + self.os_map[dep].name + Fore.RESET} ' \ f'-> {paths}' elif exists: if self.deps_rebuilds[dep] > 0: status = Fore.GREEN + '[N]' + Fore.RESET else: status = Fore.GREEN + '[O]' + Fore.RESET source = paths elif self.os_map.get(dep): status = Fore.RED + '[U]' + Fore.RESET source = \ f'{Fore.BLUE + self.os_map[dep].name + Fore.RESET} -> ???' sfprint(verbosity, f' {Style.BRIGHT + status} ' f'{dep + Style.RESET_ALL}: {source}') def _build_dep(self, dep): paths = self.dep_paths.get(dep) provider = self.os_map.get(dep) run = (provider.name in self.run_stages) if provider else False if not paths: sfprint(2, f'Dependency {dep} is unresolved.') return False if req_exists(paths) and not run: return True else: assert(provider) any_dep_differ = False if self.symbicache else True for p_dep in provider.takes: if not self._build_dep(p_dep.name): assert (p_dep.spec != 'req') continue if self.symbicache: any_dep_differ |= \ update_dep_statuses(self.dep_paths[p_dep.name], provider.name, self.symbicache) # If dependencies remained the same, consider the dep as up-to date # For example, when changing a comment in Verilog source code, # the initial dependency resolution will report a need for complete # rebuild, however, after the synthesis stage, the generated eblif # will reamin the same, thus making it unnecessary to continue the # rebuild process. if (not any_dep_differ) and req_exists(paths): sfprint(2, f'Skipping rebuild of `' f'{Style.BRIGHT + dep + Style.RESET_ALL}` because all ' f'of it\'s dependencies remained unchanged') return True stage_values = self.cfg.get_r_env(provider.name).values modrunctx = config_mod_runctx(provider, self.cfg.platform, stage_values, self.dep_paths, self.cfg.get_dependency_overrides()) module_exec(provider.module, modrunctx) self.run_stages.discard(provider.name) if not req_exists(paths): raise DependencyNotProducedException(dep, provider.name) return True def execute(self): self._build_dep(self.target) if self.symbicache: update_dep_statuses(self.dep_paths[self.target], '__target', self.symbicache) sfprint(0, f'Target `{Style.BRIGHT + self.target + Style.RESET_ALL}` ' f'-> {self.dep_paths[self.target]}') def display_dep_info(stages: 'Iterable[Stage]'): sfprint(0, 'Platform dependencies/targets:') longest_out_name_len = 0 for stage in stages: for out in stage.produces: l = len(out.name) if l > longest_out_name_len: longest_out_name_len = l desc_indent = longest_out_name_len + 7 nl_indentstr = '\n' for _ in range(0, desc_indent): nl_indentstr += ' ' for stage in stages: for out in stage.produces: pname = Style.BRIGHT + out.name + Style.RESET_ALL indent = '' for _ in range(0, desc_indent - len(pname) + 3): indent += ' ' specstr = '???' if out.spec == 'req': specstr = f'{Fore.BLUE}guaranteed{Fore.RESET}' elif out.spec == 'maybe': specstr = f'{Fore.YELLOW}not guaranteed{Fore.RESET}' elif out.spec == 'demand': specstr = f'{Fore.RED}on-demand{Fore.RESET}' pgen = f'{Style.DIM}stage: `{stage.name}`, '\ f'spec: {specstr}{Style.RESET_ALL}' pdesc = stage.meta[out.name].replace('\n', nl_indentstr) sfprint(0, f' {Style.BRIGHT + out.name + Style.RESET_ALL}:' f'{indent}{pdesc}{nl_indentstr}{pgen}') def display_stage_info(stage: Stage): if stage is None: sfprint(0, f'Stage does not exist') sfbuild_fail() return sfprint(0, f'Stage `{Style.BRIGHT}{stage.name}{Style.RESET_ALL}`:') sfprint(0, f' Module: `{Style.BRIGHT}{stage.module.name}{Style.RESET_ALL}`') sfprint(0, f' Module info:') mod_info = get_module_info(stage.module) mod_info = '\n '.join(mod_info.split('\n')) sfprint(0, f' {mod_info}') sfbuild_done_str = Style.BRIGHT + Fore.GREEN + 'DONE' sfbuild_silent = 0 def sfbuild_fail(): global sfbuild_done_str sfbuild_done_str = Style.BRIGHT + Fore.RED + 'FAILED' def sfbuild_done(): sfprint(1, f'sfbuild: {sfbuild_done_str}' f'{Style.RESET_ALL + Fore.RESET}') exit(0) def setup_resolution_env(): """ Sets up a ResolutionEnv with sfbuild's default built-ins. """ r_env = ResolutionEnv({ 'shareDir': share_dir_path, 'binDir': os.path.realpath(os.path.join(share_dir_path, '../../bin')) }) r_env.add_values(sf_ugly.generate_values()) return r_env def open_project_flow_config(path: str) -> ProjectFlowConfig: try: flow_cfg = open_project_flow_cfg(path) except FileNotFoundError as _: fatal(-1, 'The provided flow configuration file does not exist') return flow_cfg def verify_platform_stage_params(flow_cfg: FlowConfig, platform: 'str | None' = None, stage: 'str | None' = None): if platform: if not verify_platform_name(platform, mypath): sfprint(0, f'Platform `{platform}`` is unsupported.') return False if args.platform not in flow_cfg.platforms(): sfprint(0, f'Platform `{platform}`` is not in project.') return False if stage: if not verify_stage(platform, stage, mypath): sfprint(0, f'Stage `{stage}` is invalid.') sfbuild_fail() return False return True def get_platform_name_for_part(part_name: str): """ Gets a name that identifies the platform setup required for a specific chip. The reason for such distinction is that plenty of chips with different names differ only in a type of package they use. """ d: dict with open(os.path.join(mypath, 'part_db/parts.json')) as f: d = json.loads(f.read()) return d.get(part_name.upper()) def cmd_build(args: Namespace): """ sfbuild's `build` command implementation """ project_flow_cfg: ProjectFlowConfig = None platform = args.platform if platform is None: if args.part: platform = get_platform_name_for_part(args.part) if args.flow: project_flow_cfg = open_project_flow_config(args.flow) elif platform is not None: project_flow_cfg = ProjectFlowConfig('.temp.flow.json') project_flow_cfg.flow_cfg = get_cli_flow_config(args, platform) if platform is None and project_flow_cfg is not None: platform = project_flow_cfg.get_default_platform() if platform is None: fatal(-1, 'You have to specify a platform name or a part name or ' 'configure a default platform.') if platform is None or project_flow_cfg is None: fatal(-1, 'No configuration was provided. Use `--flow`, `--platform` or ' '`--part` to configure flow..') platform_path = os.path.join(mypath, 'platforms', platform + '.json') platform_def = None try: with open(platform_path) as platform_file: platform_def = platform_file.read() except FileNotFoundError as _: fatal(-1, f'The platform flow definition file {platform_path} for the platform ' f'{platform} referenced in flow definition file {args.flow} ' 'cannot be found.') r_env = setup_resolution_env() sfprint(2, 'Scanning modules...') scan_modules(mypath) flow_definition_dict = json.loads(platform_def) flow_def = FlowDefinition(flow_definition_dict, r_env) flow_cfg = FlowConfig(project_flow_cfg, flow_def, platform) if len(flow_cfg.stages) == 0: fatal(-1, 'Platform flow does not define any stage') if args.info: display_dep_info(flow_cfg.stages.values()) sfbuild_done() if args.stageinfo: display_stage_info(flow_cfg.stages.get(args.stageinfo[0])) sfbuild_done() target = args.target if target is None: target = project_flow_cfg.get_default_target(platform) if target is None: fatal(-1, 'Please specify desired target using `--target` option ' 'or configure a default target.') flow = Flow( target=target, cfg=flow_cfg, symbicache=SymbiCache(SYMBICACHEPATH) if not args.nocache else None ) dep_print_verbosity = 0 if args.pretend else 2 sfprint(dep_print_verbosity, '\nProject status:') flow.print_resolved_dependencies(dep_print_verbosity) sfprint(dep_print_verbosity, '') if args.pretend: sfbuild_done() try: flow.execute() except Exception as e: sfprint(0, e) sfbuild_fail() if flow.symbicache: flow.symbicache.save() def cmd_show_dependencies(args: Namespace): """ sfbuild's `showd` command implementation """ flow_cfg = open_project_flow_config(args.flow) if not verify_platform_stage_params(flow_cfg, args.platform): sfbuild_fail() return platform_overrides: 'set | None' = None if args.platform is not None: platform_overrides = \ set(flow_cfg.get_dependency_platform_overrides(args.platform).keys()) display_list = [] raw_deps = flow_cfg.get_dependencies_raw(args.platform) for dep_name, dep_paths in raw_deps.items(): prstr: str if (platform_overrides is not None) and (dep_name in platform_overrides): prstr = f'{Style.DIM}({args.platform}){Style.RESET_ALL} ' \ f'{Style.BRIGHT + dep_name + Style.RESET_ALL}: {dep_paths}' else: prstr = f'{Style.BRIGHT + dep_name + Style.RESET_ALL}: {dep_paths}' display_list.append((dep_name, prstr)) display_list.sort(key = lambda p: p[0]) for _, prstr in display_list: sfprint(0, prstr) set_verbosity_level(-1) if __name__ == '__main__': parser = setup_argparser() args = parser.parse_args() set_verbosity_level(args.verbose - (1 if args.silent else 0)) if args.command == 'build': cmd_build(args) sfbuild_done() if args.command == 'showd': cmd_show_dependencies(args) sfbuild_done() sfprint(0, 'Please use a command.\nUse `--help` flag to learn more.') sfbuild_done()