# Documentation

This notebook documents Nd-Multicore.

## Use Case:
This module is currently used to run embarrassingly parallel pipelines on data stored in the BOSS. However, its modular design should make it possible to add other parallel functions in the future.

Specifically, the current pipeline accomplishes the following things:

1. Reads the BOSS data parameters from a neurodata.cfg file.
2. Breaks the data into blocks.
3. Runs a user-specified function on each block of data in parallel (using the Python 3 multiprocessing library).

## Important Notes:
1. The function you provide must accept a Python dictionary as its only non-default parameter. The dictionary maps each channel name to the raw data pulled from the BOSS for that channel (usually a NumPy array).
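As an illustration, here is a minimal sketch of a compatible function. The names `count_bright_voxels` and `_flatten`, the `threshold` parameter, and the channel names are all made up; the only requirement taken from the note above is that `data` is the sole non-default parameter. To stay dependency-free, the sketch walks nested lists; with real BOSS data the values would usually be NumPy arrays, where `(volume > threshold).sum()` does the same job.

```python
def _flatten(values):
    """Yield every number from an arbitrarily nested list/tuple."""
    for item in values:
        if isinstance(item, (list, tuple)):
            yield from _flatten(item)
        else:
            yield item


def count_bright_voxels(data, threshold=100):
    """Hypothetical pipeline function.

    data: dict mapping channel name -> raw voxel data for one block.
    threshold has a default, so data is the only non-default parameter.
    """
    counts = {}
    for channel, volume in data.items():
        counts[channel] = sum(1 for v in _flatten(volume) if v > threshold)
    return counts


# Made-up 1 x 2 x 2 blocks for two made-up channels
block_data = {"synapsin": [[[0, 150], [200, 50]]],
              "psd95": [[[10, 20], [30, 40]]]}
print(count_bright_voxels(block_data))  # {'synapsin': 2, 'psd95': 0}
```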


The rest of the notebook will focus on the steps of the pipeline and how the code works.

## 1. Loading BOSS Resource

The first step is to load the resource from the BOSS. This is done by the NeuroDataResource class in NeuroDataResource.py; see neurodata.cfg.example for the parameters you need to provide. This code is mostly handled for you, and you should need to make few changes to it. For reference, though, here is how the NeuroDataResource class works:

If you are modifying this code, focus mainly on the instance attributes and the get_cutout method, so only those are listed below. For the full source code, open NeuroDataResource.py.

```python
from intern.remote.boss import BossRemote


class NeuroDataResource:
    def __init__(self, host, token, collection, experiment, requested_channels,
                 x_range,
                 y_range,
                 z_range):
        # Create BOSS Remote
        self._bossRemote = BossRemote({'protocol': 'https',
                                       'host': host,
                                       'token': token})
        # Specify collection
        self.collection = collection
        # Specify experiment
        self.experiment = experiment
        # Pull all channels (used for validation of pulling data)
        self.channels = self._bossRemote.list_channels(collection, experiment)

        # Specify which channels you want to pull (empty list = all channels)
        if len(requested_channels) == 0:
            self.requested_channels = self.channels
        else:
            self.requested_channels = requested_channels

        self._get_coord_frame_details()
        # Validate range
        if not self.correct_range(z_range, y_range, x_range):
            raise Exception("Error: Incorrect dimension range")
        # Range of data you are pulling from
        self.x_range = x_range
        self.y_range = y_range
        self.z_range = z_range

    # ... helper methods (_get_coord_frame_details, correct_range, _get_channel)
    # are omitted here; see NeuroDataResource.py

    def get_cutout(self, chan, zRange=None, yRange=None, xRange=None):
        if chan not in self.channels:
            print('Error: Channel Not Found in this Resource')
            return
        if zRange is None or yRange is None or xRange is None:
            print('Error: You must supply zRange, yRange, xRange kwargs in list format')
            return

        channel_resource = self._get_channel(chan)
        datatype = channel_resource.datatype

        # Pull the cutout at resolution 0 (intern takes ranges in x, y, z order)
        data = self._bossRemote.get_cutout(channel_resource,
                                           0,
                                           xRange,
                                           yRange,
                                           zRange)

        # Datatype check: recast to the channel's datatype if they differ (e.g. float64)
        if data.dtype == datatype:
            return data
        else:
            return data.astype(datatype)
```
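The constructor arguments above come from the config file: neurodata.cfg.example lists the real parameters, and the module's get_boss_resource function does the actual loading. Purely to illustrate how such a file can map onto those arguments, here is a sketch using Python's standard configparser. The `[Default]` section, the key names, the comma-separated range format, and the helper names are hypothetical placeholders, not the real neurodata.cfg layout.

```python
import configparser


def _parse_range(text):
    """Turn a string like '0, 512' into [0, 512]."""
    start, end = (int(part) for part in text.split(","))
    return [start, end]


def read_resource_args(cfg_text):
    """Return keyword arguments for NeuroDataResource from INI-style text.

    Hypothetical layout: check neurodata.cfg.example for the real section/keys.
    """
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["Default"]
    channels = [c.strip() for c in section.get("channels", "").split(",") if c.strip()]
    return {
        "host": section["host"],
        "token": section["token"],
        "collection": section["collection"],
        "experiment": section["experiment"],
        "requested_channels": channels,  # empty list -> all channels
        "x_range": _parse_range(section["x_range"]),
        "y_range": _parse_range(section["y_range"]),
        "z_range": _parse_range(section["z_range"]),
    }


example_cfg = """
[Default]
host = api.example.org
token = YOUR_TOKEN
collection = my_collection
experiment = my_experiment
channels = synapsin, psd95
x_range = 0, 2000
y_range = 0, 2000
z_range = 0, 20
"""
args = read_resource_args(example_cfg)
print(args["requested_channels"], args["z_range"])  # ['synapsin', 'psd95'] [0, 20]
# With intern installed and a valid token: resource = NeuroDataResource(**args)
```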


## 2. Compute Blocks
Once a NeuroDataResource object has been created, the next step is to split the requested volume into blocks. The function that does this is shown below.

```python
from intern.utils.parallel import block_compute


def compute_blocks(resource, block_size):
    '''
    Split the requested volume of a NeuroDataResource into blocks.
    Choose block_size so that each block stays under 2 GB.

    Input:
        resource    NeuroDataResource object containing the necessary parameters
        block_size  (x, y, z) tuple specifying the size of the blocks
    Output:
        list of Block objects (data not loaded yet)
    '''
    x_start, x_end = resource.x_range
    y_start, y_end = resource.y_range
    z_start, z_end = resource.z_range

    blocks = block_compute(x_start, x_end, y_start, y_end, z_start, z_end,
                           (0, 0, 0), block_size)
    ### IMPORTANT: blocks are returned as x, y, z ###
    for i in range(len(blocks)):
        x_range, y_range, z_range = blocks[i]
        # Create a Block object to preserve the original location of the block
        blocks[i] = Block(z_range, y_range, x_range)
    return blocks
```

This function breaks the full data cube into smaller blocks that can be processed in parallel. This is especially useful because the BOSS only allows you to pull about 2 GB per request.
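Whether a block fits under that limit depends on its voxel count and the channel's datatype. A rough estimate is x × y × z × bytes per voxel: for the default block size of (1000, 1000, 10) that is 10,000,000 voxels, i.e. 10 MB of uint8 or 20 MB of uint16 data per channel. The sketch below checks a block size against the limit; it assumes the limit is 2 GiB (2 × 1024³ bytes) and covers only a few datatypes, and the helper names are hypothetical.

```python
# Bytes per voxel for a few common BOSS channel datatypes
BYTES_PER_VOXEL = {"uint8": 1, "uint16": 2, "uint64": 8}

# Assumed per-request limit: 2 GiB
REQUEST_LIMIT = 2 * 1024 ** 3


def block_nbytes(block_size, datatype="uint8"):
    """Uncompressed size in bytes of one (x, y, z) block of one channel."""
    x, y, z = block_size
    return x * y * z * BYTES_PER_VOXEL[datatype]


def fits_in_one_request(block_size, datatype="uint8", limit=REQUEST_LIMIT):
    """True if one block of one channel is strictly below the limit."""
    return block_nbytes(block_size, datatype) < limit


print(block_nbytes((1000, 1000, 10), "uint16"))         # 20000000
print(fits_in_one_request((2048, 2048, 512), "uint8"))  # False (exactly 2 GiB)
```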

The actual splitting is done by the block_compute function provided by the intern package. Each resulting range is then wrapped in a Block object, which keeps metadata about where that block originally came from in the BOSS.

Note that compute_blocks does not load any data: each Block's data attribute is left as None.
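To make the tiling concrete, here is a simplified, pure-Python stand-in for block_compute. It is not intern's implementation: it steps from each start coordinate in block_size increments and clips the last block at the end of the range, whereas intern's version may align block boundaries differently (for example relative to its origin argument). Like block_compute as used above, it returns ranges in x, y, z order. The names tile_ranges and simple_block_compute are hypothetical.

```python
def tile_ranges(start, stop, step):
    """Split [start, stop) into consecutive [lo, hi) pieces of at most step."""
    ranges = []
    lo = start
    while lo < stop:
        hi = min(lo + step, stop)
        ranges.append((lo, hi))
        lo = hi
    return ranges


def simple_block_compute(x_start, x_stop, y_start, y_stop, z_start, z_stop, block_size):
    """Return a list of (x_range, y_range, z_range) tuples covering the volume."""
    bx, by, bz = block_size
    return [(xr, yr, zr)
            for xr in tile_ranges(x_start, x_stop, bx)
            for yr in tile_ranges(y_start, y_stop, by)
            for zr in tile_ranges(z_start, z_stop, bz)]


blocks = simple_block_compute(0, 2500, 0, 1000, 0, 10, (1000, 1000, 10))
print(len(blocks))  # 3
print(blocks[-1])   # ((2000, 2500), (0, 1000), (0, 10))
```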

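```python
class Block:
    def __init__(self, z_range, y_range, x_range):
        # Location of this block in the original BOSS volume
        self.x_start = x_range[0]
        self.x_end = x_range[1]
        self.y_start = y_range[0]
        self.y_end = y_range[1]
        self.z_start = z_range[0]
        self.z_end = z_range[1]
        # Data dictionary; filled in later by get_data
        self.data = None
```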
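Because a Block keeps its start coordinates, any position found inside `block.data` can be mapped back to BOSS coordinates by adding the offsets. A minimal sketch, assuming each array in `block.data` is indexed (z, y, x), matching the (z_start, y_start, x_start) order of the block key used later (check how get_data orders axes before relying on this). The helper `to_global` is hypothetical, and Block is repeated in compact form so the snippet runs on its own.

```python
class Block:
    def __init__(self, z_range, y_range, x_range):
        self.x_start, self.x_end = x_range
        self.y_start, self.y_end = y_range
        self.z_start, self.z_end = z_range
        self.data = None


def to_global(block, local_index):
    """Map a (z, y, x) index inside block.data to BOSS coordinates."""
    z, y, x = local_index
    return (block.z_start + z, block.y_start + y, block.x_start + x)


block = Block(z_range=(10, 20), y_range=(1000, 2000), x_range=(0, 1000))
print(to_global(block, (3, 5, 7)))  # (13, 1005, 7)
```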

## 3. Run a function on each block in parallel

```python
import multiprocessing as mp
from functools import partial

import NeuroDataResource as ndr  # assumed alias for the NeuroDataResource.py module


def run_parallel(config_file, function, cpus=None, block_size=(1000, 1000, 10)):
    '''
    Main driver function that starts multiprocessing.

    Input:
        config_file  Neurodata config file
        function     function to be run; must take in a data dictionary!
        cpus         number of CPUs to use (default: all but one)
        block_size   (x, y, z) size of the blocks
    '''
    ## Make resource and compute blocks
    resource = ndr.get_boss_resource(config_file)
    blocks = compute_blocks(resource, block_size)
    ## Prepare job by fixing the NeuroDataResource and function arguments
    task = partial(job, resource=resource, function=function)
    ## Prepare pool
    num_workers = cpus
    if num_workers is None:
        # At least one worker: cpu_count() - 1 is 0 on a single-CPU node
        num_workers = max(1, mp.cpu_count() - 1)
    pool = mp.Pool(num_workers)
    try:
        print(pool.map(task, blocks))
    except:
        pool.terminate()
        print("Parallel failed, closing pool and exiting")
        raise
    pool.terminate()
```

First, note that this function itself creates the resource and computes all the blocks (via get_boss_resource and compute_blocks).

Next comes a small trick with Python functions: functools.partial fixes some of the arguments of job. Specifically, resource is bound to the NeuroDataResource object just created, and function is bound to the function passed to run_parallel.

The resulting callable, task, takes only a Block, and it is what pool.map runs on each Block object in parallel.
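A small self-contained illustration of the partial trick: `fake_job` and the string arguments are hypothetical stand-ins for job, the NeuroDataResource object, and the user function, so the snippet runs without the BOSS. The built-in map plays the role of pool.map, which likewise calls task with one block at a time.

```python
from functools import partial


def fake_job(block, resource, function=None):
    # Stand-in for job: just reports which arguments it received
    return (block, resource, function)


# Fix resource and function; only block is left to supply
task = partial(fake_job, resource="my_resource", function="my_function")

print(task("block_0"))  # ('block_0', 'my_resource', 'my_function')
print(list(map(task, ["block_0", "block_1"])))
```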

A few other things to note: a default block size of (1000, 1000, 10) is provided, and if cpus is not specified, the pool automatically uses one fewer worker than the number of CPUs on your node.

Here is the job we are running:

```python
def job(block, resource, function=None):
    '''
    Pull a block's data from the BOSS and run a function on it.

    Input:
        block     Block object without raw data
        resource  NeuroDataResource object
        function  pipeline to be run on the data
    Output:
        block key string "z_start_y_start_x_start",
        or None if function raised an exception
    '''
    print("Starting job, retrieving data")
    # get_data (not shown) fills block.data with the channel -> data dictionary
    block = get_data(resource, block)
    print("Starting algorithm")
    try:
        # The return value is not used further; function must save its own output
        result = function(block.data)
    except Exception as ex:
        print(ex)
        print("Ran into error in algorithm, exiting this block")
        return None

    key = str(block.z_start) + "_" + str(block.y_start) + "_" + str(block.x_start)
    print("Done with job")
    return key
```


All job does is fill the Block's data attribute with a data dictionary (via the get_data function) and then run the specified function on that dictionary. If the provided function raises an exception, the error is printed and job returns None for that block: the parallel run continues, and only that block is skipped.

However, if an error occurs while getting the data, or anywhere else in job outside that try block, the exception propagates to pool.map, and run_parallel terminates the entire pool and re-raises it.
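The difference between the two failure modes can be simulated without the BOSS or a process pool. In this sketch, `fake_get_data`, `fake_job`, `picky`, and the block names are hypothetical stand-ins, and the built-in map plays the role of pool.map: an error inside the user function becomes a None entry in the results, while an error while getting the data escapes the job and aborts the whole map. Since pool.map also returns results in input order, the None positions identify the skipped blocks.

```python
def fake_get_data(block):
    # Stand-in for get_data: fails for one block to mimic a BOSS error
    if block == "bad_fetch":
        raise OSError("could not pull cutout")
    return {"chan": block}


def fake_job(block, function):
    data = fake_get_data(block)  # errors here are NOT caught
    try:
        function(data)
    except Exception as ex:
        print(ex)
        return None  # user-function errors only skip this block
    return block


def picky(data):
    if data["chan"] == "bad_algo":
        raise ValueError("algorithm failed")


results = list(map(lambda b: fake_job(b, picky), ["a", "bad_algo", "b"]))
print(results)  # ['a', None, 'b']
failed = [i for i, key in enumerate(results) if key is None]
print(failed)   # [1]

try:
    list(map(lambda b: fake_job(b, picky), ["a", "bad_fetch", "b"]))
except OSError as ex:
    print("whole run aborted:", ex)
```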

One final note: the function you provide is also responsible for saving its results, merging them, and so on, because job discards the function's return value and only returns the block key.
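A minimal sketch of a function that saves its own output, assuming a small JSON summary per block is enough. `save_channel_max`, `_flatten`, and `out_dir` are hypothetical. `out_dir` has a default and can be overridden with functools.partial, so `data` stays the only non-default parameter; because the function does not receive the block's coordinates, each file gets a unique uuid4 name so parallel workers never overwrite each other. With real NumPy data you would use `int(volume.max())` instead of the nested-list walk. Merging the files afterwards is left to you.

```python
import json
import os
import tempfile
import uuid
from functools import partial


def _flatten(values):
    """Yield every number from an arbitrarily nested list/tuple."""
    for item in values:
        if isinstance(item, (list, tuple)):
            yield from _flatten(item)
        else:
            yield item


def save_channel_max(data, out_dir="results"):
    """Hypothetical pipeline function that writes its own output.

    Saves the maximum value per channel as JSON under a unique file name.
    """
    summary = {channel: max(_flatten(volume)) for channel, volume in data.items()}
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"block_{uuid.uuid4().hex}.json")
    with open(path, "w") as fh:
        json.dump(summary, fh)
    return path


out_dir = tempfile.mkdtemp()
# This partial is what you would pass to run_parallel as `function`
function = partial(save_channel_max, out_dir=out_dir)
path = function({"synapsin": [[[0, 150], [200, 50]]]})
with open(path) as fh:
    print(json.load(fh))  # {'synapsin': 200}
```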