Skip to content

Commit

Permalink
Merge pull request bluesky#1518 from tacaswell/mnt_suspender_clarity
Browse files Browse the repository at this point in the history
MNT: clarify how suspenders get evaluated
  • Loading branch information
coretl committed May 25, 2022
2 parents 85c5d92 + 73a2a81 commit 2cad6cd
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 97 deletions.
146 changes: 79 additions & 67 deletions bluesky/run_engine.py
Expand Up @@ -326,7 +326,7 @@ def block(ev: Threading.Event) -> None:
_UNCACHEABLE_COMMANDS = ['pause', 'subscribe', 'unsubscribe', 'stage',
'unstage', 'monitor', 'unmonitor', 'open_run',
'close_run', 'install_suspender',
'remove_suspender']
'remove_suspender', '_start_suspender']

@property
def state(self):
Expand Down Expand Up @@ -464,7 +464,8 @@ def setup_run_permit():
'clear_checkpoint': self._clear_checkpoint,
'rewindable': self._rewindable,
'pause': self._pause,
'resume': self._resume,
'_resume_from_suspender': self._resume,
'_start_suspender': self._start_suspender,
'collect': self._collect,
'kickoff': self._kickoff,
'complete': self._complete,
Expand Down Expand Up @@ -1147,58 +1148,15 @@ async def _request_suspend(pre_plan, post_plan, justification):
self._state = 'aborting'
if not was_paused:
self._task.cancel()

if justification is not None:
print("Justification for this suspension:\n%s" % justification)
for current_run in self._run_bundlers.values():
current_run.record_interruption('resume')
# During suspend, all motors should be stopped. Call stop() on
# every object we ever set().
self._stop_movable_objects(success=True)
# Notify Devices of the pause in case they want to clean up.
for obj in self._objs_seen:
if hasattr(obj, 'pause'):
try:
obj.pause()
except NoReplayAllowed:
self._reset_checkpoint_state_meth()

# rewind to the last checkpoint
new_plan = self._rewind()
# queue up the cached messages
self._plan_stack.append(new_plan)
self._response_stack.append(None)

self._plan_stack.append(single_gen(
Msg('rewindable', None, self.rewindable)))
self._response_stack.append(None)
print(f"Justification for this suspension:\n{justification}")

# if there is a post plan add it between the wait
# and the cached messages
if post_plan is not None:
if callable(post_plan):
post_plan = post_plan()
self._plan_stack.append(ensure_generator(post_plan))
self._response_stack.append(None)

# tell the devices they are ready to go again
self._plan_stack.append(single_gen(Msg('resume', None, )))
self._response_stack.append(None)

# add the wait on the future to the stack
self._plan_stack.append(single_gen(Msg('wait_for', None, [fut, ])))
# add starting the suspender logic to the stack
self._plan_stack.append(
single_gen(Msg('_start_suspender', None, pre_plan, post_plan, justification, fut))
)
self._response_stack.append(None)

# if there is a pre plan add on top of the wait
if pre_plan is not None:
if callable(pre_plan):
pre_plan = pre_plan()
self._plan_stack.append(ensure_generator(pre_plan))
self._response_stack.append(None)

self._plan_stack.append(single_gen(
Msg('rewindable', None, False)))
self._response_stack.append(None)
# The event loop is still running. The pre_plan will be processed,
# and then the RunEngine will be hung up on processing the
# 'wait_for' message until `fut` is set.
Expand All @@ -1211,6 +1169,55 @@ async def _request_suspend(pre_plan, post_plan, justification):
self.loop.create_task,
_request_suspend(pre_plan, post_plan, justification))

async def _start_suspender(self, msg):
"""
An internal message to do the initial work of starting a suspender
"""
pre_plan, post_plan, justification, fut = msg.args
for current_run in self._run_bundlers.values():
current_run.record_interruption(
justification if justification is not None else "suspended"
)
# During suspend, all motors should be stopped. Call stop() on
# every object we ever set().
self._stop_movable_objects(success=True)
# Notify Devices of the pause in case they want to clean up.
for obj in self._objs_seen:
if hasattr(obj, 'pause'):
try:
obj.pause()
except NoReplayAllowed:
self._reset_checkpoint_state_meth()
# rewind to the last checkpoint
rewind_plan = self._rewind()
was_rewindable = self.rewindable

