test: reduce disk usage when running benchmarks in parallel

This commit is contained in:
Jędrzej Boczar 2020-02-13 10:23:32 +01:00
parent 5cd33f490f
commit dd12a78587
1 changed files with 48 additions and 8 deletions

View File

@ -456,19 +456,59 @@ def run_single_benchmark(func_args):
return result
InQueueItem = namedtuple('InQueueItem', ['index', 'config'])
OutQueueItem = namedtuple('OutQueueItem', ['index', 'result'])
def run_parallel(configurations, output_base_dir, njobs, ignore_failures):
from multiprocessing import Process, Queue
import queue
def worker(in_queue, out_queue, out_dir):
while True:
in_item = in_queue.get()
if in_item is None:
return
result = run_single_benchmark((in_item.config, out_dir, ignore_failures))
out_queue.put(OutQueueItem(in_item.index, result))
if njobs == 0:
njobs = os.cpu_count()
print('Using {:d} parallel jobs'.format(njobs))
# use one directory per worker, as running each benchmark in separate directory
# takes too much disk space (~2GB per 100 benchmarks)
dir_pool = [os.path.join(output_base_dir, 'worker_%02d' % i) for i in range(njobs)]
in_queue, out_queue = Queue(), Queue()
workers = [Process(target=worker, args=(in_queue, out_queue, dir)) for dir in dir_pool]
for w in workers:
w.start()
# put all benchmark configurations with index to retrieve them in order
for i, config in enumerate(configurations):
in_queue.put(InQueueItem(i, config))
# send "finish signal" for each worker
for _ in workers:
in_queue.put(None)
# retrieve results in proper order
out_items = [out_queue.get() for _ in configurations]
results = [out.result for out in sorted(out_items, key=lambda o: o.index)]
for p in workers:
p.join()
return results
def run_benchmarks(configurations, output_base_dir, njobs, ignore_failures):
print('Running {:d} benchmarks ...'.format(len(configurations)))
if njobs == 1:
results = [run_single_benchmark((config, output_base_dir, ignore_failures)) for config in configurations]
else:
import multiprocessing
func_args = [(config, os.path.join(output_base_dir, config.name.replace(' ', '_')), ignore_failures)
for config in configurations]
if njobs == 0:
njobs = os.cpu_count()
print('Using {:d} parallel jobs'.format(njobs))
with multiprocessing.Pool(processes=njobs) as pool:
results = pool.map(run_single_benchmark, func_args)
results = run_parallel(configurations, output_base_dir, njobs, ignore_failures)
run_data = [RunCache.RunData(config, result) for config, result in zip(configurations, results)]
return run_data