# Array Detector (non-EPICS)

In this tutorial we will take a first look at how we might add an array detector device to Ophyd.

To allow us to focus purely on the Ophyd side of things, we've stripped out EPICS entirely here and kept all other complexity to a minimum.

## Acquiring the Image

You will need to define a function that integrates directly with the hardware to acquire an image and save it at a specified filepath.

This function must return the array shape (i.e. dimensions) of the image. The name of the function does not matter.

In [None]:
import numpy
from pathlib import Path

def acquire_image(filepath):
    """
    This function should integrate directly with the hardware.

    No concepts particular to ophyd are involved here.
    Just tell the hardware to take an image, however that works.
    This function should block until acquisition is complete or
    raise if acquisition fails.

    It will be run on a worker thread, so it will not block
    ophyd / the RunEngine.
    """
    # For this tutorial, just generate a random image.
    from PIL import Image

    # The upper bound of randint is exclusive, so 256 covers the full uint8 range.
    image = numpy.random.randint(0, 256, (512, 512)).astype('uint8')
    # Ensure the directory exists.
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    # Save the image.
    Image.fromarray(image).save(filepath)
    return image.shape

In [None]:
acquire_image('test.jpg')

This sample function simply generated a random image and saved it as `test.jpg` in the current directory.

Let's have a quick look at it:

In [None]:
from IPython.display import Image
Image('test.jpg')

## Integrating with Ophyd and Bluesky

Let's get some imports out of the way before we move on:

In [None]:
import os
import uuid
import threading
import itertools

import requests
from ophyd import Device, Component, Signal, DeviceStatus
from ophyd.areadetector.filestore_mixins import resource_factory

We will need to define a signal to help us reference the image file:

In [None]:
class ExternalFileReference(Signal):
    """
    A pure software signal pointing to data in an external file

    The parent device is intended to set the value of this Signal to a datum_id.
    """
    def __init__(self, *args, shape, **kwargs):
        super().__init__(*args, **kwargs)
        self.shape = shape

    def describe(self):
        res = super().describe()
        # Tell consumers that readings from this Signal point to "external" data,
        # data that is not in-line in the reading itself.
        res[self.name].update(dict(external="FILESTORE:", dtype="array", shape=self.shape))
        return res

Our `Camera` device will use this `ExternalFileReference` and implement the bulk of the staging and acquisition logic:

In [None]:
class Camera(Device):
    """
    An ophyd device for a camera that acquires images and saves them in files.
    """
    # We initialize the shape to [] and update it below once we know the shape
    # of the array.
    image = Component(ExternalFileReference, value="", kind="normal", shape=[])

    def __init__(self, *args, root_path, **kwargs):
        super().__init__(*args, **kwargs)
        self._root_path = root_path
        # Use this lock to ensure that we only process one "trigger" at a time.
        # Generally bluesky should take care of this, so this is just an extra
        # precaution.
        self._acquiring_lock = threading.Lock()
        self._counter = None  # set to an itertools.count object when staged
        # Accumulate Resource and Datum documents in this cache.
        self._asset_docs_cache = []
        # This string is included in the Resource documents to indicate which
        # kind of reader ("handler") is needed to access the relevant data.
        self._SPEC = "MY_FORMAT_SPEC"

    def stage(self):
        # Set the filepath where we will be saving images.
        self._rel_path_template = f"images/{uuid.uuid4()}_%d.jpg"
        # Create a Resource document referring to this series of images that we
        # are about to take, and stash it in _asset_docs_cache.
        resource, self._datum_factory = resource_factory(
            self._SPEC, self._root_path, self._rel_path_template, {}, "posix")
        self._asset_docs_cache.append(("resource", resource))
        self._counter = itertools.count()
        return super().stage()

    def unstage(self):
        self._counter = None
        self._asset_docs_cache.clear()
        return super().unstage()

    def trigger(self):
        if self._counter is None:
            raise RuntimeError("Device must be staged before triggering.")
        status = DeviceStatus(self)
        i = next(self._counter)
        # Start a background thread to capture an image and write it to disk.
        thread = threading.Thread(target=self._capture, args=(status, i))
        thread.start()
        # Promptly return a status object, which will be marked "done" when the
        # capture completes.
        return status

    def _capture(self, status, i):
        "This runs on a background thread."
        # Try to take the lock without blocking. If another capture holds it,
        # report the failure and return without touching the lock, so that we
        # never release a lock that this call did not acquire.
        if not self._acquiring_lock.acquire(blocking=False):
            status.set_exception(
                RuntimeError("Cannot trigger, currently triggering!"))
            return
        try:
            filepath = os.path.join(self._root_path, self._rel_path_template % i)
            # Kick off requests, or subprocess, or whatever with the result
            # that a file is saved at `filepath`.
            shape = acquire_image(filepath)
            self.image.shape = shape
            # Compose a Datum document referring to this specific image, and
            # stash it in _asset_docs_cache.
            datum = self._datum_factory({"index": i})
            self._asset_docs_cache.append(("datum", datum))
            self.image.set(datum["datum_id"]).wait()
        except Exception as exc:
            status.set_exception(exc)
        else:
            status.set_finished()
        finally:
            self._acquiring_lock.release()

    def collect_asset_docs(self):
        "Yield the documents from our cache, and reset it."
        yield from self._asset_docs_cache
        self._asset_docs_cache.clear()
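
The `trigger` / `_capture` pair follows a common pattern: start the blocking work on a background thread, return a status object right away, and let the worker mark that status as finished or failed. The following is a minimal sketch of that pattern using only the standard library. `SketchStatus`, `slow_acquire`, and `trigger_sketch` are hypothetical names invented for this illustration; `SketchStatus` is a simplified stand-in and does not reproduce the behavior of ophyd's `DeviceStatus`.

```python
import threading
import time


class SketchStatus:
    """Hypothetical stand-in for a status object; not ophyd's DeviceStatus."""

    def __init__(self):
        self._done = threading.Event()
        self.exception = None

    def set_finished(self):
        self._done.set()

    def set_exception(self, exc):
        self.exception = exc
        self._done.set()

    @property
    def done(self):
        return self._done.is_set()

    def wait(self, timeout=None):
        # Block until the worker reports completion, then re-raise any failure.
        if not self._done.wait(timeout):
            raise TimeoutError("operation did not finish in time")
        if self.exception is not None:
            raise self.exception


def slow_acquire():
    """Pretend to talk to hardware; blocks for a short while."""
    time.sleep(0.1)
    return (512, 512)


def trigger_sketch():
    status = SketchStatus()

    def worker():
        try:
            slow_acquire()
        except Exception as exc:
            status.set_exception(exc)
        else:
            status.set_finished()

    threading.Thread(target=worker).start()
    # Return immediately; the caller decides when (or whether) to wait.
    return status


status = trigger_sketch()
status.wait(timeout=5)  # blocks until the worker thread has finished
done_after_wait = status.done
```

The caller gets control back as soon as the thread is started, which is why the acquisition function itself is allowed to block.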

Finally, we will need a File Handler to allow us to load data from the file. Handlers are explained in more detail in the [event model documentation](https://blueskyproject.io/event-model/external.html#handlers).

A simple one might look like this:

In [None]:
class MyHandler:
    def __init__(self, resource_path):
        # resource_path is really a template string with a %d in it
        self._template = resource_path

    def __call__(self, index):
        # Import the PIL.Image submodule explicitly; a bare `import PIL`
        # does not guarantee that PIL.Image is available.
        import numpy
        import PIL.Image
        filepath = str(self._template) % index
        return numpy.asarray(PIL.Image.open(filepath))

And, of course, we will want an instance of our `Camera` device to work with:

In [None]:
camera = Camera(root_path="external_data", name="camera")
camera

## Manually walk through cycle

As before, we'll manually walk through the individual steps such as staging and reading from the device. Typically this would be done as part of a plan executed by the RunEngine.

In [None]:
camera.stage()

In [None]:
status = camera.trigger()
status

In [None]:
status

In [None]:
camera.describe()

In [None]:
camera.read()

In [None]:
documents = list(camera.collect_asset_docs())
documents
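
Note that the result is captured in the `documents` variable rather than read from the device again later. Because `collect_asset_docs` clears the cache once its generator is exhausted, a second call would return nothing. The sketch below mimics only that caching behavior with a hypothetical `CacheSketch` class and made-up document contents; it is not the `Camera` device itself.

```python
class CacheSketch:
    """Hypothetical stand-in that mimics only the caching behavior of Camera."""

    def __init__(self):
        self._asset_docs_cache = []

    def add(self, name, doc):
        self._asset_docs_cache.append((name, doc))

    def collect_asset_docs(self):
        yield from self._asset_docs_cache
        # Only reached once the consumer has exhausted the generator.
        self._asset_docs_cache.clear()


sketch = CacheSketch()
# Made-up document contents, for illustration only.
sketch.add("resource", {"uid": "r1"})
sketch.add("datum", {"datum_id": "r1/0"})

first = list(sketch.collect_asset_docs())   # drains the cache
second = list(sketch.collect_asset_docs())  # nothing left to yield
```

One consequence of this design is that the cache is only reset if the consumer iterates all the way to the end, as `list(...)` does here.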

In [None]:
camera.unstage()

## Manually inspect documents and access array data

Let's take a closer look at what is going on inside the documents:

In [None]:
documents

We can pull out the interesting structures, and finally put our Handler to use:

In [None]:
_, resource_document = documents[0]
_, datum_document = documents[1]

handler = MyHandler(
    Path(resource_document["root"], resource_document["resource_path"]),
    **resource_document["resource_kwargs"]
)

When we invoke the handler and pass in the `datum_kwargs` with the `index`, we should get back an array with our data:

In [None]:
handler(**datum_document["datum_kwargs"])
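
To make the division of labor concrete: the Resource document carries the path template, and the Datum document carries the `index` that fills it in. The sketch below traces that resolution using hypothetical documents with only the fields used in this tutorial. The filename `example_%d.jpg` and the index `3` are placeholders (the real template contains a UUID), and `PurePosixPath` is used here only so that the result is the same on every platform.

```python
from pathlib import PurePosixPath

# Hypothetical documents, shaped like the ones used above.
resource_document = {
    "root": "external_data",
    "resource_path": "images/example_%d.jpg",
    "resource_kwargs": {},
}
datum_document = {"datum_kwargs": {"index": 3}}

# The handler is constructed with the joined template...
template = PurePosixPath(
    resource_document["root"], resource_document["resource_path"]
)
# ...and each call substitutes the datum's index into it.
filepath = str(template) % datum_document["datum_kwargs"]["index"]
print(filepath)  # external_data/images/example_3.jpg
```

One Resource document can therefore serve a whole series of images, with each Datum document selecting a single file from it.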

## Use with Bluesky RunEngine and Databroker

In [None]:
from bluesky import RunEngine
from databroker.v2 import temp

RE = RunEngine()
db = temp()
RE.subscribe(db.v1.insert)

db.register_handler("MY_FORMAT_SPEC", MyHandler)

In [None]:
from bluesky.plans import count

In [None]:
RE(count([camera]))

In [None]:
run = db[-1]  # Access the most recent run.
dataset = run.primary.read() # Access the dataset of its 'primary' stream.
dataset

In [None]:
dataset["camera_image"]