Skip to content

Commit 1dc4895

Browse files
authored
multiprocessing: Don't create pool many times (#297)
Have a single process pool instead of recreating it every time pass(es) are to be executed. We only need to make sure to cancel running jobs. This brings a significant speedup especially for small cases or for late stages of reductions in general, where it takes just a fraction of a second to discover a reduction. Now we don't have to pay the price for initializing the Pebble pool - hundreds of milliseconds - repeatedly. While empirical measurements show 1.5x-2x speedup, unfortunately most of it will be "eaten" by the switch to "forkserver" multiprocessing mode - the latter seems unavoidable to fix occasional deadlocks.
1 parent 0316a25 commit 1dc4895

File tree

1 file changed

+68
-63
lines changed

1 file changed

+68
-63
lines changed

cvise/utils/testing.py

Lines changed: 68 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -414,14 +414,24 @@ def __init__(
414414
== 0
415415
)
416416

417+
self.mpmanager = multiprocessing.Manager()
417418
self.mplogger = mplogging.MPLogger(self.parallel_tests)
419+
self.worker_pool: Union[pebble.ProcessPool, None] = None
418420

419421
def __enter__(self):
422+
self.worker_pool = pebble.ProcessPool(
423+
max_workers=self.parallel_tests,
424+
initializer=_init_worker_process,
425+
initargs=[self.mplogger.worker_process_initializer()],
426+
)
427+
self.exit_stack.enter_context(self.worker_pool)
420428
self.exit_stack.enter_context(self.mplogger)
421429
return self
422430

423431
def __exit__(self, exc_type, exc_val, exc_tb):
424432
self.exit_stack.__exit__(exc_type, exc_val, exc_tb)
433+
self.worker_pool.stop()
434+
self.worker_pool = None
425435

