{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Device\n", "\n", "In this notebook you will:\n", "\n", "* Encapsulate multiple Signals in a Device\n", "\n", "Recommended Prerequisites:\n", "\n", "* [Hello Python and Jupyter](./Hello%20Python%20and%20Jupyter.ipynb)\n", "* [Epics Signal](./Epics%20Signal.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simulated Hardware\n", "Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors and detectors. The IOCs may already be running in the background. Run this command to ensure that they are running. In the event of a problem, edit this command to replace `status` with `restart all` and run again." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!./supervisor/start_supervisor.sh status" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Devices are a hierarchy\n", "\n", "A `Device` is a hierarchy composed of Signals and other Devices. The components of a Device can be introspected by layers above ophyd and may be decomposed to, ultimately, the underlying Signals." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import EpicsSignal, EpicsSignalRO\n", "\n", "x = EpicsSignal('random_walk:x')\n", "dt = EpicsSignal('random_walk:dt')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It would be convenient if we could read these as a unit, instead of `x.read(); dt.read()`." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import Device, Component\n", "\n", "class RandomWalk(Device):\n", "    x = Component(EpicsSignalRO, 'x')\n", "    dt = Component(EpicsSignal, 'dt')\n", "    \n", "random_walk = RandomWalk('random_walk:', name='random_walk')\n", "random_walk.wait_for_connection()\n", "random_walk" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `read()` and `describe()` methods walk the hierarchy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walk.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walk.x.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walk.dt.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walk.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A Device embodies a certain \"layout\" of components. We can have multiple Devices with different PV prefixes but the same layout." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "another_random_walk = RandomWalk('random_walk:vert-', name='another_random_walk')\n", "another_random_walk.wait_for_connection()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "another_random_walk.read()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A Device can be made of subdevices." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class RandomWalks(Device):\n", "    vert = Component(RandomWalk, 'vert-')\n", "    horiz = Component(RandomWalk, 'horiz-')\n", "    \n", "random_walks = RandomWalks('random_walk:', name='random_walks')\n", "random_walks.wait_for_connection()\n", "random_walks" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walks.vert.x.pvname" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walks.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walks.vert.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_walks.vert.x.read()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise\n", "\n", "Using the `random_walks` Device, set the PV `random_walk:horiz-dt` to `3`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load solutions/set_subcomponent_signal.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Adding a set method to `Device`\n", "\n", "Sometimes, setting a value to a Signal and knowing when it is \"done\" involves just one PV:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = random_walks.vert.dt.set(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In other cases it involves coordination across multiple PVs, such as a setpoint PV and a readback PV, or a setpoint PV and a \"done\" PV. For those cases, we define a `set` method on the Device to manage the coordination across multiple Signals." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import DeviceStatus\n", "\n", "class Decay(Device):\n", "    \"\"\"\n", "    A device with a setpoint and readback that decays exponentially toward the setpoint.\n", "    \"\"\"\n", "    readback = Component(EpicsSignalRO, ':I')\n", "    setpoint = Component(EpicsSignal, ':SP')\n", "    \n", "    def set(self, setpoint):\n", "        \"\"\"\n", "        Set the setpoint and return a Status object that monitors the readback.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        \n", "        # Wire up a callback that will mark the status object as finished\n", "        # when the readback approaches within some tolerance of the setpoint.\n", "        def callback(old_value, value, **kwargs):\n", "            TOLERANCE = 1  # hard-coded; we'll make this configurable later on...\n", "            if abs(value - setpoint) < TOLERANCE:\n", "                status._finished()\n", "                self.readback.clear_sub(callback)\n", "        \n", "        self.readback.subscribe(callback)\n", "        \n", "        # Now 'put' the value.\n", "        self.setpoint.put(setpoint)\n", "        \n", "        # And return the Status object, which the caller can use to\n", "        # tell when the action is complete.\n", "        return status\n", "    \n", "    \n", "decay = Decay('decay', name='decay')\n", "decay.wait_for_connection()\n", "decay" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "decay.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = decay.set(115)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can watch for completion either by registering a callback:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def callback():\n", "    print(\"DONE!\")\n", "    \n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "or by polling:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ 
"status = decay.set(120)\n", "\n", "import time\n", "while not status.done:\n", "    time.sleep(0.01)  # Make sure to sleep to avoid pinning CPU.\n", "print(\"DONE!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Make the tolerance configurable with a \"soft\" Signal" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import Signal\n", "\n", "class Decay(Device):\n", "    \"\"\"\n", "    A device with a setpoint and readback that decays exponentially toward the setpoint.\n", "    \"\"\"\n", "    readback = Component(EpicsSignalRO, ':I')\n", "    setpoint = Component(EpicsSignal, ':SP')\n", "    tolerance = Component(Signal, value=1)  # not associated with anything in EPICS---a pure ophyd construct\n", "    \n", "    def set(self, setpoint):\n", "        \"\"\"\n", "        Set the setpoint and return a Status object that monitors the readback.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        \n", "        # Wire up a callback that will mark the status object as finished\n", "        # when the readback approaches within some tolerance of the setpoint.\n", "        def callback(old_value, value, **kwargs):\n", "            if abs(value - setpoint) < self.tolerance.get():\n", "                status._finished()\n", "                self.readback.clear_sub(callback)\n", "        \n", "        self.readback.subscribe(callback)\n", "        \n", "        # Now 'put' the value.\n", "        self.setpoint.put(setpoint)\n", "        \n", "        # And return the Status object, which the caller can use to\n", "        # tell when the action is complete.\n", "        return status\n", "    \n", "    \n", "decay = Decay('decay', name='decay')\n", "status = decay.set(125)\n", "status.add_callback(callback)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "decay.tolerance.set(2)\n", "status = decay.set(130)\n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let the IOC tell us when it is done\n", "\n", "Some IOCs (but not all) provide a specific signal that we can use to 
know when a set is complete. In that case we can remove the \"tolerance\" logic entirely if we want to and trust the IOC." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Decay(Device):\n", "    \"\"\"\n", "    A device with a setpoint and readback that decays exponentially toward the setpoint.\n", "    \"\"\"\n", "    readback = Component(EpicsSignalRO, ':I')\n", "    setpoint = Component(EpicsSignal, ':SP')\n", "    done = Component(EpicsSignalRO, ':done')\n", "    \n", "    def set(self, setpoint):\n", "        \"\"\"\n", "        Set the setpoint and return a Status object that monitors the 'done' PV.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        \n", "        # Wire up a callback that will mark the status object as finished\n", "        # when the done signal goes from low to high---that is, a positive edge.\n", "        def callback(old_value, value, **kwargs):\n", "            if old_value == 0 and value == 1:\n", "                status._finished()\n", "                self.done.clear_sub(callback)\n", "        \n", "        self.done.subscribe(callback)\n", "        \n", "        # Now 'put' the value.\n", "        self.setpoint.put(setpoint)\n", "        \n", "        # And return the Status object, which the caller can use to\n", "        # tell when the action is complete.\n", "        return status\n", "    \n", "    \n", "decay = Decay('decay', name='decay')\n", "decay" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = decay.set(135)\n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## `PVPositioner`\n", "\n", "The pattern of `readback`, `setpoint` and `done` is pretty common, so ophyd has a special `Device` subclass that writes the `set()` method for you if you provide components with these particular names." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import PVPositioner\n", "\n", "class Decay(PVPositioner):\n", "    \"\"\"\n", "    A device with a setpoint and readback that decays exponentially toward the setpoint.\n", "    \"\"\"\n", "    readback = Component(EpicsSignalRO, ':I')\n", "    setpoint = Component(EpicsSignal, ':SP')\n", "    done = Component(EpicsSignalRO, ':done')\n", "    # actuate = Component(EpicsSignal, ...)  # the \"Go\" button, not applicable to this IOC, but sometimes needed\n", "    \n", "decay = Decay('decay', name='decay')\n", "status = decay.set(140)\n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Adding a `trigger` method to `Device`\n", "\n", "Like `Device.set`, `Device.trigger` can coordinate across multiple PVs to trigger a detector and tell when it is done triggering.\n", "\n", "When a bluesky plan obtains a reading from some `device` it typically:\n", "\n", "* Calls `device.trigger()` and receives back a status object\n", "* Waits for that status object to complete (while potentially doing other things, like triggering other detectors in parallel)\n", "* Calls `device.read()`\n", "\n", "Some detectors don't need to be triggered and may be summarily read at any time, such as `random_walk.x`. In that case, `device.trigger()` simply has no effect and returns a Status object that is immediately \"done\". But other detectors require a specific signal or sequence of signals to acquire a new reading, such as in this example." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class TriggeredDetector(Device):\n", "    \"\"\"\n", "    A detector that requires triggering\n", "    \"\"\"\n", "    gain = Component(EpicsSignal, ':gain')\n", "    exposure_time = Component(EpicsSignal, ':exposure_time')\n", "    reading = Component(EpicsSignalRO, ':reading')\n", "    acquire = Component(EpicsSignal, ':acquire')\n", "    enabled = Component(EpicsSignal, ':enabled')\n", "\n", "    def trigger(self):\n", "        \"\"\"\n", "        Trigger the detector and return a Status object.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        \n", "        # Wire up a callback that will mark the status object as finished\n", "        # when we see the state flip from \"acquiring\" to \"not acquiring\"---\n", "        # that is, a negative edge.\n", "        def callback(old_value, value, **kwargs):\n", "            if old_value == 1 and value == 0:\n", "                status._finished()\n", "                self.acquire.clear_sub(callback)\n", "        \n", "        self.acquire.subscribe(callback)\n", "        \n", "        # Now 'put' 1 to the acquire signal.\n", "        self.acquire.put(1)\n", "\n", "        # And return the Status object, which the caller can use to\n", "        # tell when the action is complete.\n", "        return status\n", "    \n", "triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = triggered_detector.trigger()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This `status` object is exactly the same as the one we got from `set()`. We can check completion by registering a callback or polling." 
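] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a rough mental model only, the two approaches can be pictured with a plain-Python toy. This is a sketch, not ophyd's implementation: `ToyStatus`, `mark_finished`, and `toy_status` are hypothetical names invented for illustration, and the toy runs without any IOC." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import threading\n", "import time\n", "\n", "\n", "class ToyStatus:\n", "    \"\"\"Hypothetical toy model of a status object (not an ophyd class).\"\"\"\n", "\n", "    def __init__(self):\n", "        self.done = False\n", "        self._callbacks = []\n", "        self._lock = threading.Lock()\n", "\n", "    def add_callback(self, func):\n", "        # Call right away if already done; otherwise remember it for later.\n", "        with self._lock:\n", "            if not self.done:\n", "                self._callbacks.append(func)\n", "                return\n", "        func()\n", "\n", "    def mark_finished(self):\n", "        # Flip the flag, then run every callback registered so far.\n", "        with self._lock:\n", "            self.done = True\n", "            callbacks, self._callbacks = self._callbacks, []\n", "        for func in callbacks:\n", "            func()\n", "\n", "\n", "toy_status = ToyStatus()\n", "toy_status.add_callback(lambda: print(\"toy callback fired\"))\n", "threading.Timer(0.1, toy_status.mark_finished).start()  # finish in the background\n", "\n", "while not toy_status.done:  # polling watches the same 'done' flag\n", "    time.sleep(0.01)" 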
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def callback():\n", "    print(\"ACQUISITION COMPLETE\")\n", "\n", "status.add_callback(callback)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while not status.done:\n", "    time.sleep(0.01)\n", "print(\"ACQUISITION COMPLETE!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using put-completion\n", "\n", "If the IOC in question implements correct \"put-completion\" on the triggering PV, we can rely on that, and a simpler solution is possible. But not all IOCs do this, so it is good to know the more general solution above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class TriggeredDetector(Device):\n", "    gain = Component(EpicsSignal, ':gain')\n", "    exposure_time = Component(EpicsSignal, ':exposure_time')\n", "    reading = Component(EpicsSignalRO, ':reading')\n", "    acquire = Component(EpicsSignal, ':acquire', put_complete=True)  # Rely on the IOC to signal done-ness.\n", "    enabled = Component(EpicsSignal, ':enabled')\n", "\n", "    def trigger(self):\n", "        \"\"\"\n", "        Trigger the detector and return a Status object.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        self.acquire.put(1, callback=status._finished)\n", "        return status\n", "\n", "triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = triggered_detector.trigger()\n", "while not status.done:\n", "    time.sleep(0.01)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Status objects\n", "\n", "Status objects are like rich Futures. They know whether they are `done`, whether their action finished in `success` or not, and they hold a reference to the `device` that they came from, which can be useful for debugging failures. 
The `status.watch()` method may be used to subscribe to incremental progress updates and is used by bluesky to display progress bars during sets and triggers." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status.success" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status.device" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Sorting components into \"kinds\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class TriggeredDetector(Device):\n", "    \"\"\"\n", "    A detector that requires triggering\n", "    \"\"\"\n", "    gain = Component(EpicsSignal, ':gain', kind='config')\n", "    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')\n", "    reading = Component(EpicsSignalRO, ':reading', kind='normal')\n", "    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)\n", "    enabled = Component(EpicsSignal, ':enabled', kind='omitted')\n", "\n", "    def trigger(self):\n", "        \"\"\"\n", "        Trigger the detector and return a Status object.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        self.acquire.put(1, callback=status._finished)\n", "        return status\n", "    \n", "triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.read_configuration()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.reading.kind" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.reading.kind = 'hinted'\n", "triggered_detector.reading.kind" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.hints" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Staging and unstaging\n", "\n", "Above we said that when bluesky obtains a reading from some `device` it typically:\n", "\n", "* Calls `device.trigger()` and receives back a status object\n", "* Waits for that status object to complete (while potentially doing other things, like triggering other detectors in parallel)\n", "* Calls `device.read()`\n", "\n", "If it obtains multiple readings in sequence, it repeats this trigger/wait/read cycle. Sometimes, before triggering, there is some choreographed sequence of steps necessary to make `device` ready for use and some corresponding sequence to put it back safely into a resting state. To support this, bluesky plans typically call `device.stage()` *once* before first using a device in a plan and then `device.unstage()` at the end. Even if a plan is interrupted by the user or by an exception being raised, `device.unstage()` will be called.\n", "\n", "Like `device.trigger()`, this hook is optional and does not apply to all devices." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import set_and_wait\n", "\n", "class TriggeredDetector(Device):\n", "    \"\"\"\n", "    A detector that requires triggering\n", "    \"\"\"\n", "    gain = Component(EpicsSignal, ':gain', kind='config')\n", "    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')\n", "    reading = Component(EpicsSignalRO, ':reading', kind='normal')\n", "    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)\n", "    enabled = Component(EpicsSignal, ':enabled', kind='omitted')\n", "\n", "    def trigger(self):\n", "        \"\"\"\n", "        Trigger the detector and return a Status object.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        self.acquire.put(1, callback=status._finished)\n", "        return status\n", "    \n", "    def stage(self):\n", "        self.initial_enabled_state = self.enabled.get()\n", "        set_and_wait(self.enabled, 1)\n", "        return super().stage()\n", "    \n", "    def unstage(self):\n", "        ret = super().unstage()\n", "        set_and_wait(self.enabled, self.initial_enabled_state)\n", "        return ret\n", "    \n", "triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.enabled.put(0)\n", "triggered_detector.enabled.get()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.stage()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.enabled.get()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = triggered_detector.trigger()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while not status.done:\n", "    time.sleep(0.01)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], 
"source": [ "triggered_detector.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = triggered_detector.trigger()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while not status.done:\n", "    time.sleep(0.01)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.unstage()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "triggered_detector.enabled.get()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A convenient shorthand for common simple cases: `stage_sigs`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ophyd import set_and_wait\n", "\n", "class TriggeredDetector(Device):\n", "    \"\"\"\n", "    A detector that requires triggering\n", "    \"\"\"\n", "    gain = Component(EpicsSignal, ':gain', kind='config')\n", "    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')\n", "    reading = Component(EpicsSignalRO, ':reading', kind='hinted')\n", "    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)\n", "    enabled = Component(EpicsSignal, ':enabled', kind='omitted')\n", "    \n", "    def __init__(self, *args, **kwargs):\n", "        super().__init__(*args, **kwargs)\n", "        self.stage_sigs['enabled'] = 1  # OrderedDict mapping component name to desired state\n", "\n", "    def trigger(self):\n", "        \"\"\"\n", "        Trigger the detector and return a Status object.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        self.acquire.put(1, callback=status._finished)\n", "        return status\n", "    \n", "triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 
Exercise\n", "\n", "Try staging the device twice in a row. Then try unstaging it twice in a row." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Customizing cleanup via `stop`, `resume`, `pause`\n", "\n", "These optional methods can be used to further customize a Device's cleanup:\n", "\n", "* `stop` -- called by bluesky when a plan is paused or exits (successfully or in error)\n", "* `pause` -- called when the RunEngine is paused\n", "* `resume` -- called when the RunEngine resumes from a pause" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class TriggeredDetector(Device):\n", "    \"\"\"\n", "    A detector that requires triggering\n", "    \"\"\"\n", "    gain = Component(EpicsSignal, ':gain', kind='config')\n", "    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')\n", "    reading = Component(EpicsSignalRO, ':reading', kind='hinted')\n", "    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)\n", "    enabled = Component(EpicsSignal, ':enabled', kind='omitted')\n", "    \n", "    def __init__(self, *args, **kwargs):\n", "        super().__init__(*args, **kwargs)\n", "        self.stage_sigs['enabled'] = 1  # OrderedDict mapping component name to desired state\n", "\n", "    def trigger(self):\n", "        \"\"\"\n", "        Trigger the detector and return a Status object.\n", "        \"\"\"\n", "        status = DeviceStatus(self)\n", "        self.acquire.put(1, callback=status._finished)\n", "        return status\n", "    \n", "    def resume(self):\n", "        ...\n", "    \n", "    def pause(self):\n", "        ...\n", "    \n", "    def stop(self, success=False):\n", "        ..." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The Payoff\n", "\n", "The payoff of adhering to the `Device` interfaces is that you can scan devices with very different ways of indicating when they are done (put completion or not, one or multiple PVs) and very different hardware (motor, diffractometer, temperature controller, power supply voltage, anything you want to \"scan\" as part of an experiment). The details of each case are encapsulated in `Device` and do not leak into the scripts higher up." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib widget\n", "import matplotlib.pyplot as plt\n", "from bluesky import RunEngine\n", "from bluesky.callbacks.best_effort import BestEffortCallback\n", "from bluesky.plans import scan\n", "\n", "RE = RunEngine()\n", "bec = BestEffortCallback()\n", "RE.subscribe(bec)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run scripts/eurotherm.py\n", "\n", "eurotherm = Eurotherm('thermo:', name='eurotherm')\n", "eurotherm.readback.kind = 'hinted'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "eurotherm.hints" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Creating a figure explicitly in advance helps with the\n", "# top-to-bottom flow of this notebook, but it is not necessary.\n", "# If this is omitted, bluesky will cause a figure to appear\n", "# during the RE(...) 
execution below.\n", "plt.figure('triggered_detector_reading vs eurotherm_readback')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "RE(scan([triggered_detector], eurotherm, 100, 130, 6))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "nbsphinx": { "timeout": 600 } }, "outputs": [], "source": [ "plt.gcf() # Display a snapshot of the current state of the figure." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }