Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 68 additions & 63 deletions cvise/utils/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,24 @@ def __init__(
== 0
)

self.mpmanager = multiprocessing.Manager()
self.mplogger = mplogging.MPLogger(self.parallel_tests)
self.worker_pool: Union[pebble.ProcessPool, None] = None

def __enter__(self):
self.worker_pool = pebble.ProcessPool(
max_workers=self.parallel_tests,
initializer=_init_worker_process,
initargs=[self.mplogger.worker_process_initializer()],
)
self.exit_stack.enter_context(self.worker_pool)
self.exit_stack.enter_context(self.mplogger)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.exit_stack.__exit__(exc_type, exc_val, exc_tb)
self.worker_pool.stop()
self.worker_pool = None

def remove_roots(self):
if self.save_temps:
Expand Down Expand Up @@ -739,61 +749,51 @@ def maybe_update_success_candidate(
new.take_file_ownership(env.test_case_path)
self.success_candidate = new

def terminate_all(self, pool):
def terminate_all(self):
for job in self.jobs:
self.mplogger.ignore_logs_from_job(job.order)
pool.stop()
pool.join()
job.future.cancel()
self.release_all_jobs()

def run_parallel_tests(self) -> None:
assert not self.jobs
with pebble.ProcessPool(
max_workers=self.parallel_tests,
initializer=_init_worker_process,
initargs=[self.mplogger.worker_process_initializer()],
) as pool:
try:
self.current_batch_start_order = self.order
self.next_pass_id = 0
self.giveup_reported = False
assert self.success_candidate is None
if self.interleaving:
self.folding_manager = FoldingManager()

for pass_id, ctx in enumerate(self.pass_contexts):
# Clean up the information about previously running jobs.
ctx.running_transform_order_to_state = {}
# Unfinished initializations from the last run will need to be restarted.
if ctx.stage == PassStage.IN_INIT:
ctx.stage = PassStage.BEFORE_INIT
# Previously finished passes are eligible for reinitialization (used for "interleaving" mode only -
# in the old single-pass mode we're expected to return to let subsequent passes work).
if (
self.interleaving
and ctx.stage == PassStage.ENUMERATING
and ctx.state is None
and pass_id not in self.pass_reinit_queue
):
self.pass_reinit_queue.append(pass_id)

while self.jobs or any(c.can_start_job_now() for c in self.pass_contexts):
sigmonitor.maybe_retrigger_action()

# schedule new jobs, as long as there are free workers
while len(self.jobs) < self.parallel_tests and self.maybe_schedule_job(pool):
pass
self.current_batch_start_order = self.order
self.next_pass_id = 0
self.giveup_reported = False
assert self.success_candidate is None
if self.interleaving:
self.folding_manager = FoldingManager()

for pass_id, ctx in enumerate(self.pass_contexts):
# Clean up the information about previously running jobs.
ctx.running_transform_order_to_state = {}
# Unfinished initializations from the last run will need to be restarted.
if ctx.stage == PassStage.IN_INIT:
ctx.stage = PassStage.BEFORE_INIT
# Previously finished passes are eligible for reinitialization (used for "interleaving" mode only -
# in the old single-pass mode we're expected to return to let subsequent passes work).
if (
self.interleaving
and ctx.stage == PassStage.ENUMERATING
and ctx.state is None
and pass_id not in self.pass_reinit_queue
):
self.pass_reinit_queue.append(pass_id)

# no more jobs could be scheduled at the moment - wait for some results
wait([j.future for j in self.jobs], return_when=FIRST_COMPLETED, timeout=self.EVENT_LOOP_TIMEOUT)
self.process_done_futures()
while self.jobs or any(c.can_start_job_now() for c in self.pass_contexts):
sigmonitor.maybe_retrigger_action()

# exit if we found successful transformation(s) and don't want to try better ones
if self.success_candidate and self.should_proceed_with_success_candidate():
break
# schedule new jobs, as long as there are free workers
while len(self.jobs) < self.parallel_tests and self.maybe_schedule_job():
pass

finally:
# Abort running jobs - by default the process pool waits for the ongoing jobs' completion.
self.terminate_all(pool)
# no more jobs could be scheduled at the moment - wait for some results
wait([j.future for j in self.jobs], return_when=FIRST_COMPLETED, timeout=self.EVENT_LOOP_TIMEOUT)
self.process_done_futures()

# exit if we found successful transformation(s) and don't want to try better ones
if self.success_candidate and self.should_proceed_with_success_candidate():
break

def run_passes(self, passes: List[AbstractPass], interleaving: bool):
assert len(passes) == 1 or interleaving
Expand All @@ -813,8 +813,7 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
self.pass_contexts.append(PassContext.create(pass_))
self.interleaving = interleaving
self.jobs = []
m = multiprocessing.Manager()
self.pid_queue = m.Queue()
self.pid_queue = self.mpmanager.Queue()
cache_key = repr([c.pass_ for c in self.pass_contexts])

pass_titles = ', '.join(repr(c.pass_) for c in self.pass_contexts)
Expand Down Expand Up @@ -858,7 +857,10 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
self.log_key_event('toggle print diff')
self.print_diff = not self.print_diff

self.run_parallel_tests()
try:
self.run_parallel_tests()
finally:
self.terminate_all()
self.kill_pid_queue()

is_success = self.success_candidate is not None
Expand All @@ -875,7 +877,6 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
)
break