426436
def remove_roots(self):
427437
if self.save_temps:
@@ -739,61 +749,51 @@ def maybe_update_success_candidate(
739749
new.take_file_ownership(env.test_case_path)
740750
self.success_candidate = new
741751

742-
def terminate_all(self, pool):
752+
def terminate_all(self):
743753
for job in self.jobs:
744754
self.mplogger.ignore_logs_from_job(job.order)
745-
pool.stop()
746-
pool.join()
755+
job.future.cancel()
756+
self.release_all_jobs()
747757

748758
def run_parallel_tests(self) -> None:
749759
assert not self.jobs
750-
with pebble.ProcessPool(
751-
max_workers=self.parallel_tests,
752-
initializer=_init_worker_process,
753-
initargs=[self.mplogger.worker_process_initializer()],
754-
) as pool:
755-
try:
756-
self.current_batch_start_order = self.order
757-
self.next_pass_id = 0
758-
self.giveup_reported = False
759-
assert self.success_candidate is None
760-
if self.interleaving:
761-
self.folding_manager = FoldingManager()
762-
763-
for pass_id, ctx in enumerate(self.pass_contexts):
764-
# Clean up the information about previously running jobs.
765-
ctx.running_transform_order_to_state = {}
766-
# Unfinished initializations from the last run will need to be restarted.
767-
if ctx.stage == PassStage.IN_INIT:
768-
ctx.stage = PassStage.BEFORE_INIT
769-
# Previously finished passes are eligible for reinitialization (used for "interleaving" mode only -
770-
# in the old single-pass mode we're expected to return to let subsequent passes work).
771-
if (
772-
self.interleaving
773-
and ctx.stage == PassStage.ENUMERATING
774-
and ctx.state is None
775-
and pass_id not in self.pass_reinit_queue
776-
):
777-
self.pass_reinit_queue.append(pass_id)
778-
779-
while self.jobs or any(c.can_start_job_now() for c in self.pass_contexts):
780-
sigmonitor.maybe_retrigger_action()
781-
782-
# schedule new jobs, as long as there are free workers
783-
while len(self.jobs) < self.parallel_tests and self.maybe_schedule_job(pool):
784-
pass
760+
self.current_batch_start_order = self.order
761+
self.next_pass_id = 0
762+
self.giveup_reported = False
763+
assert self.success_candidate is None
764+
if self.interleaving:
765+
self.folding_manager = FoldingManager()
766+
767+
for pass_id, ctx in enumerate(self.pass_contexts):
768+
# Clean up the information about previously running jobs.
769+
ctx.running_transform_order_to_state = {}
770+
# Unfinished initializations from the last run will need to be restarted.
771+
if ctx.stage == PassStage.IN_INIT:
772+
ctx.stage = PassStage.BEFORE_INIT
773+
# Previously finished passes are eligible for reinitialization (used for "interleaving" mode only -
774+
# in the old single-pass mode we're expected to return to let subsequent passes work).
775+
if (
776+
self.interleaving
777+
and ctx.stage == PassStage.ENUMERATING
778+
and ctx.state is None
779+
and pass_id not in self.pass_reinit_queue
780+
):
781+
self.pass_reinit_queue.append(pass_id)
785782

786-
# no more jobs could be scheduled at the moment - wait for some results
787-
wait([j.future for j in self.jobs], return_when=FIRST_COMPLETED, timeout=self.EVENT_LOOP_TIMEOUT)
788-
self.process_done_futures()
783+
while self.jobs or any(c.can_start_job_now() for c in self.pass_contexts):
784+
sigmonitor.maybe_retrigger_action()
789785

790-
# exit if we found successful transformation(s) and don't want to try better ones
791-
if self.success_candidate and self.should_proceed_with_success_candidate():
792-
break
786+
# schedule new jobs, as long as there are free workers
787+
while len(self.jobs) < self.parallel_tests and self.maybe_schedule_job():
788+
pass
793789

794-
finally:
795-
# Abort running jobs - by default the process pool waits for the ongoing jobs' completion.
796-
self.terminate_all(pool)
790+
# no more jobs could be scheduled at the moment - wait for some results
791+
wait([j.future for j in self.jobs], return_when=FIRST_COMPLETED, timeout=self.EVENT_LOOP_TIMEOUT)
792+
self.process_done_futures()
793+
794+
# exit if we found successful transformation(s) and don't want to try better ones
795+
if self.success_candidate and self.should_proceed_with_success_candidate():
796+
break
797797

798798
def run_passes(self, passes: List[AbstractPass], interleaving: bool):
799799
assert len(passes) == 1 or interleaving
@@ -813,8 +813,7 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
813813
self.pass_contexts.append(PassContext.create(pass_))
814814
self.interleaving = interleaving
815815
self.jobs = []
816-
m = multiprocessing.Manager()
817-
self.pid_queue = m.Queue()
816+
self.pid_queue = self.mpmanager.Queue()
818817
cache_key = repr([c.pass_ for c in self.pass_contexts])
819818

820819
pass_titles = ', '.join(repr(c.pass_) for c in self.pass_contexts)
@@ -858,7 +857,10 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
858857
self.log_key_event('toggle print diff')
859858
self.print_diff = not self.print_diff
860859

861-
self.run_parallel_tests()
860+
try:
861+
self.run_parallel_tests()
862+
finally:
863+
self.terminate_all()
862864
self.kill_pid_queue()
863865

864866
is_success = self.success_candidate is not None
@@ -875,7 +877,6 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
875877
)
876878
break
877879

878-
self.release_all_jobs()
879880
if not is_success:
880881
break
881882

@@ -903,7 +904,7 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
903904
except (KeyboardInterrupt, SystemExit):
904905
logging.info('Exiting now ...')
905906
# Clean temporary files for all jobs and passes.
906-
self.release_all_jobs()
907+
self.terminate_all()
907908
self.remove_roots()
908909
sys.exit(1)
909910

@@ -982,12 +983,12 @@ def should_proceed_with_success_candidate(self):
982983
self.order - self.current_batch_start_order, self.parallel_tests, len(self.pass_contexts)
983984
)
984985

