diff --git a/bluesky/run_engine.py b/bluesky/run_engine.py index a58ff8696..87e0d75dc 100644 --- a/bluesky/run_engine.py +++ b/bluesky/run_engine.py @@ -326,7 +326,7 @@ def block(ev: Threading.Event) -> None: _UNCACHEABLE_COMMANDS = ['pause', 'subscribe', 'unsubscribe', 'stage', 'unstage', 'monitor', 'unmonitor', 'open_run', 'close_run', 'install_suspender', - 'remove_suspender'] + 'remove_suspender', '_start_suspender'] @property def state(self): @@ -464,7 +464,8 @@ def setup_run_permit(): 'clear_checkpoint': self._clear_checkpoint, 'rewindable': self._rewindable, 'pause': self._pause, - 'resume': self._resume, + '_resume_from_suspender': self._resume, + '_start_suspender': self._start_suspender, 'collect': self._collect, 'kickoff': self._kickoff, 'complete': self._complete, @@ -1147,58 +1148,15 @@ async def _request_suspend(pre_plan, post_plan, justification): self._state = 'aborting' if not was_paused: self._task.cancel() - if justification is not None: - print("Justification for this suspension:\n%s" % justification) - for current_run in self._run_bundlers.values(): - current_run.record_interruption('resume') - # During suspend, all motors should be stopped. Call stop() on - # every object we ever set(). - self._stop_movable_objects(success=True) - # Notify Devices of the pause in case they want to clean up. - for obj in self._objs_seen: - if hasattr(obj, 'pause'): - try: - obj.pause() - except NoReplayAllowed: - self._reset_checkpoint_state_meth() - - # rewind to the last checkpoint - new_plan = self._rewind() - # queue up the cached messages - self._plan_stack.append(new_plan) - self._response_stack.append(None) - - self._plan_stack.append(single_gen( - Msg('rewindable', None, self.rewindable))) - self._response_stack.append(None) + print(f"Justification for this suspension:\n{justification}") - # if there is a post plan add it between the wait - # and the cached messages - if post_plan is not None: - if callable(post_plan): - post_plan = post_plan() - self._plan_stack.append(ensure_generator(post_plan)) - self._response_stack.append(None) - - # tell the devices they are ready to go again - self._plan_stack.append(single_gen(Msg('resume', None, ))) - self._response_stack.append(None) - - # add the wait on the future to the stack - self._plan_stack.append(single_gen(Msg('wait_for', None, [fut, ]))) + # add starting the suspender logic to the stack + self._plan_stack.append( + single_gen(Msg('_start_suspender', None, pre_plan, post_plan, justification, fut)) + ) self._response_stack.append(None) - # if there is a pre plan add on top of the wait - if pre_plan is not None: - if callable(pre_plan): - pre_plan = pre_plan() - self._plan_stack.append(ensure_generator(pre_plan)) - self._response_stack.append(None) - - self._plan_stack.append(single_gen( - Msg('rewindable', None, False))) - self._response_stack.append(None) # The event loop is still running. The pre_plan will be processed, # and then the RunEngine will be hung up on processing the # 'wait_for' message until `fut` is set. @@ -1211,6 +1169,55 @@ async def _request_suspend(pre_plan, post_plan, justification): self.loop.create_task, _request_suspend(pre_plan, post_plan, justification)) + async def _start_suspender(self, msg): + """ + An internal message to do the initial work of starting a suspender + """ + pre_plan, post_plan, justification, fut = msg.args + for current_run in self._run_bundlers.values(): + current_run.record_interruption( + justification if justification is not None else "suspended" + ) + # During suspend, all motors should be stopped. Call stop() on + # every object we ever set(). + self._stop_movable_objects(success=True) + # Notify Devices of the pause in case they want to clean up. + for obj in self._objs_seen: + if hasattr(obj, 'pause'): + try: + obj.pause() + except NoReplayAllowed: + self._reset_checkpoint_state_meth() + # rewind to the last checkpoint + rewind_plan = self._rewind() + was_rewindable = self.rewindable + + if callable(pre_plan): + pre_plan = pre_plan() + if callable(post_plan): + post_plan = post_plan() + + def suspender_helper_inner_plan(): + # none of this should run again. + yield Msg('rewindable', None, False) + # if there is a pre plan add on top of the wait + if pre_plan is not None: + yield from ensure_generator(pre_plan) + # wait for the future from the suspender to be released + yield Msg('wait_for', None, [fut, ]) + # do the work we need to do to resume + yield Msg('_resume_from_suspender', None, ) + # if there is a post plan, run it + if post_plan is not None: + yield from ensure_generator(post_plan) + # put rewindable back the way it was + yield Msg('rewindable', None, was_rewindable) + yield from rewind_plan + + # add the above helper to the plan stack + self._plan_stack.append(suspender_helper_inner_plan()) + self._response_stack.append(None) + def abort(self, reason=''): """ Stop a running or paused plan and mark it as aborted. @@ -1458,19 +1465,24 @@ async def _run(self): # the new response to be added new_response = None - # This 'await' must be here to ensure that - # this coroutine breaks out of its current behavior - # before trying to get the next message from the - # top of the generator stack in case there has - # been a pause requested. Without this the next - # message after the pause may be processed first - # on resume (instead of the first message in - # self._msg_cache). - - # This sleep has to be inside of this try block so - # that any of the 'async' exceptions get thrown in the - # correct place - await asyncio.sleep(0, **self._loop_for_kwargs) + # This 'await' must be here to ensure that this coroutine + # breaks out of its current behavior before trying to get + # the next message from the top of the generator stack in + # case there has been a pause requested. Without this the + # next message after the pause may be processed first on + # resume (instead of the first message in self._msg_cache). + # This await also gives the co-routine for requesting + # suspends a chance to run. + + # This sleep has to be inside of this try block so that any + # of the 'async' exceptions get thrown in the correct + # place. + + # If we are handling an exception, then burn through the + # current plan stack before rather than allowing a pause or + # suspension to try and finish firing. + if stashed_exception is None: + await asyncio.sleep(0, **self._loop_for_kwargs) # always pop off a result, we are either sending it back in # or throwing an exception in, in either case the left hand # side of the yield in the plan will be moved past @@ -1482,12 +1494,10 @@ async def _run(self): stashed_exception = self._exception self._exception = None # The case where we have a stashed exception - if (stashed_exception is not None or - isinstance(resp, Exception)): + if (stashed_exception is not None or isinstance(resp, Exception)): # throw the exception at the current plan try: - msg = self._plan_stack[-1].throw( - stashed_exception or resp) + msg = self._plan_stack[-1].throw(stashed_exception or resp) except Exception as e: # The current plan did not handle it, # maybe the next plan (if any) would like @@ -1496,6 +1506,8 @@ async def _run(self): # we have killed the current plan, do not give # it a new response resp = sentinel + # If there is at least one plan left in the stack, + # stash the new exception go back to top if len(self._plan_stack): stashed_exception = e continue diff --git a/bluesky/tests/test_examples.py b/bluesky/tests/test_examples.py index f64211fee..ef8280045 100644 --- a/bluesky/tests/test_examples.py +++ b/bluesky/tests/test_examples.py @@ -283,20 +283,20 @@ def test_suspend(RE, hw): ] assert RE.state == 'idle' - def local_suspend(): - RE.request_suspend(ev.wait) - def resume_cb(): RE.loop.call_soon_threadsafe(ev.set) + def local_suspend(): + RE.request_suspend(ev.wait) + # wait a second and then resume + threading.Timer(1, resume_cb).start() + out = [] def ev_cb(name, ev): out.append(ev) # trigger the suspend right after the check point threading.Timer(.1, local_suspend).start() - # wait a second and then resume - threading.Timer(1, resume_cb).start() # grab the start time start = ttime.time() # run, this will not return until it is done diff --git a/bluesky/tests/test_run_engine.py b/bluesky/tests/test_run_engine.py index 43952b3ce..6e292d915 100644 --- a/bluesky/tests/test_run_engine.py +++ b/bluesky/tests/test_run_engine.py @@ -493,14 +493,20 @@ def test_plan(motor, det): motor, UnReplayableSynGauss('det', motor, 'motor', center=0, Imax=1), ['set', 'trigger', 'sleep', - 'rewindable', 'wait_for', 'resume', 'rewindable', + "_start_suspender", + 'rewindable', + 'wait_for', "_resume_from_suspender", + 'rewindable', 'set', 'trigger'])) inps.append((test_plan, motor, SynGauss('det', motor, 'motor', center=0, Imax=1), ['set', 'trigger', 'sleep', - 'rewindable', 'wait_for', 'resume', 'rewindable', + "_start_suspender", + 'rewindable', + 'wait_for', "_resume_from_suspender", + 'rewindable', 'set', 'trigger', 'sleep', 'set', 'trigger'])) diff --git a/bluesky/tests/test_suspenders.py b/bluesky/tests/test_suspenders.py index eab8d08c8..499fed2f1 100644 --- a/bluesky/tests/test_suspenders.py +++ b/bluesky/tests/test_suspenders.py @@ -84,39 +84,89 @@ def accum(msg): RE(scan) assert len(msg_lst) == 2 - assert ['wait_for', 'checkpoint'] == [m[0] for m in msg_lst] - - -@pytest.mark.parametrize('pre_plan,post_plan,expected_list', - [([Msg('null')], None, - ['checkpoint', 'sleep', 'rewindable', 'null', - 'wait_for', 'resume', 'rewindable', 'sleep']), - (None, [Msg('null')], - ['checkpoint', 'sleep', 'rewindable', - 'wait_for', 'resume', 'null', 'rewindable', - 'sleep']), - ([Msg('null')], [Msg('null')], - ['checkpoint', 'sleep', 'rewindable', 'null', - 'wait_for', 'resume', 'null', 'rewindable', - 'sleep']), - (lambda: [Msg('null')], lambda: [Msg('null')], - ['checkpoint', 'sleep', 'rewindable', 'null', - 'wait_for', 'resume', 'null', 'rewindable', - 'sleep'])]) + assert ["wait_for", "checkpoint"] == [m[0] for m in msg_lst] + + +@pytest.mark.parametrize( + "pre_plan,post_plan,expected_list", + [ + ( + [Msg("null")], + None, + [ + "checkpoint", + "sleep", + "_start_suspender", + "rewindable", + "null", + "wait_for", + "_resume_from_suspender", + "rewindable", + "sleep", + ], + ), + ( + None, + [Msg("null")], + [ + "checkpoint", + "sleep", + "_start_suspender", + "rewindable", + "wait_for", + "_resume_from_suspender", + "null", + "rewindable", + "sleep", + ], + ), + ( + [Msg("null")], + [Msg("null")], + [ + "checkpoint", + "sleep", + "_start_suspender", + "rewindable", + "null", + "wait_for", + "_resume_from_suspender", + "null", + "rewindable", + "sleep", + ], + ), + ( + lambda: [Msg("null")], + lambda: [Msg("null")], + [ + "checkpoint", + "sleep", + "_start_suspender", + "rewindable", + "null", + "wait_for", + "_resume_from_suspender", + "null", + "rewindable", + "sleep", + ], + ), + ], +) def test_pre_suspend_plan(RE, pre_plan, post_plan, expected_list, hw): sig = hw.bool_sig - scan = [Msg('checkpoint'), Msg('sleep', None, .2)] + scan = [Msg("checkpoint"), Msg("sleep", None, 0.2)] msg_lst = [] sig.put(0) def accum(msg): msg_lst.append(msg) - susp = SuspendBoolHigh(sig, pre_plan=pre_plan, - post_plan=post_plan) + susp = SuspendBoolHigh(sig, pre_plan=pre_plan, post_plan=post_plan) RE.install_suspender(susp) - threading.Timer(.1, sig.put, (1,)).start() + threading.Timer(0.1, sig.put, (1,)).start() threading.Timer(1, sig.put, (0,)).start() RE.msg_hook = accum RE(scan) diff --git a/docs/source/api_changes.rst b/docs/source/api_changes.rst index 74795c170..2e46a7b03 100644 --- a/docs/source/api_changes.rst +++ b/docs/source/api_changes.rst @@ -2,6 +2,11 @@ Release History ================= +v1.9.0 (2022-XX-YY) +=================== + +* the `"resume"` message which can only be used internally has been renamed to + `"_resume_from_suspender"`. v1.8.3 (2022-04-08) ===================