From dd12a78587f349fd77a1b41f30035e2724a03e72 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C4=99drzej=20Boczar?=
Date: Thu, 13 Feb 2020 10:23:32 +0100
Subject: [PATCH] test: reduce disk usage when running benchmarks in parallel

---
 test/run_benchmarks.py | 56 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 48 insertions(+), 8 deletions(-)

diff --git a/test/run_benchmarks.py b/test/run_benchmarks.py
index c90b03e..5fb5ce5 100755
--- a/test/run_benchmarks.py
+++ b/test/run_benchmarks.py
@@ -456,19 +456,59 @@ def run_single_benchmark(func_args):
     return result
 
 
+InQueueItem = namedtuple('InQueueItem', ['index', 'config'])
+OutQueueItem = namedtuple('OutQueueItem', ['index', 'result'])
+
+
+def run_parallel(configurations, output_base_dir, njobs, ignore_failures):
+    from multiprocessing import Process, Queue
+    import queue
+
+    def worker(in_queue, out_queue, out_dir):
+        while True:
+            in_item = in_queue.get()
+            if in_item is None:
+                return
+            result = run_single_benchmark((in_item.config, out_dir, ignore_failures))
+            out_queue.put(OutQueueItem(in_item.index, result))
+
+    if njobs == 0:
+        njobs = os.cpu_count()
+    print('Using {:d} parallel jobs'.format(njobs))
+
+    # use one directory per worker, as running each benchmark in a separate directory
+    # takes too much disk space (~2GB per 100 benchmarks)
+    dir_pool = [os.path.join(output_base_dir, 'worker_%02d' % i) for i in range(njobs)]
+
+    in_queue, out_queue = Queue(), Queue()
+    workers = [Process(target=worker, args=(in_queue, out_queue, dir)) for dir in dir_pool]
+    for w in workers:
+        w.start()
+
+    # put all benchmark configurations with index to retrieve them in order
+    for i, config in enumerate(configurations):
+        in_queue.put(InQueueItem(i, config))
+
+    # send "finish signal" for each worker
+    for _ in workers:
+        in_queue.put(None)
+
+    # retrieve results in proper order
+    out_items = [out_queue.get() for _ in configurations]
+    results = [out.result for out in sorted(out_items, key=lambda o: o.index)]
+
+    for p in workers:
+        p.join()
+
+    return results
+
+
 def run_benchmarks(configurations, output_base_dir, njobs, ignore_failures):
     print('Running {:d} benchmarks ...'.format(len(configurations)))
     if njobs == 1:
         results = [run_single_benchmark((config, output_base_dir, ignore_failures))
                    for config in configurations]
     else:
-        import multiprocessing
-        func_args = [(config, os.path.join(output_base_dir, config.name.replace(' ', '_')), ignore_failures)
-                     for config in configurations]
-        if njobs == 0:
-            njobs = os.cpu_count()
-        print('Using {:d} parallel jobs'.format(njobs))
-        with multiprocessing.Pool(processes=njobs) as pool:
-            results = pool.map(run_single_benchmark, func_args)
+        results = run_parallel(configurations, output_base_dir, njobs, ignore_failures)
     run_data = [RunCache.RunData(config, result) for config, result in zip(configurations, results)]
     return run_data
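
Note: below is a minimal, self-contained sketch of the index-tagged queue
pattern this patch uses to keep results in submission order even though
workers finish out of order. The names `job`, `InItem` and `OutItem` and
the squaring workload are hypothetical stand-ins for illustration; they
are not part of the patch.

    from collections import namedtuple
    from multiprocessing import Process, Queue

    InItem = namedtuple('InItem', ['index', 'arg'])
    OutItem = namedtuple('OutItem', ['index', 'result'])

    def job(arg):
        # stand-in for run_single_benchmark()
        return arg * arg

    def worker(in_queue, out_queue):
        while True:
            item = in_queue.get()
            if item is None:  # "finish signal"
                return
            out_queue.put(OutItem(item.index, job(item.arg)))

    def run_parallel(args, njobs):
        in_queue, out_queue = Queue(), Queue()
        workers = [Process(target=worker, args=(in_queue, out_queue))
                   for _ in range(njobs)]
        for w in workers:
            w.start()
        # tag each input with its index so results can be reordered on receipt
        for i, arg in enumerate(args):
            in_queue.put(InItem(i, arg))
        for _ in workers:  # one sentinel per worker
            in_queue.put(None)
        # drain all results before joining, then restore submission order
        out_items = [out_queue.get() for _ in args]
        for w in workers:
            w.join()
        return [o.result for o in sorted(out_items, key=lambda o: o.index)]

    if __name__ == '__main__':
        print(run_parallel(list(range(10)), njobs=4))
        # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]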