985-
def maybe_schedule_job(self, pool: pebble.ProcessPool) -> bool:
986+
def maybe_schedule_job(self) -> bool:
986987
# The order matters below - higher-priority job types come earlier:
987988
# 1. Initializing a pass regularly (at the beginning of the batch of jobs).
988989
for pass_id, ctx in enumerate(self.pass_contexts):
989990
if ctx.can_init_now():
990-
self.schedule_init(pool, pass_id)
991+
self.schedule_init(pass_id)
991992
return True
992993
# 2. Reinitializing a previously finished pass.
993994
# We throttle reinits (only once out of REINIT_JOB_INTERVAL jobs) because they're only occasionally useful: for
@@ -1004,7 +1005,7 @@ def maybe_schedule_job(self, pool: pebble.ProcessPool) -> bool:
10041005
assert ctx.state is None
10051006
ctx.stage = PassStage.BEFORE_INIT
10061007
self.last_reinit_job_order = self.order
1007-
self.schedule_init(pool, pass_id)
1008+
self.schedule_init(pass_id)
10081009
return True
10091010
# 3. Attempting a fold (simultaneous application) of previously discovered successful transformations; only
10101011
# supported in the "interleaving" pass execution mode.
@@ -1014,18 +1015,18 @@ def maybe_schedule_job(self, pool: pebble.ProcessPool) -> bool:
10141015
self.success_candidate.pass_state if self.success_candidate else None,
10151016
)
10161017
if folding_state:
1017-
self.schedule_fold(pool, folding_state)
1018+
self.schedule_fold(folding_state)
10181019
return True
10191020
# 4. Attempting a transformation using the next heuristic in the round-robin fashion.
10201021
if any(ctx.can_transform_now() for ctx in self.pass_contexts):
10211022
while not self.pass_contexts[self.next_pass_id].can_transform_now():
10221023
self.next_pass_id = (self.next_pass_id + 1) % len(self.pass_contexts)
1023-
self.schedule_transform(pool, self.next_pass_id)
1024+
self.schedule_transform(self.next_pass_id)
10241025
self.next_pass_id = (self.next_pass_id + 1) % len(self.pass_contexts)
10251026
return True
10261027
return False
10271028

1028-
def schedule_init(self, pool: pebble.ProcessPool, pass_id: int) -> None:
1029+
def schedule_init(self, pass_id: int) -> None:
10291030
ctx = self.pass_contexts[pass_id]
10301031
assert ctx.can_init_now()
10311032

@@ -1047,7 +1048,7 @@ def schedule_init(self, pool: pebble.ProcessPool, pass_id: int) -> None:
10471048
job_timeout=self.timeout,
10481049
pid_queue=self.pid_queue,
10491050
)
1050-
future = pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run])
1051+
future = self.worker_pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run])
10511052
self.jobs.append(
10521053
Job(
10531054
type=JobType.INIT,
@@ -1064,7 +1065,7 @@ def schedule_init(self, pool: pebble.ProcessPool, pass_id: int) -> None:
10641065
ctx.stage = PassStage.IN_INIT
10651066
self.order += 1
10661067

1067-
def schedule_transform(self, pool: pebble.ProcessPool, pass_id: int) -> None:
1068+
def schedule_transform(self, pass_id: int) -> None:
10681069
ctx = self.pass_contexts[pass_id]
10691070
assert ctx.can_transform_now()
10701071
assert ctx.state is not None
@@ -1085,7 +1086,9 @@ def schedule_transform(self, pool: pebble.ProcessPool, pass_id: int) -> None:
10851086
ctx.pass_.transform,
10861087
self.pid_queue,
10871088
)
1088-
future = pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout)
1089+
future = self.worker_pool.schedule(
1090+
_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout
1091+
)
10891092
self.jobs.append(
10901093
Job(
10911094
type=JobType.TRANSFORM,
@@ -1104,7 +1107,7 @@ def schedule_transform(self, pool: pebble.ProcessPool, pass_id: int) -> None:
11041107
self.order += 1
11051108
ctx.state = ctx.pass_.advance(self.current_test_case, ctx.state)
11061109

1107-
def schedule_fold(self, pool: pebble.ProcessPool, folding_state: FoldingStateIn) -> None:
1110+
def schedule_fold(self, folding_state: FoldingStateIn) -> None:
11081111
assert self.interleaving
11091112

11101113
should_copy_test_cases = False # the fold transform creates the files itself
@@ -1120,7 +1123,9 @@ def schedule_fold(self, pool: pebble.ProcessPool, folding_state: FoldingStateIn)
11201123
FoldingManager.transform,
11211124
self.pid_queue,
11221125
)
1123-
future = pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout)
1126+
future = self.worker_pool.schedule(
1127+
_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout
1128+
)
11241129
self.jobs.append(
11251130
Job(
11261131
type=JobType.TRANSFORM,

0 commit comments

Comments
 (0)