if callable(pre_plan):
pre_plan = pre_plan()
if callable(post_plan):
post_plan = post_plan()

def suspender_helper_inner_plan():
# none of this should run again.
yield Msg('rewindable', None, False)
# if there is a pre plan add on top of the wait
if pre_plan is not None:
yield from ensure_generator(pre_plan)
# wait for the future from the suspender to be released
yield Msg('wait_for', None, [fut, ])
# do the work we need to do to resume
yield Msg('_resume_from_suspender', None, )
# if there is a post plan, run it
if post_plan is not None:
yield from ensure_generator(post_plan)
# put rewindable back the way it was
yield Msg('rewindable', None, was_rewindable)
yield from rewind_plan

# add the above helper to the plan stack
self._plan_stack.append(suspender_helper_inner_plan())
self._response_stack.append(None)

def abort(self, reason=''):
"""
Stop a running or paused plan and mark it as aborted.
Expand Down Expand Up @@ -1458,19 +1465,24 @@ async def _run(self):
# the new response to be added
new_response = None

# This 'await' must be here to ensure that
# this coroutine breaks out of its current behavior
# before trying to get the next message from the
# top of the generator stack in case there has
# been a pause requested. Without this the next
# message after the pause may be processed first
# on resume (instead of the first message in
# self._msg_cache).

# This sleep has to be inside of this try block so
# that any of the 'async' exceptions get thrown in the
# correct place
await asyncio.sleep(0, **self._loop_for_kwargs)
# This 'await' must be here to ensure that this coroutine
# breaks out of its current behavior before trying to get
# the next message from the top of the generator stack in
# case there has been a pause requested. Without this the
# next message after the pause may be processed first on
# resume (instead of the first message in self._msg_cache).
# This await also gives the co-routine for requesting
# suspends a chance to run.

# This sleep has to be inside of this try block so that any
# of the 'async' exceptions get thrown in the correct
# place.

# If we are handling an exception, then burn through the
# current plan stack before rather than allowing a pause or
# suspension to try and finish firing.
if stashed_exception is None:
await asyncio.sleep(0, **self._loop_for_kwargs)
# always pop off a result, we are either sending it back in
# or throwing an exception in, in either case the left hand
# side of the yield in the plan will be moved past
Expand All @@ -1482,12 +1494,10 @@ async def _run(self):
stashed_exception = self._exception
self._exception = None
# The case where we have a stashed exception
if (stashed_exception is not None or
isinstance(resp, Exception)):
if (stashed_exception is not None or isinstance(resp, Exception)):
# throw the exception at the current plan
try:
msg = self._plan_stack[-1].throw(
stashed_exception or resp)
msg = self._plan_stack[-1].throw(stashed_exception or resp)
except Exception as e:
# The current plan did not handle it,
# maybe the next plan (if any) would like
Expand All @@ -1496,6 +1506,8 @@ async def _run(self):
# we have killed the current plan, do not give
# it a new response
resp = sentinel
# If there is at least one plan left in the stack,
# stash the new exception go back to top
if len(self._plan_stack):
stashed_exception = e
continue
Expand Down
10 changes: 5 additions & 5 deletions bluesky/tests/test_examples.py
Expand Up @@ -283,20 +283,20 @@ def test_suspend(RE, hw):
]
assert RE.state == 'idle'

def local_suspend():
RE.request_suspend(ev.wait)

def resume_cb():
RE.loop.call_soon_threadsafe(ev.set)

def local_suspend():
RE.request_suspend(ev.wait)
# wait a second and then resume
threading.Timer(1, resume_cb).start()

out = []

def ev_cb(name, ev):
out.append(ev)
# trigger the suspend right after the check point
threading.Timer(.1, local_suspend).start()
# wait a second and then resume
threading.Timer(1, resume_cb).start()
# grab the start time
start = ttime.time()
# run, this will not return until it is done
Expand Down
10 changes: 8 additions & 2 deletions bluesky/tests/test_run_engine.py
Expand Up @@ -493,14 +493,20 @@ def test_plan(motor, det):
motor,
UnReplayableSynGauss('det', motor, 'motor', center=0, Imax=1),
['set', 'trigger', 'sleep',
'rewindable', 'wait_for', 'resume', 'rewindable',
"_start_suspender",
'rewindable',
'wait_for', "_resume_from_suspender",
'rewindable',
'set', 'trigger']))

inps.append((test_plan,
motor,
SynGauss('det', motor, 'motor', center=0, Imax=1),
['set', 'trigger', 'sleep',
'rewindable', 'wait_for', 'resume', 'rewindable',
"_start_suspender",
'rewindable',
'wait_for', "_resume_from_suspender",
'rewindable',
'set',
'trigger', 'sleep', 'set', 'trigger']))

Expand Down
96 changes: 73 additions & 23 deletions bluesky/tests/test_suspenders.py
Expand Up @@ -84,39 +84,89 @@ def accum(msg):
RE(scan)

assert len(msg_lst) == 2
assert ['wait_for', 'checkpoint'] == [m[0] for m in msg_lst]


@pytest.mark.parametrize('pre_plan,post_plan,expected_list',
[([Msg('null')], None,
['checkpoint', 'sleep', 'rewindable', 'null',
'wait_for', 'resume', 'rewindable', 'sleep']),
(None, [Msg('null')],
['checkpoint', 'sleep', 'rewindable',
'wait_for', 'resume', 'null', 'rewindable',
'sleep']),
([Msg('null')], [Msg('null')],
['checkpoint', 'sleep', 'rewindable', 'null',
'wait_for', 'resume', 'null', 'rewindable',
'sleep']),
(lambda: [Msg('null')], lambda: [Msg('null')],
['checkpoint', 'sleep', 'rewindable', 'null',
'wait_for', 'resume', 'null', 'rewindable',
'sleep'])])
assert ["wait_for", "checkpoint"] == [m[0] for m in msg_lst]


@pytest.mark.parametrize(
"pre_plan,post_plan,expected_list",
[
(
[Msg("null")],
None,
[
"checkpoint",
"sleep",
"_start_suspender",
"rewindable",
"null",
"wait_for",
"_resume_from_suspender",
"rewindable",
"sleep",
],
),
(
None,
[Msg("null")],
[
"checkpoint",
"sleep",
"_start_suspender",
"rewindable",
"wait_for",
"_resume_from_suspender",
"null",
"rewindable",
"sleep",
],
),
(
[Msg("null")],
[Msg("null")],
[
"checkpoint",
"sleep",
"_start_suspender",
"rewindable",
"null",
"wait_for",
"_resume_from_suspender",
"null",
"rewindable",
"sleep",
],
),
(
lambda: [Msg("null")],
lambda: [Msg("null")],
[
"checkpoint",
"sleep",
"_start_suspender",
"rewindable",
"null",
"wait_for",
"_resume_from_suspender",
"null",
"rewindable",
"sleep",
],
),
],
)
def test_pre_suspend_plan(RE, pre_plan, post_plan, expected_list, hw):
sig = hw.bool_sig
scan = [Msg('checkpoint'), Msg('sleep', None, .2)]
scan = [Msg("checkpoint"), Msg("sleep", None, 0.2)]
msg_lst = []
sig.put(0)

def accum(msg):
msg_lst.append(msg)

susp = SuspendBoolHigh(sig, pre_plan=pre_plan,
post_plan=post_plan)
susp = SuspendBoolHigh(sig, pre_plan=pre_plan, post_plan=post_plan)

RE.install_suspender(susp)
threading.Timer(.1, sig.put, (1,)).start()
threading.Timer(0.1, sig.put, (1,)).start()
threading.Timer(1, sig.put, (0,)).start()
RE.msg_hook = accum
RE(scan)
Expand Down
5 changes: 5 additions & 0 deletions docs/source/api_changes.rst
Expand Up @@ -2,6 +2,11 @@
Release History
=================

v1.9.0 (2022-XX-YY)
===================

* the `"resume"` message which can only be used internally has been renamed to
`"_resume_from_suspender"`.

v1.8.3 (2022-04-08)
===================
Expand Down

0 comments on commit 2cad6cd

Please sign in to comment.