{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Ingesting Third-Party Files\n", "\n", "Why? By couching data from third-party files into the Bluesky Event Model, downstream tooling expect a common in-memory layout, regardless of variations between scientific domain or file format.\n", "\n", "How? It is always possible to represent experimental data, fundamentally, as one or more time series because that is how the data is acquired in the first place." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Write an Ingestor\n", "\n", "Write a generator that yields one RunStart document and subsequent documents relating to that run. The signature of the generator is up to the implementor: it may take any arguments, position or keyword, required or optional.\n", "\n", "Note that it will not be enough to have one of these for \"format\" (e.g. TIFF series) because examples of different origin will have different conventions for metadata in filesnames, etc. We will likely need separate ingestors corresponding each software that generates the files to be ingested.\n", "\n", "We will also need one additional layer, above ingestors, to support a dialog box where users select file(s) and the appropriate ingestor or ingestors for those files are chosen." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import base64\n", "import hashlib\n", "import glob\n", "import os\n", "\n", "import event_model\n", "import tifffile\n", "\n", "\n", "def ingest_tiff_series(pattern):\n", " \"\"\"\n", " Wrap one TIFF series in the document model.\n", " \"\"\"\n", " file_list = sorted(glob.glob(pattern))\n", " if not file_list:\n", " raise ValueError(f\"No matches for {pattern!r}\")\n", " \n", " # We would like a deterministic UID for this group of files.\n", " # This is one possible approach, maybe not the best one.\n", " m = hashlib.sha256()\n", " [m.update(filename.encode()) for filename in file_list]\n", " uid = base64.urlsafe_b64encode(m.digest()).decode()\n", " \n", " mtime = os.path.getmtime(file_list[0])\n", " \n", " # RunStart Document\n", " run_bundle = event_model.compose_run(uid=uid, time=mtime)\n", " yield 'start', run_bundle.start_doc\n", " \n", " # Peek at the first image to get shape, dtype.\n", " img = tifffile.imread(file_list[0])\n", " \n", " # EventDescriptor Document\n", " shape = img.shape\n", " dtype = 'number' # must be one of the jsonschema types\n", " data_keys = {'image': {'shape': shape, 'dtype': 'number', 'source': pattern}}\n", " stream_bundle = run_bundle.compose_descriptor(data_keys=data_keys, name='primary')\n", " yield 'descriptor', stream_bundle.descriptor_doc\n", " \n", " # Events or EventPages\n", " for filename in file_list:\n", " mtime = os.path.getmtime(filename)\n", " img = tifffile.imread(filename)\n", " if img.shape != shape:\n", " raise ValueError(f\"Expected series of images of shape {shape} \"\n", " f\"but {filename} has shape {img.shape}\")\n", " yield 'event', stream_bundle.compose_event(data={'image': img},\n", " timestamps={'image': mtime},\n", " time=mtime)\n", " \n", " # RunStop Document\n", " yield 'stop', run_bundle.compose_stop(time=mtime)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Populate a `BlueskyInMemoryCatalog`\n", "\n", "Here we instantiate an empty Catalog and then add one `BlueskyRun` to it. 
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## Step 2: Populate a `BlueskyInMemoryCatalog`\n",
  "\n",
  "Here we instantiate an empty Catalog and then add one `BlueskyRun` to it. The signature of `upsert` is a generator function, a tuple of positional arguments to be passed to that function, and a dictionary of keyword arguments to be passed. As stated above, the generator is expected to yield one RunStart document and subsequent documents related to that one run. It may yield only a partial run if the experiment is still in progress or was ungracefully interrupted.\n",
  "\n",
  "If the `uid` of the run is the same as one previously passed to `upsert`, it will replace the previous one, as the name \"upsert\" (adopted from database jargon for \"update/insert\") suggests."
 ] },
 { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [
  "from intake_bluesky.in_memory import BlueskyInMemoryCatalog\n",
  "\n",
  "catalog = BlueskyInMemoryCatalog()\n",
  "catalog.upsert(ingest_tiff_series, ('files/*.tiff',), {})"
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## Step 3: Access the data"
 ] },
 { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  "['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']"
 ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "list(catalog)"
 ] },
 { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [
  "run = catalog['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']"
 ] },
 { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  ""
 ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "run.read_canonical()  # a stream of documents like what the bluesky RunEngine would emit"
 ] },
 { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
  "start\n",
  "descriptor\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "stop\n"
 ] } ], "source": [
  "for name, doc in run.read_canonical():\n",
  "    print(name)"
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "The data from a particular stream can also be accessed as an `xarray.Dataset`, which is nice for interactive work."
 ] },
 { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  "<xarray.Dataset>\n",
  "Dimensions:  (dim_0: 512, dim_1: 512, time: 10)\n",
  "Coordinates:\n",
  "  * time     (time) float64 1.559e+09 1.559e+09 ... 1.559e+09 1.559e+09\n",
  "Dimensions without coordinates: dim_0, dim_1\n",
  "Data variables:\n",
  "    image    (time, dim_0, dim_1) int64 217 155 77 12 108 ... 211 34 108 2 182\n",
  "    seq_num  (time) int64 1 2 3 4 5 6 7 8 9 10\n",
  "    uid      (time) <U36 ..."
 ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "run.primary.read()"
 ] },
 { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  "<xarray.DataArray 'image' ()>\n",
  "array(334187042, dtype=int64)"
 ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "run.primary.read()['image'].sum()"
 ] },
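 { "cell_type": "markdown", "metadata": {}, "source": [
  "Since `run.primary.read()` returns an ordinary `xarray.Dataset`, the usual xarray selection and reduction tools apply to it. The next cell is a small sketch of that kind of interactive exploration; it is illustrative only and is left unexecuted here."
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "# A sketch (not executed here) of ordinary xarray operations on the Dataset.\n",
  "ds = run.primary.read()\n",
  "ds['image'].isel(time=0)                   # the first image in the series\n",
  "ds['image'].mean(dim=['dim_0', 'dim_1'])   # one mean value per time point"
 ] },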
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## Another Ingestor: TIFF Stack"
 ] },
 { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [
  "import base64\n",
  "import hashlib\n",
  "import glob\n",
  "import os\n",
  "\n",
  "import event_model\n",
  "import tifffile\n",
  "\n",
  "\n",
  "def ingest_tiff_stack(filename):\n",
  "    \"\"\"\n",
  "    Wrap one TIFF stack in the document model.\n",
  "    \"\"\"\n",
  "    # We would like a deterministic UID for this file.\n",
  "    # This is one possible approach, maybe not the best one.\n",
  "    m = hashlib.sha256()\n",
  "    m.update(filename.encode())\n",
  "    uid = base64.urlsafe_b64encode(m.digest()).decode()\n",
  "\n",
  "    mtime = os.path.getmtime(filename)\n",
  "\n",
  "    # RunStart Document\n",
  "    run_bundle = event_model.compose_run(uid=uid, time=mtime)\n",
  "    yield 'start', run_bundle.start_doc\n",
  "\n",
  "    img_stack = tifffile.imread(filename)\n",
  "\n",
  "    # EventDescriptor Document\n",
  "    shape = img_stack.shape[1:]\n",
  "    dtype = 'number'  # must be one of the jsonschema types\n",
  "    data_keys = {'image': {'shape': shape, 'dtype': dtype, 'source': filename}}\n",
  "    stream_bundle = run_bundle.compose_descriptor(data_keys=data_keys, name='primary')\n",
  "    yield 'descriptor', stream_bundle.descriptor_doc\n",
  "\n",
  "    # Events or EventPages\n",
  "    len_ = len(img_stack)\n",
  "    yield 'event_page', stream_bundle.compose_event_page(data={'image': img_stack},\n",
  "                                                         timestamps={'image': [mtime] * len_},\n",
  "                                                         time=[mtime] * len_,\n",
  "                                                         seq_num=list(range(1, len_ + 1)),\n",
  "                                                         validate=False)  # Work around bug in event-model validator.\n",
  "\n",
  "    # RunStop Document\n",
  "    yield 'stop', run_bundle.compose_stop(time=mtime)"
 ] },
 { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [
  "catalog.upsert(ingest_tiff_stack, ('files/stack/stack.tiff',), {})"
 ] },
 { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  "['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=',\n",
  " 'PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']"
 ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "list(catalog)"
 ] },
 { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [
  "run = catalog['PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']"
 ] },
 { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
  "start\n",
  "descriptor\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "event\n",
  "stop\n"
 ] } ], "source": [
  "for name, doc in run.read_canonical():\n",
  "    print(name)"
 ] },
 { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  "<xarray.DataArray 'image' ()>\n",
  "array(334198927, dtype=int64)"
 ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "run.primary.read()['image'].sum()"
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## Mongo-like search on data from files\n",
  "\n",
  "The library `mongoquery` is used to provide a large subset of the MongoDB query API, even though no MongoDB is present."
 ] },
 { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [
  "[]"
 ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [
  "list(catalog.search({'time': {'$lt': 1558887100}}))  # narrows results to the TIFF Series example"
 ] },
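 { "cell_type": "markdown", "metadata": {}, "source": [
  "Other MongoDB-style operators work the same way. The next cell sketches a couple more queries against the `time` and `uid` fields of the RunStart documents; it is illustrative only and is left unexecuted here."
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "# Unexecuted sketches of additional Mongo-style queries. Queries are matched\n",
  "# against the RunStart documents, which in these examples carry 'uid' and 'time'.\n",
  "list(catalog.search({'time': {'$gte': 1558887100}}))  # the complement of the query above\n",
  "list(catalog.search({'uid': 'PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg='}))  # exact match on uid"
 ] },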
] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(catalog.search({'time': {'$lt': 1558887100}})) # narrows results to the TIFF Series example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reserialization\n", "\n", "The `BlueskyInMemoryCatalog` \"ingests\" data into memory. A separate process would have to repeat the conversion from TIFF to documents. If the same data will be accessedd repeatedly, it may be convenient to make a separate copy of the data in a richer format, such as msgpack, that can encode the documents more literally. This will roughly double the storage required but likely significantly expedite the data-loading process." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "from suitcase.msgpack import export\n", "\n", "for uid in catalog:\n", " export(catalog[uid].read_canonical(), 'native_copies')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is not simple and fast to pull up a catalog of the re-serialized data." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=',\n", " 'ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from intake_bluesky.msgpack import BlueskyMsgpackCatalog\n", "\n", "native_catalog = BlueskyMsgpackCatalog('native_copies/*.msgpack')\n", "list(native_catalog)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Laziness\n", "\n", "There are a couple layers of laziness available in this system.\n", "\n", "1. Until `read_canonical()` or `read()` is called, the ingestor generator is only pulled until it yeilds the first document (RunStart) to support search. Depending on the ingestor, this may mean that file need not be fully read or even opened until/unless the corresponding catalog entry is read.\n", "2. In the examples above, the data is read it as numpy arrays. It may instead be read in as dask arrays, as illustrated below.\n", "3. Finally, these examples illustrate a local intake Catalog. A remote users with an intake client accessing data from a remote intake Catalog served by an intake server can leverage the dask-backed deferred transport built into intake and intake-bluesky, which will send the data form server to client in compressed chunks when it is accessed or explicited loaded." 
] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "import dask.array.image\n", "\n", "\n", "def ingest_tiff_series_dask(pattern):\n", " \"\"\"\n", " Wrap one TIFF series in the document model.\n", " \"\"\"\n", " file_list = sorted(glob.glob(pattern))\n", " if not file_list:\n", " raise ValueError(f\"No matches for {pattern!r}\")\n", " \n", " # We would like a deterministic UID for this group of files.\n", " # This is one possible approach, maybe not the best one.\n", " m = hashlib.sha256()\n", " [m.update(filename.encode()) for filename in file_list]\n", " uid = base64.urlsafe_b64encode(m.digest()).decode()\n", " \n", " mtime = os.path.getmtime(file_list[0])\n", " \n", " # RunStart Document\n", " run_bundle = event_model.compose_run(uid=uid, time=mtime)\n", " yield 'start', run_bundle.start_doc\n", " \n", " # Peek at the first image to get shape, dtype.\n", " img = tifffile.imread(file_list[0])\n", " \n", " # EventDescriptor Document\n", " shape = img.shape\n", " dtype = 'number' # must be one of the jsonschema types\n", " data_keys = {'image': {'shape': shape, 'dtype': 'number', 'source': pattern}}\n", " stream_bundle = run_bundle.compose_descriptor(data_keys=data_keys, name='primary')\n", " yield 'descriptor', stream_bundle.descriptor_doc\n", " \n", " # Events or EventPages\n", " for filename in file_list:\n", " mtime = os.path.getmtime(filename)\n", " img = dask.array.image.imread(filename)\n", " img = img.reshape(img.shape[1:]) # dask.imread gives us an extra dimension of len 1\n", " if img.shape != shape:\n", " raise ValueError(f\"Expected series of images of shape {shape} \"\n", " f\"but {filename} has shape {img.shape}\")\n", " yield 'event', stream_bundle.compose_event(data={'image': img},\n", " timestamps={'image': mtime},\n", " time=mtime)\n", " \n", " # RunStop Document\n", " yield 'stop', run_bundle.compose_stop(time=mtime)" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "catalog.upsert(ingest_tiff_series_dask, ('files/*.tiff',), {})" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=',\n", " 'PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(catalog) # The 'upsert' method has replaced the non-lazy version of this run that had the same uid." 
] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'uid': '9ded224d-6dbf-40f3-ae4f-df736a2b23e3',\n", " 'time': 1559144355.3176703,\n", " 'data': {'image': dask.array},\n", " 'timestamps': {'image': 1559144355.3176703},\n", " 'seq_num': 1,\n", " 'filled': {},\n", " 'descriptor': '63d737c2-0301-4b65-be44-f9c098a65ce6'}" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "for name, doc in catalog['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s='].read_canonical():\n", " if name == 'event':\n", " break\n", " \n", "doc" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[217, 155, 77, ..., 184, 130, 185],\n", " [ 17, 173, 229, ..., 168, 124, 144],\n", " [ 90, 210, 231, ..., 28, 16, 98],\n", " ...,\n", " [237, 235, 88, ..., 237, 222, 207],\n", " [184, 9, 252, ..., 232, 210, 91],\n", " [116, 240, 234, ..., 193, 187, 157]], dtype=int64)" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "doc['data']['image'].compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" } }, "nbformat": 4, "nbformat_minor": 4 }