f4cache: Handle status checks without updates. Fix and improve project status reporting

Signed-off-by: Krzysztof Boronski <kboronski@antmicro.com>
This commit is contained in:
Krzysztof Boronski 2022-05-27 13:36:41 -05:00
parent 47733138a3
commit 607e303e93
3 changed files with 86 additions and 29 deletions

View File

@ -162,6 +162,8 @@ def dep_differ(paths, consumer: str, f4cache: F4Cache):
""" """
if type(paths) is str: if type(paths) is str:
if not Path(paths).exists():
return True
return f4cache.get_status(paths, consumer) != 'same' return f4cache.get_status(paths, consumer) != 'same'
elif type(paths) is list: elif type(paths) is list:
return True in [dep_differ(p, consumer, f4cache) for p in paths] return True in [dep_differ(p, consumer, f4cache) for p in paths]
@ -196,6 +198,10 @@ def config_mod_runctx(stage: Stage, values: 'dict[str, ]',
dep_paths, config_paths) dep_paths, config_paths)
return ModRunCtx(share_dir_path, binpath, config) return ModRunCtx(share_dir_path, binpath, config)
def _process_dep_path(path: str, f4cache: F4Cache):
    """ Register a single dependency path with the cache.

    The string path is wrapped in a `Path` and handed to `f4cache.process_file`.
    """
    dep_file = Path(path)
    f4cache.process_file(dep_file)


