{ "metadata": { "name": "", "signature": "sha256:5c6a42495904dcc9148e382e7fb67a3073d76d5886367b67aaa862cfa8e6dcc6" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "## Generator Pipelines using Workbench:\n", "If you don't know what Generator Pipelines here are some resources: \n", "\n", "- Intro material: [Generator Pipelines in Python](http://www.blangdon.com/writing/about/generator-pipelines-in-python/)\n", " \n", "- More advanced, LOTS of exercises: [David M. Beazley](http://www.dabeaz.com/generators/)\n", " \n", "- Video presentation: [V James Powell @ PyData NYC 2012](http://vimeo.com/53039281)\n", " \n", "**tl;dr** - Good for streaming data and building pipelines that iterate over streaming data.\n", "\n", "** How easy is it to get workbench to build streaming data pipeline thingys? **\n", "- Pretty darn easy...\n", "\n", "## Lets start up the workbench server...\n", "Run the workbench server (from somewhere, for the demo we're just going to start a local one)\n", "\n", "$ cd workbench\n", "$ ./workbench\n", "\n", "\n", "#### Okay so when the server starts up, it autoloads any worker plugins in the server/worker directory and dynamically monitors the directory, if a new python file shows up, it's validated as a properly formed plugin and if it passes is added to the list of workers." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "# Lets start to interact with workbench, please note there is NO specific client to workbench,\n", "# Just use the ZeroRPC Python, Node.js, or CLI interfaces.\n", "import zerorpc\n", "c = zerorpc.Client()\n", "c.connect(\"tcp://127.0.0.1:4242\")" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 76, "text": [ "[None]" ] } ], "prompt_number": 76 }, { "cell_type": "code", "collapsed": false, "input": [ "# Load in 100 PE Files\n", "import os\n", "file_list = [os.path.join('../data/pe/bad', child) for child in os.listdir('../data/pe/bad')]\n", "file_list += [os.path.join('../data/pe/good', child) for child in os.listdir('../data/pe/good')]\n", "md5_list = []\n", "for filename in file_list:\n", " with open(filename,'rb') as f:\n", " md5_list.append(c.store_sample(filename, f.read(), 'pe'))\n", "print 'Files loaded: %d' % len(md5_list)\n", "md5_list[:5]" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Files loaded: 100\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 77, "text": [ "['033d91aae8ad29ed9fbb858179271232',\n", " '0cb9aa6fb9c4aa3afad7a303e21ac0f3',\n", " '0e882ec9b485979ea84c7843d41ba36f',\n", " '0e8b030fb6ae48ffd29e520fc16b5641',\n", " '0eb9e990c521b30428a379700ec5ab3e']" ] } ], "prompt_number": 77 }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Notice the level on control on our batch operations\n", "#### Your database may have tons of files of different types. We literally control execution on the per sample level with md5 lists. Alternatively we can specify specific types or simply make a query to the database get exactly what we want and build our own md5 list.\n", "\n", "#### Also notice that we can specify ^exactly^ what data we want down to arbitrary depth.. 
here we want just the imported_symbols from the sparse features of the pe_features worker." ] }, { "cell_type": "code", "collapsed": false, "input": [ "# Compute pe_features on all of the files; pull back just the md5 and the imported_symbols sparse feature\n", "imports = c.batch_work_request('pe_features', {'md5_list': md5_list, 'subkeys': ['md5', 'sparse_features.imported_symbols']})\n", "imports" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 78, "text": [ "