{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Documentation\n", "\n", "This notebook will cover the documentation for Nd-Multicore.\n", "\n", "## Use Case:\n", "This module is currently used for running embarrassingly parallel pipelines on data located in the BOSS. However, the modularity of the code should allow for further parallel functions in the future.\n", "\n", "Specifically, the current pipeline accomplishes the following things:\n", "\n", "1. Reads in BOSS data parameters through a neurodata.cfg file\n", "2. Breaks data into blocks.\n", "3. Runs any specified function on each block of data in a parallel fashion (using python3 multiprocessing library)\n", "\n", "## Important Notes:\n", "1. The function should accept a python dictionary as its only non-default parameter. The dictionary will contain channels as the keys and the raw data from the BOSS (usually numpy array) as the value. \n", "\n", "\n", "The rest of the notebook will focus on the steps of the pipeline and how the code works.\n", "\n", "## 1. Loading BOSS Resource\n", "\n", "The first step is to load the resource from the BOSS. We do this through the NeuroDataResource.py file and class. Look into neurodata.cfg.example to see what parameters you have to provide. This code is mostly handled for you and there should be little changes you have to make to it. For reference though, here is how the NeuroDataResource class functions:\n", "\n", "You should mainly concern yourself with the class variables and the get_cutout function if you are modifying this code so we just listed those below. If you want the entire source code, just open the python file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class NeuroDataResource:\n", " def __init__(self, host, token, collection, experiment, requested_channels,\n", " x_range,\n", " y_range,\n", " z_range):\n", " # Create BOSS Remote\n", " self._bossRemote = BossRemote({'protocol': 'https',\n", " 'host': host,\n", " 'token': token})\n", " # Specify collection\n", " self.collection = collection\n", " # Specify experiment\n", " self.experiment = experiment\n", " # Pull all channels (used for validation of pulling data)\n", " self.channels = self._bossRemote.list_channels(collection, experiment)\n", " \n", " # Specify which channels you want to pull \n", " if len(requested_channels) == 0:\n", " self.requested_channels = self.channels\n", " else:\n", " self.requested_channels = requested_channels\n", " \n", " self._get_coord_frame_details()\n", " # validate range\n", " if not self.correct_range(z_range, y_range, x_range):\n", " raise Exception(\"Error: Inccorect dimension range\")\n", " # Range of data you are pulling from\n", " self.x_range = x_range\n", " self.y_range = y_range\n", " self.z_range = z_range\n", "\n", " def get_cutout(self, chan, zRange=None, yRange=None, xRange=None):\n", " if chan not in self.channels:\n", " print('Error: Channel Not Found in this Resource')\n", " return\n", " if zRange is None or yRange is None or xRange is None:\n", " print('Error: You must supply zRange, yRange, xRange kwargs in list format')\n", " return\n", "\n", " channel_resource = self._get_channel(chan)\n", " datatype = channel_resource.datatype\n", "\n", " data = self._bossRemote.get_cutout(channel_resource,\n", " 0,\n", " xRange,\n", " yRange,\n", " zRange)\n", "\n", " #Datatype check. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## 2. Compute Blocks\n", "After a NeuroDataResource object is created, the next step is to compute block sizes for your data. The function is shown below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "'''\n", "    This function computes proper block sizes (less than 2 GB each)\n", "    for a given NeuroDataResource\n", "\n", "    Input:\n", "    resource       NeuroDataResource object containing the necessary parameters\n", "    block_size     (x, y, z) tuple specifying the size of blocks\n", "'''\n", "def compute_blocks(resource, block_size):\n", "    x_start, x_end = resource.x_range\n", "    y_start, y_end = resource.y_range\n", "    z_start, z_end = resource.z_range\n", "\n", "    blocks = intern.block_compute(x_start, x_end, y_start, y_end, z_start, z_end, (0, 0, 0), block_size)\n", "    ### IMPORTANT: blocks are returned as x, y, z ###\n", "    for i in range(len(blocks)):\n", "        x_range, y_range, z_range = blocks[i]\n", "        # Create a Block object to preserve the original location of the block\n", "        blocks[i] = Block(z_range, y_range, x_range)\n", "    return blocks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The purpose of this function is to break the total data cube into smaller blocks that can be processed in parallel. This is especially important because the BOSS only allows you to pull about 2 GB with each request.\n", "\n", "The heavy lifting is done by the block_compute function that the intern package provides. We wrap the resulting ranges in Block objects, which preserve the metadata about where each block originally came from in the BOSS.\n", "\n", "Note that this function does not touch the data attribute of each Block; it is populated later, inside each parallel job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Block:\n", "    # Lightweight container recording where a block sits in the full volume.\n", "    # The data attribute stays None until a job fills it in.\n", "    def __init__(self, z_range, y_range, x_range):\n", "        self.x_start = x_range[0]\n", "        self.x_end = x_range[1]\n", "        self.y_start = y_range[0]\n", "        self.y_end = y_range[1]\n", "        self.z_start = z_range[0]\n", "        self.z_end = z_range[1]\n", "        self.data = None" ] },
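{ "cell_type": "markdown", "metadata": {}, "source": [ "To make the Block bookkeeping concrete, here is a small sketch that computes blocks for an existing resource and inspects a few of them. It assumes a resource object created as in section 1; the block size is just an example value." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: compute blocks for an existing resource and inspect them.\n", "# Assumes `resource` was created as in section 1.\n", "blocks = compute_blocks(resource, block_size=(1000, 1000, 10))\n", "print(len(blocks), 'blocks')\n", "\n", "for block in blocks[:3]:\n", "    # Each Block records where it came from in the full volume;\n", "    # block.data is still None until a job populates it.\n", "    print('z', (block.z_start, block.z_end),\n", "          'y', (block.y_start, block.y_end),\n", "          'x', (block.x_start, block.x_end),\n", "          'data:', block.data)" ] },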
{ "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Run a function on each block in parallel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from functools import partial\n", "import multiprocessing as mp\n", "# `ndr` below refers to the project's NeuroDataResource module\n", "\n", "'''\n", "    This is the main driver function that starts the multiprocessing\n", "\n", "    Input:\n", "    config_file    Neurodata config file\n", "    function       function to be run; it must accept a data dictionary!\n", "    cpus           number of CPUs to use\n", "    block_size     (x, y, z) size of blocks\n", "'''\n", "def run_parallel(config_file, function, cpus=None, block_size=(1000, 1000, 10)):\n", "    ## Make resource and compute blocks\n", "    resource = ndr.get_boss_resource(config_file)\n", "    blocks = compute_blocks(resource, block_size)\n", "    ## Prepare the job by fixing the NeuroDataResource and function arguments\n", "    task = partial(job, resource=resource, function=function)\n", "    ## Prepare pool\n", "    num_workers = cpus\n", "    if num_workers is None:\n", "        num_workers = mp.cpu_count() - 1\n", "    pool = mp.Pool(num_workers)\n", "    try:\n", "        print(pool.map(task, blocks))\n", "    except:\n", "        pool.terminate()\n", "        print(\"Parallel failed, closing pool and exiting\")\n", "        raise\n", "    # Shut the pool down cleanly once all blocks have finished\n", "    pool.close()\n", "    pool.join()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, notice that this function handles instantiating the resource and computing all of the blocks (get_boss_resource and compute_blocks).\n", "\n", "The next step is a standard Python trick: we use functools.partial to fix certain parameters of the function \"job\". In particular, we specify that job should receive the NeuroDataResource object we just created and the function provided in the parameters.\n", "\n", "The resulting function, called \"task\", is what is run across each Block object in parallel.\n", "\n", "A few other things to note: a default block size is provided, and if cpus is not specified, we automatically use one fewer than the total number of CPUs your node has.\n", "\n", "Here is the job we are running:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "'''\n", "    This function pulls data from the BOSS and runs a function on it\n", "\n", "    Input:\n", "    block          Block object without raw data\n", "    resource       NeuroDataResource object\n", "    function       pipeline to be run on the data\n", "    Output:\n", "    String block key of the form z_start_y_start_x_start\n", "'''\n", "def job(block, resource, function=None):\n", "\n", "    print(\"Starting job, retrieving data\")\n", "    # get_data (defined in the module) fills block.data with a {channel: array} dict\n", "    block = get_data(resource, block)\n", "    print(\"Starting algorithm\")\n", "    try:\n", "        result = function(block.data)\n", "    except Exception as ex:\n", "        print(ex)\n", "        print(\"Ran into error in algorithm, exiting this block\")\n", "        return\n", "\n", "    key = str(block.z_start) + \"_\" + str(block.y_start) + \"_\" + str(block.x_start)\n", "    print(\"Done with job\")\n", "    return key\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All this job does is populate the Block object with a data dictionary through the get_data function, then run the specified function on that dictionary. Note that if the function you provided raises an exception, the parallel run does not end; that block is simply skipped.\n", "\n", "On the other hand, if there is an error while getting the data or in the job function itself, the entire pool will terminate!\n", "\n", "One final note: the function you provide should also handle saving its own results, merging them, and so on." ] },
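{ "cell_type": "markdown", "metadata": {}, "source": [ "To tie everything together, here is a sketch of what a caller might look like. The config path and the body of my_pipeline are illustrative placeholders; the only real contract is that the function accepts the channel-to-data dictionary described at the top of this notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch of an end-to-end caller. The config path and my_pipeline are\n", "# placeholders; the real contract is only that the function accepts the\n", "# {channel_name: raw_data} dictionary.\n", "def my_pipeline(data_dict):\n", "    # data_dict maps channel names to raw BOSS data (usually numpy arrays)\n", "    for channel, volume in data_dict.items():\n", "        print(channel, volume.shape, volume.dtype)\n", "    # A real pipeline would also save/merge its results here.\n", "\n", "run_parallel('neurodata.cfg', my_pipeline, cpus=4, block_size=(1000, 1000, 10))" ] },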
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4" } }, "nbformat": 4, "nbformat_minor": 2 }