# `deep` wraps the per-path helper so that it can be applied to a single path
# as well as to lists/dicts of paths.
_cache_deps = deep(_process_dep_path)
class Flow: class Flow:
""" Describes a complete, configured flow, ready for execution. """ """ Describes a complete, configured flow, ready for execution. """
@ -222,9 +228,11 @@ class Flow:
self.os_map = map_outputs_to_stages(cfg.stages.values()) self.os_map = map_outputs_to_stages(cfg.stages.values())
explicit_deps = cfg.get_dependency_overrides() explicit_deps = cfg.get_dependency_overrides()
# print(explicit_deps)
self.dep_paths = dict(filter_existing_deps(explicit_deps, f4cache)) self.dep_paths = dict(filter_existing_deps(explicit_deps, f4cache))
if f4cache is not None:
for dep in self.dep_paths.values():
_cache_deps(dep, f4cache)
self.run_stages = set() self.run_stages = set()
self.f4cache = f4cache self.f4cache = f4cache
self.cfg = cfg self.cfg = cfg
@ -239,7 +247,11 @@ class Flow:
self.os_map, self.run_stages, self.os_map, self.run_stages,
self.f4cache) self.f4cache)
def _resolve_dependencies(self, dep: str, stages_checked: 'set[str]'): def _resolve_dependencies(self, dep: str, stages_checked: 'set[str]',
skip_dep_warnings: 'set[str]' = None):
if skip_dep_warnings is None:
skip_dep_warnings = set()
# Initialize the dependency status if necessary # Initialize the dependency status if necessary
if self.deps_rebuilds.get(dep) is None: if self.deps_rebuilds.get(dep) is None:
self.deps_rebuilds[dep] = 0 self.deps_rebuilds[dep] = 0
@ -256,7 +268,7 @@ class Flow:
# config if it is. # config if it is.
for take in provider.takes: for take in provider.takes:
self._resolve_dependencies(take.name, stages_checked) self._resolve_dependencies(take.name, stages_checked, skip_dep_warnings)
# If any of the required dependencies is unavailable, then the # If any of the required dependencies is unavailable, then the
# provider stage cannot be run # provider stage cannot be run
take_paths = self.dep_paths.get(take.name) take_paths = self.dep_paths.get(take.name)
@ -267,8 +279,21 @@ class Flow:
_print_unreachable_stage_message(provider, take) _print_unreachable_stage_message(provider, take)
return return
if self._dep_will_differ(take.name, take_paths, provider.name): will_differ = False
sfprint(2, f'{take.name} is causing rebuild for {provider.name}') if take_paths is None:
# TODO: This won't trigger rebuild if an optional dependency got removed
will_differ = False
elif req_exists(take_paths):
will_differ = self._dep_will_differ(take.name, take_paths, provider.name)
else:
will_differ = True
if will_differ:
if take.name not in skip_dep_warnings:
sfprint(2, f'{Style.BRIGHT}{take.name}{Style.RESET_ALL} is causing '
f'rebuild for `{Style.BRIGHT}{provider.name}{Style.RESET_ALL}`')
skip_dep_warnings.add(take.name)
self.run_stages.add(provider.name) self.run_stages.add(provider.name)
self.deps_rebuilds[take.name] += 1 self.deps_rebuilds[take.name] += 1
@ -277,6 +302,9 @@ class Flow:
self.cfg.get_dependency_overrides()) self.cfg.get_dependency_overrides())
outputs = module_map(provider.module, modrunctx) outputs = module_map(provider.module, modrunctx)
for output_paths in outputs.values():
if req_exists(output_paths) and self.f4cache:
_cache_deps(output_paths, self.f4cache)
stages_checked.add(provider.name) stages_checked.add(provider.name)
self.dep_paths.update(outputs) self.dep_paths.update(outputs)
@ -287,7 +315,6 @@ class Flow:
# Verify module's outputs and add paths as values. # Verify module's outputs and add paths as values.
outs = outputs.keys() outs = outputs.keys()
# print(outs)
for o in provider.produces: for o in provider.produces:
if o.name not in outs: if o.name not in outs:
if o.spec == 'req' or (o.spec == 'demand' and \ if o.spec == 'req' or (o.spec == 'demand' and \
@ -350,7 +377,7 @@ class Flow:
if req_exists(paths) and not run: if req_exists(paths) and not run:
return True return True
else: else:
assert(provider) assert provider
any_dep_differ = False if (self.f4cache is not None) else True any_dep_differ = False if (self.f4cache is not None) else True
for p_dep in provider.takes: for p_dep in provider.takes:
@ -382,17 +409,22 @@ class Flow:
self.run_stages.discard(provider.name) self.run_stages.discard(provider.name)
if not req_exists(paths): for product in provider.produces:
exists = req_exists(paths)
if (product.spec == 'req') and not exists:
raise DependencyNotProducedException(dep, provider.name) raise DependencyNotProducedException(dep, provider.name)
if exists and self.f4cache:
_cache_deps(self.dep_paths[product.name], self.f4cache)
return True return True
def execute(self): def execute(self):
self._build_dep(self.target) self._build_dep(self.target)
if self.f4cache: if self.f4cache:
_cache_deps(self.dep_paths[self.target], self.f4cache)
update_dep_statuses(self.dep_paths[self.target], '__target', update_dep_statuses(self.dep_paths[self.target], '__target',
self.f4cache) self.f4cache)
sfprint(0, f'Target `{Style.BRIGHT + self.target + Style.RESET_ALL}` ' sfprint(0, f'Target {Style.BRIGHT + self.target + Style.RESET_ALL} '
f'-> {self.dep_paths[self.target]}') f'-> {self.dep_paths[self.target]}')
def display_dep_info(stages: 'Iterable[Stage]'): def display_dep_info(stages: 'Iterable[Stage]'):
@ -595,8 +627,10 @@ def cmd_build(args: Namespace):
try: try:
flow.execute() flow.execute()
except AssertionError as e:
raise e
except Exception as e: except Exception as e:
sfprint(0, e) sfprint(0, f'{e}')
sfbuild_fail() sfbuild_fail()
if flow.f4cache: if flow.f4cache:

View File

@ -4,6 +4,12 @@ from json import dump as json_dump, load as json_load, JSONDecodeError
from f4pga.common import sfprint from f4pga.common import sfprint
def _get_hash(path: Path):
    """ Calculate the Adler-32 checksum of a file's contents.

    The file is read in fixed-size chunks and fed into a running checksum, so
    large files are never loaded into memory in one piece. The result is
    identical to checksumming the whole content at once.

    Returns the checksum as an integer. Directories always get '0' hash.
    """
    if path.is_dir():
        return 0  # Directories always get '0' hash.

    chunk_size = 1 << 20  # 1 MiB per read
    # Checksum of empty input; serves as the seed of the running value.
    checksum = zlib_adler32(b'')
    with path.open('rb') as rfptr:
        for chunk in iter(lambda: rfptr.read(chunk_size), b''):
            checksum = zlib_adler32(chunk, checksum)
    return checksum
class F4Cache: class F4Cache:
""" """
`F4Cache` is used to track changes among dependencies and keep the status of the files on a persistent storage. `F4Cache` is used to track changes among dependencies and keep the status of the files on a persistent storage.
@ -12,6 +18,7 @@ class F4Cache:
""" """
hashes: 'dict[str, dict[str, str]]' hashes: 'dict[str, dict[str, str]]'
current_hashes: 'dict[str, str]'
status: 'dict[str, str]' status: 'dict[str, str]'
cachefile_path: str cachefile_path: str
@ -21,6 +28,7 @@ class F4Cache:
""" """
self.status = {} self.status = {}
self.current_hashes = {}
self.cachefile_path = cachefile_path self.cachefile_path = cachefile_path
self.load() self.load()
@ -43,6 +51,12 @@ class F4Cache:
self.status[path] = {} self.status[path] = {}
self.status[path][consumer] = status self.status[path][consumer] = status
def process_file(self, path: Path):
    """ Process file for tracking with f4cache.

    Computes the current hash of `path` and stores it in `current_hashes`
    under the path's POSIX-style string form.
    """
    self.current_hashes[path.as_posix()] = _get_hash(path)
def update(self, path: Path, consumer: str): def update(self, path: Path, consumer: str):
""" Add/remove a file to/from the tracked files, update checksum if necessary and calculate status.
@ -51,34 +65,41 @@ class F4Cache:
by a module within the active flow. by a module within the active flow.
""" """
exists = path.exists() posix_path = path.as_posix()
isdir = path.is_dir() assert self.current_hashes.get(posix_path) is not None
if not exists:
self._try_pop_consumer(path.as_posix(), consumer) if not path.exists():
self._try_pop_consumer(posix_path, consumer)
return True return True
hash = 0 # Directories always get '0' hash.
if (not isdir) and exists:
with path.open('rb') as rfptr:
hash = str(zlib_adler32(rfptr.read()))
last_hashes = self.hashes.get(path.as_posix()) hash = self.current_hashes[posix_path]
last_hashes = self.hashes.get(posix_path)
last_hash = None if last_hashes is None else last_hashes.get(consumer) last_hash = None if last_hashes is None else last_hashes.get(consumer)
if hash != last_hash: if hash != last_hash:
self._try_push_consumer_status(path.as_posix(), consumer, 'changed') self._try_push_consumer_status(posix_path, consumer, 'changed')
self._try_push_consumer_hash(path.as_posix(), consumer, hash) self._try_push_consumer_hash(posix_path, consumer, hash)
return True return True
self._try_push_consumer_status(path.as_posix(), consumer, 'same') self._try_push_consumer_status(posix_path, consumer, 'same')
return False return False
def get_status(self, path: str, consumer: str): def get_status(self, path: str, consumer: str):
""" Get status for a file with a given path. """ Get status for a file with a given path.
returns 'untracked' if the file is not tracked or hasn't been treated with `update` procedure before calling returns 'untracked' if the file is not tracked.
`get_status`.
""" """
assert self.current_hashes.get(path) is not None
statuses = self.status.get(path) statuses = self.status.get(path)
if not statuses: if not statuses:
hashes = self.hashes.get(path)
if hashes is not None:
last_hash = hashes.get(consumer)
if last_hash is not None:
if self.current_hashes[path] != last_hash:
return 'changed'
return 'same'
return 'untracked' return 'untracked'
status = statuses.get(consumer) status = statuses.get(consumer)
if not status: if not status:

View File

@ -71,11 +71,13 @@ def deep(fun):
""" """
def d(paths, *args, **kwargs): def d(paths, *args, **kwargs):
if type(paths) is str: if type(paths) is str:
return fun(paths) return fun(paths, *args, **kwargs)
elif type(paths) is list: elif type(paths) is list:
return [d(p) for p in paths]; return [d(p, *args, **kwargs) for p in paths];
elif type(paths) is dict: elif type(paths) is dict:
return dict([(k, d(p)) for k, p in paths.items()]) return dict([(k, d(p, *args, **kwargs)) for k, p in paths.items()])
else:
raise RuntimeError(f'paths is of type {type(paths)}')
return d return d