{ "cells": [ { "cell_type": "markdown", "id": "convenient-valentine", "metadata": {}, "source": [ "# Achieving Complex Behaviors with Set Methods\n", "\n", "In this tutorial we will learn how to encode more complex behaviors into Devices by defining `set` methods. \n", "\n", "This will allow us to set multiple PVs at once, as well as to perform calculations on input values as needed.\n", "\n", "## Set up for tutorial\n", "\n", "First, let's ensure our simulated IOCs are running.\n", "\n", "The IOCs may already be running in the background. Run this command to verify\n", "that they are running: it should produce output with STARTING or RUNNING on each line.\n", "In the event of a problem, edit this command to replace `status` with `restart all` and run again." ] }, { "cell_type": "code", "execution_count": null, "id": "silver-gospel", "metadata": {}, "outputs": [], "source": [ "!../supervisor/start_supervisor.sh status" ] }, { "cell_type": "markdown", "id": "remarkable-eligibility", "metadata": {}, "source": [ "## Adding a set method to `Device`\n", "\n", "Sometimes, setting a value to a Signal and knowing when it is \"done\" involves just one PV. Here's a simple example from the previous tutorial:" ] }, { "cell_type": "code", "execution_count": null, "id": "crucial-cinema", "metadata": {}, "outputs": [], "source": [ "from ophyd import Device, Component, EpicsSignal, EpicsSignalRO\n", "\n", "class RandomWalk(Device):\n", " x = Component(EpicsSignalRO, 'x')\n", " dt = Component(EpicsSignal, 'dt')\n", " \n", "random_walk = RandomWalk('random_walk:', name='random_walk')\n", "random_walk.wait_for_connection()\n", "\n", "status = random_walk.dt.set(2)" ] }, { "cell_type": "markdown", "id": "serial-knife", "metadata": {}, "source": [ "In other cases it involves coordination across multiple PVs, such as a setpoint PV and a readback PV, or a setpoint PV and a \"done\" PV. For those cases, we define a `set` method on the Device to manage the coordination across multiple Signals." ] }, { "cell_type": "code", "execution_count": null, "id": "standing-bahamas", "metadata": {}, "outputs": [], "source": [ "from ophyd import DeviceStatus\n", "\n", "class Decay(Device):\n", " \"\"\"\n", " A device with a setpoint and readback that decays exponentially toward the setpoint.\n", " \"\"\"\n", " readback = Component(EpicsSignalRO, ':I')\n", " setpoint = Component(EpicsSignal, ':SP')\n", " \n", " def set(self, setpoint):\n", " \"\"\"\n", " Set the setpoint and return a Status object that monitors the readback.\n", " \"\"\"\n", " status = DeviceStatus(self)\n", " \n", " # Wire up a callback that will mark the status object as finished\n", " # when the readback approaches within some tolerance of the setpoint.\n", " def callback(old_value, value, **kwargs):\n", " TOLERANCE = 1 # hard-coded; we'll make this configurable later on...\n", " if abs(value - setpoint) < TOLERANCE:\n", " status.set_finished()\n", " self.readback.clear_sub(callback)\n", " \n", " self.readback.subscribe(callback)\n", " \n", " # Now 'put' the value.\n", " self.setpoint.put(setpoint)\n", " \n", " # And return the Status object, which the caller can use to\n", " # tell when the action is complete.\n", " return status\n", " \n", " \n", "decay = Decay('decay', name='decay')\n", "decay.wait_for_connection()\n", "decay" ] }, { "cell_type": "code", "execution_count": null, "id": "resident-lecture", "metadata": {}, "outputs": [], "source": [ "decay.read()" ] }, { "cell_type": "code", "execution_count": null, "id": "committed-manner", "metadata": {}, "outputs": [], "source": [ "status = decay.set(115)" ] }, { "cell_type": "markdown", "id": "biological-adams", "metadata": {}, "source": [ "We can watch for completion either by registering a callback:" ] }, { "cell_type": "code", "execution_count": null, "id": "noted-throat", "metadata": {}, "outputs": [], "source": [ "def callback(status):\n", " print(\"DONE:\", status)\n", " \n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "id": "mature-plaintiff", "metadata": {}, "source": [ "or by blocking:" ] }, { "cell_type": "code", "execution_count": null, "id": "continued-cradle", "metadata": {}, "outputs": [], "source": [ "status = decay.set(120)\n", "status.wait() # blocks here\n", "print(\"DONE!\")" ] }, { "cell_type": "markdown", "id": "auburn-wisdom", "metadata": {}, "source": [ "### Make the tolerance configurable with a \"soft\" Signal" ] }, { "cell_type": "code", "execution_count": null, "id": "nonprofit-edmonton", "metadata": {}, "outputs": [], "source": [ "from ophyd import Signal\n", "\n", "class Decay(Device):\n", " \"\"\"\n", " A device with a setpoint and readback that decays exponentially toward the setpoint.\n", " \"\"\"\n", " readback = Component(EpicsSignalRO, ':I')\n", " setpoint = Component(EpicsSignal, ':SP')\n", " tolerance = Component(Signal, value=1) # not associated with anything in EPICS---a pure ophyd construct\n", " \n", " def set(self, setpoint):\n", " \"\"\"\n", " Set the setpoint and return a Status object that monitors the readback.\n", " \"\"\"\n", " status = DeviceStatus(self)\n", " \n", " # Wire up a callback that will mark the status object as finished\n", " # when the readback approaches within some tolerance of the setpoint.\n", " def callback(old_value, value, **kwargs):\n", " if abs(value - setpoint) < self.tolerance.get():\n", " status.set_finished()\n", " self.readback.clear_sub(callback)\n", " \n", " self.readback.subscribe(callback)\n", " \n", " # Now 'put' the value.\n", " self.setpoint.put(setpoint)\n", " \n", " # And return the Status object, which the caller can use to\n", " # tell when the action is complete.\n", " return status\n", " \n", " \n", "decay = Decay('decay', name='decay')\n", "status = decay.set(125)\n", "status.add_callback(callback)" ] }, { "cell_type": "code", "execution_count": null, "id": "diverse-firewall", "metadata": {}, "outputs": [], "source": [ "decay.tolerance.set(2)\n", "status = decay.set(130)\n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "id": "tribal-final", "metadata": {}, "source": [ "### Let the IOC tell us when it is done\n", "\n", "Some IOCs (but not all) provide a specific signal that we can use to know when a set is complete. In that case we can remove the \"tolerance\" logic entirely if we want to and trust the IOC." ] }, { "cell_type": "code", "execution_count": null, "id": "confidential-fourth", "metadata": {}, "outputs": [], "source": [ "class Decay(Device):\n", " \"\"\"\n", " A device with a setpoint and readback that decays exponentially toward the setpoint.\n", " \"\"\"\n", " readback = Component(EpicsSignalRO, ':I')\n", " setpoint = Component(EpicsSignal, ':SP')\n", " done = Component(EpicsSignalRO, ':done')\n", " \n", " def set(self, setpoint):\n", " \"\"\"\n", " Set the setpoint and return a Status object that monitors the 'done' PV.\n", " \"\"\"\n", " status = DeviceStatus(self)\n", " \n", " # Wire up a callback that will mark the status object as finished\n", " # when the done signal goes from low to high---that is, a positive edge.\n", " def callback(old_value, value, **kwargs):\n", " if old_value == 0 and value == 1:\n", " status.set_finished()\n", " self.done.clear_sub(callback)\n", " \n", " self.done.subscribe(callback)\n", " \n", " # Now 'put' the value.\n", " self.setpoint.put(setpoint)\n", " \n", " # And return the Status object, which the caller can use to\n", " # tell when the action is complete.\n", " return status\n", " \n", " \n", "decay = Decay('decay', name='decay')\n", "decay" ] }, { "cell_type": "code", "execution_count": null, "id": "diagnostic-concept", "metadata": {}, "outputs": [], "source": [ "status = decay.set(135)\n", "status.add_callback(callback)" ] }, { "cell_type": "markdown", "id": "distributed-flexibility", "metadata": {}, "source": [ "## `PVPositioner`\n", "\n", "The pattern of `readback`, `setpoint` and `done` is pretty common, so ophyd has a special `Device` subclass that writes the `set()` method for you if you provide components with these particular names." ] }, { "cell_type": "code", "execution_count": null, "id": "contrary-lecture", "metadata": {}, "outputs": [], "source": [ "from ophyd import PVPositioner\n", "\n", "class Decay(PVPositioner):\n", " \"\"\"\n", " A device with a setpoint and readback that decays exponentially toward the setpoint.\n", " \"\"\"\n", " readback = Component(EpicsSignalRO, ':I')\n", " setpoint = Component(EpicsSignal, ':SP')\n", " done = Component(EpicsSignalRO, ':done')\n", " # actuate = Component(EpicsSignal, ...) # the \"Go\" button, not applicable to this IOC, but sometimes needed\n", " \n", "decay = Decay('decay', name='decay')\n", "status = decay.set(140)\n", "status.add_callback(callback)" ] }, { "cell_type": "code", "execution_count": null, "id": "antique-sunset", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 5 }