self.release_all_jobs()
if not is_success:
break

Expand Down Expand Up @@ -903,7 +904,7 @@ def run_passes(self, passes: List[AbstractPass], interleaving: bool):
except (KeyboardInterrupt, SystemExit):
logging.info('Exiting now ...')
# Clean temporary files for all jobs and passes.
self.release_all_jobs()
self.terminate_all()
self.remove_roots()
sys.exit(1)

Expand Down Expand Up @@ -982,12 +983,12 @@ def should_proceed_with_success_candidate(self):
self.order - self.current_batch_start_order, self.parallel_tests, len(self.pass_contexts)
)

def maybe_schedule_job(self, pool: pebble.ProcessPool) -> bool:
def maybe_schedule_job(self) -> bool:
# The order matters below - higher-priority job types come earlier:
# 1. Initializing a pass regularly (at the beginning of the batch of jobs).
for pass_id, ctx in enumerate(self.pass_contexts):
if ctx.can_init_now():
self.schedule_init(pool, pass_id)
self.schedule_init(pass_id)
return True
# 2. Reinitializing a previously finished pass.
# We throttle reinits (only once out of REINIT_JOB_INTERVAL jobs) because they're only occasionally useful: for
Expand All @@ -1004,7 +1005,7 @@ def maybe_schedule_job(self, pool: pebble.ProcessPool) -> bool:
assert ctx.state is None
ctx.stage = PassStage.BEFORE_INIT
self.last_reinit_job_order = self.order
self.schedule_init(pool, pass_id)
self.schedule_init(pass_id)
return True
# 3. Attempting a fold (simultaneous application) of previously discovered successful transformations; only
# supported in the "interleaving" pass execution mode.
Expand All @@ -1014,18 +1015,18 @@ def maybe_schedule_job(self, pool: pebble.ProcessPool) -> bool:
self.success_candidate.pass_state if self.success_candidate else None,
)
if folding_state:
self.schedule_fold(pool, folding_state)
self.schedule_fold(folding_state)
return True
# 4. Attempting a transformation using the next heuristic in the round-robin fashion.
if any(ctx.can_transform_now() for ctx in self.pass_contexts):
while not self.pass_contexts[self.next_pass_id].can_transform_now():
self.next_pass_id = (self.next_pass_id + 1) % len(self.pass_contexts)
self.schedule_transform(pool, self.next_pass_id)
self.schedule_transform(self.next_pass_id)
self.next_pass_id = (self.next_pass_id + 1) % len(self.pass_contexts)
return True
return False

def schedule_init(self, pool: pebble.ProcessPool, pass_id: int) -> None:
def schedule_init(self, pass_id: int) -> None:
ctx = self.pass_contexts[pass_id]
assert ctx.can_init_now()

Expand All @@ -1047,7 +1048,7 @@ def schedule_init(self, pool: pebble.ProcessPool, pass_id: int) -> None:
job_timeout=self.timeout,
pid_queue=self.pid_queue,
)
future = pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run])
future = self.worker_pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run])
self.jobs.append(
Job(
type=JobType.INIT,
Expand All @@ -1064,7 +1065,7 @@ def schedule_init(self, pool: pebble.ProcessPool, pass_id: int) -> None:
ctx.stage = PassStage.IN_INIT
self.order += 1

def schedule_transform(self, pool: pebble.ProcessPool, pass_id: int) -> None:
def schedule_transform(self, pass_id: int) -> None:
ctx = self.pass_contexts[pass_id]
assert ctx.can_transform_now()
assert ctx.state is not None
Expand All @@ -1085,7 +1086,9 @@ def schedule_transform(self, pool: pebble.ProcessPool, pass_id: int) -> None:
ctx.pass_.transform,
self.pid_queue,
)
future = pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout)
future = self.worker_pool.schedule(
_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout
)
self.jobs.append(
Job(
type=JobType.TRANSFORM,
Expand All @@ -1104,7 +1107,7 @@ def schedule_transform(self, pool: pebble.ProcessPool, pass_id: int) -> None:
self.order += 1
ctx.state = ctx.pass_.advance(self.current_test_case, ctx.state)

def schedule_fold(self, pool: pebble.ProcessPool, folding_state: FoldingStateIn) -> None:
def schedule_fold(self, folding_state: FoldingStateIn) -> None:
assert self.interleaving

should_copy_test_cases = False # the fold transform creates the files itself
Expand All @@ -1120,7 +1123,9 @@ def schedule_fold(self, pool: pebble.ProcessPool, folding_state: FoldingStateIn)
FoldingManager.transform,
self.pid_queue,
)
future = pool.schedule(_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout)
future = self.worker_pool.schedule(
_worker_process_job_wrapper, args=[self.order, env.run], timeout=self.timeout
)
self.jobs.append(
Job(
type=JobType.TRANSFORM,
Expand Down