{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "#(Easy) High performance text processing in Machine Learning*\n", "\n", "#ML meetup\n", "##*Joint work with Ian Langmore\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#[Rosetta*](https://github.com/columbia-applied-data-science/rosetta) \n", "###(*created with Ian Langmore; contributors include Matt DeLand, Thomas Nyberg, et al)\n", "\n", "##Tools for data science with a focus on text processing.\n", "\n", "* Focuses on \"medium data\", i.e. data too big to fit into memory but too small to necessitate the use of a cluster.\n", "* Integrates with existing scientific Python stack as well as select outside tools.\n", "\n", "## Tools and utilities \n", "\n", "### `cmd` \n", "* Unix-like command line utilities. Filters (read from stdin/write to stdout) for files\n", "\n", "### `parallel` \n", "* Wrappers for Python multiprocessing that add ease of use\n", "* Memory-friendly multiprocessing\n", "\n", "### `text`\n", "* Stream text from disk to formats used in common ML processes\n", " * file, database, and generic streamers\n", "* Write processed text to sparse formats\n", "* Helpers for ML tools (e.g. Vowpal Wabbit, Gensim, etc...)\n", "* Other general utilities\n", "\n", "### `workflow`\n", "* High-level wrappers that have helped with our workflow and provide additional examples of code use\n", "\n", "### `modeling`\n", "* General ML modeling utilities\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#Lets begin" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "###for the purpose of this tutorial we will be working with a collection of about 1000 declassified government embassy cables\n", "* you can replace the data with any top-directory of text files or you really want this particular data set feel free to email me and I'll send \"Declassification Engine\" REST API documentation\n" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import os\n", "import pandas as pd" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 5 }, { "cell_type": "markdown", "metadata": {}, "source": [ "#Text Processors" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##Streamers\n", "* Data in many text processing problems comes in the form of \n", " * flat files\n", " * repeated calls to an DB or API\n", " * some 'online' stream\n", "* A lot of these can be handled streaming the data either from disk, DB, API minimizing CPU use\n", "* In addition, a lot of streaming is embarassingly parallel so can be easily scaled\n" ] }, { "cell_type": "code", "collapsed": false, "input": [ "#all you realy need to know is that CABLES is the directory where the data (or cables)\n", "#are stored on your machine\n", "DATA = os.environ['DATA']\n", "CABLES = os.path.join(DATA, 'declass', 'cables_short')\n", "RAW = os.path.join(CABLES, 'raw')\n", "PROCESSED = os.path.join(CABLES, 'processed')\n", "SPARSE = os.path.join(CABLES, 'sparse')\n", "\n", "sfile_path = os.path.join(SPARSE, 'cables-short.vw')\n", "filtered_sfile_path = os.path.join(PROCESSED, 'cables-short-filtered.vw')\n", "sff_path = os.path.join(PROCESSED, 'sff.pkl')" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 6 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Streaming: given a collection of objects streaming is the paradigm of processing these objects one at a time in memory, extracting relevant 
{ "cell_type": "code", "collapsed": false, "input": [ "#all you really need to know is that CABLES is the directory where the data (or cables)\n", "#are stored on your machine\n", "DATA = os.environ['DATA']\n", "CABLES = os.path.join(DATA, 'declass', 'cables_short')\n", "RAW = os.path.join(CABLES, 'raw')\n", "PROCESSED = os.path.join(CABLES, 'processed')\n", "SPARSE = os.path.join(CABLES, 'sparse')\n", "\n", "sfile_path = os.path.join(SPARSE, 'cables-short.vw')\n", "filtered_sfile_path = os.path.join(PROCESSED, 'cables-short-filtered.vw')\n", "sff_path = os.path.join(PROCESSED, 'sff.pkl')" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 6 },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Streaming: given a collection of objects, streaming is the paradigm of processing these objects one at a time in memory, extracting the relevant information, writing that information out, and discarding the original object\n", "\n", "###Note: after a streaming process is complete, the original collection should no longer be needed for the analysis at hand" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "* Let's write a simple file streamer" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "#filefilter is a module which helps with basic file/dir functions, such as\n", "#retrieving all paths from a given directory and its subdirectories\n", "from rosetta.text import filefilter" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 7 },
{ "cell_type": "code", "collapsed": false, "input": [ "def simple_file_streamer(base_path):\n", "    #yield the full text of each file under base_path, one file at a time\n", "    paths = filefilter.get_paths(base_path, get_iter=True)\n", "    for p in paths:\n", "        with open(p) as f:\n", "            text = f.read()\n", "            yield(text)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 8 },
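{ "cell_type": "markdown", "metadata": {}, "source": [ "* Note that calling simple_file_streamer opens nothing by itself; files are read only as items are pulled. A small sketch of lazy consumption: grab just the first few documents with itertools.islice, so only those files are ever opened\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "from itertools import islice\n", "\n", "#pull just the first 3 documents; the rest of the corpus is never touched\n", "for doc in islice(simple_file_streamer(RAW), 3):\n", "    print len(doc)  #character count of each document" ], "language": "python", "metadata": {}, "outputs": [] },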
{ "cell_type": "markdown", "metadata": {}, "source": [ "###In case you haven't worked much with iterators explicitly, here is a small refresher... \n", "* For those familiar with generator functions, or iterators, you'll notice that this is exactly what we mean by \"streamer,\" i.e. anything that retrieves files or extracts information from them and has a .next() method\n", "* the Python docs have a short intro to generators (http://docs.python.org/2/tutorial/classes.html#generators)" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "def my_iter(N):\n", "    i=0\n", "    while True:\n", "        if i == N:\n", "            raise StopIteration\n", "        else:\n", "            yield i\n", "        i += 1" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 },
{ "cell_type": "code", "collapsed": false, "input": [ "mi = my_iter(5)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 10 },
{ "cell_type": "code", "collapsed": true, "input": [ "mi.next()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 11, "text": [ "0" ] } ], "prompt_number": 11 },
{ "cell_type": "code", "collapsed": false, "input": [ "#note the raised StopIteration; let's see how a for loop handles this\n", "\n", "for i in my_iter(5):\n", "    print i" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "0\n", "1\n", "2\n", "3\n", "4\n" ] } ], "prompt_number": 12 },
{ "cell_type": "markdown", "metadata": {}, "source": [ "###So even if you have not thought about iterators, you have been using them throughout \n", "###Now back to our streamer" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "simple_stream = simple_file_streamer(RAW)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 13 },
{ "cell_type": "code", "collapsed": false, "input": [ "#let's look at what this object is\n", "type(simple_stream)" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 14, "text": [ "generator" ] } ], "prompt_number": 14 },
{ "cell_type": "markdown", "metadata": {}, "source": [ "###Note: only now, on the first .next() call, is anything read into memory" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "#let's see what the .next() yields (and splitlines to make it more readable)\n", "simple_stream.next().splitlines()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 15, "text": [ "['UNCLASSIFIED',\n", " '',\n", " 'PAGE 01 ACCRA 01284 091110Z',\n", " 'ACTION OCS-06',\n", " '',\n", " 'INFO OCT-00 ADS-00 AF-10 AMAD-01 CA-01 /018 W',\n", " '     ------------------203337 091121Z /38',\n", " 'O 081104Z FEB 73',\n", " 'FM AMEMBASSY ACCRA',\n", " 'TO AMEMBASSY MONROVIA IMMEDIATE',\n", " 'INFO SECSTATE WASHDC 4185',\n", " 'AMEMBASSY ABIDJAN',\n", " '',\n", " 'UNCLAS ACCRA 01284',\n", " '',\n", " 'E.O. 12356: N/A',\n", " 'TAGS: CASC (AKINS, ESTHER)',\n", " 'SUBJ: WELFARE/WHEREABOUTS: ESTHER AKINS',\n", " '',\n", " 'REF: MONROVIA 01199 (NOTAL)',\n", " '',\n", " '1. MS. AKINS LAST REGISTERED WITH THE EMBASSY ON MARCH 23,',\n", " '1981. SHE LATER REPORTED SHE WAS DUE TO LEAVE GHANA ON',\n", " 'MARCH 2, 1982.',\n", " '',\n", " '2. ATTEMPTS TO REACH HER BY PHONE THROUGH THE INSTITUTE',\n", " 'OF LINGUISTICS, HER CONTACT ADDRESS AT THE TIME OF HER',\n", " '1981-82 STAY IN GHANA AND OTHER MISSIONARIES HAS PROVED',\n", " 'UNSUCCESSFUL. THE SOURCE OF LIGHT MISSION IS NOT, RPT NOT,',\n", " 'KNOWN TO US.',\n", " '',\n", " '3. WE WILL MAKE ADDITIONAL EFFORTS TO LOCATE MS. AKINS,',\n", " 'AND WILL INFORM YOU IF WE HAVE ANY SUCCESS, BUT ADDITIONAL',\n", " 'CONTACT ADDRESSES FOR HER WOULD BE HELPFUL. WRIGHT',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " 'UNCLASSIFIED',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " '',\n", " 'NNN']" ] } ], "prompt_number": 15 },
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "* Now since our end goal is to build a topic model, we probably want a more flexible streamer, i.e. one that can return file ids, text, or tokens (based on some predefined tokenizer)\n", "    * luckily we have one such streamer written" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "from rosetta import TextFileStreamer, TokenizerBasic\n", "text_streamer = TextFileStreamer(text_base_path=RAW, file_type='*', \n", "        tokenizer=TokenizerBasic())\n" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 16 },
{ "cell_type": "code", "collapsed": false, "input": [ "from rosetta.text import streamers" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 17 },
{ "cell_type": "code", "collapsed": false, "input": [ "stream = text_streamer.info_stream()" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 18 },
{ "cell_type": "code", "collapsed": true, "input": [ "stream.next()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 20, "text": [ "{'atime': 1399823974.0,\n", " 'cached_path': '/Users/danielkrasner/DATA_master/prod/declass/cables_short/raw/1976YAOUND00800',\n", " 'doc_id': '1976YAOUND00800',\n", " 'mtime': 1383854248.0,\n", " 'size': 475,\n", " 'text': 'UNCLASSIFIED\\n\\nPAGE 01 YAOUND 00800 030654Z\\n\\n20\\nACTION CU-04\\n\\nINFO OCT-01 AF-06 ISO-00 USIA-15 /026 W\\n --------------------- 100150\\nR 021600Z MAR 76\\nFM AMEMBASSY YAOUNDE\\nTO SECSTATE WASHDC 7937\\n\\nUNCLAS YAOUNDE 0800\\n\\nFOR CU/AF\\n\\nE.O. 11652: N/A\\nTAGS: OEXC EIV ETRD\\nSUBJ: MULTI-REGIONAL PROJECT ON INTERNATIONAL INVESTMENT,\\n MARCH 21 TO APRIL 19, 1976\\n\\nREF: STATE 044439\\n\\nPOST REGRETS UNABLE NOMINATE CANDIDATE.\\nSPIRO\\n\\n\\nUNCLASSIFIED\\n\\n\\n\\n\\nNNN',\n", " 'tokens': ['unclassified',\n", "  'page',\n", "  'yaound',\n", "  'action',\n", "  'cu',\n", "  'info',\n", "  'oct',\n", "  'af',\n", "  'iso',\n", "  'usia',\n", "  'mar',\n", "  'fm',\n", "  'amembassy',\n", "  'yaounde',\n", "  'secstate',\n", "  'washdc',\n", "  'unclas',\n", "  'yaounde',\n", "  'e.o',\n", "  'tags',\n", "  'oexc',\n", "  'eiv',\n", "  'etrd',\n", "  'subj',\n", "  'multi',\n", "  'regional',\n", "  'project',\n", "  'international',\n", "  'investment',\n", "  'march',\n", "  'april',\n", "  'ref',\n", "  'state',\n", "  'post',\n", "  'regrets',\n", "  'unable',\n", "  'nominate',\n", "  'candidate',\n", "  'spiro',\n", "  'unclassified',\n", "  'nnn']}" ] } ], "prompt_number": 20 },
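{ "cell_type": "markdown", "metadata": {}, "source": [ "* Because info_stream() hands back one small dict at a time, corpus-level statistics can be computed in a single streaming pass without ever holding the corpus in memory. A minimal sketch, using the 'tokens' key we just saw:\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "from collections import Counter\n", "\n", "#one streaming pass: count documents and token occurrences\n", "n_docs = 0\n", "token_counts = Counter()\n", "for info in text_streamer.info_stream():\n", "    n_docs += 1\n", "    token_counts.update(info['tokens'])\n", "\n", "print n_docs\n", "print token_counts.most_common(5)" ], "language": "python", "metadata": {}, "outputs": [] },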
{ "cell_type": "markdown", "metadata": {}, "source": [ "* Note: you can pass a tokenizer function to TextFileStreamer(), i.e. any function that takes a string of text and returns a list of strings (the \"tokens\"); a hand-rolled sketch follows below\n", "    * We have written a basic tokenizer function and class to add functionality, and because nltk.word_tokenize() was slow\n", "* It also has a few other nice options, such as shuffle, file_type, etc., and a bunch of methods" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "text = stream.next()['text']" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 26 },
{ "cell_type": "code", "collapsed": true, "input": [ "print text" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "UNCLASSIFIED\n", "\n", "PAGE 01 ZAGREB 00018 121152Z\n", "\n", "44\n", "ACTION VOE-00\n", "\n", "INFO OCT-01 ISO-00 /001 W\n", "    --------------------- 058313\n", "R 081530Z JAN 76\n", "FM AMCONSUL ZAGREB\n", "TO SECSTATE WASHDC 3529\n", "\n", "UNCLAS ZAGREB 0018\n", "\n", "E.O. 11652: N/A\n", "TAGS: CVIS YO\n", "SUBJECT: FS-258, DEC. 1975\n", "\n", "1. PLEASE CORRECT FS-258 DEC. 1975. FIVE IR-5 ISSUED, INSTEAD\n", "OF THREE AS REPORTED. THAT CHANGES THE TOTAL TO 32 INSTEAD OF\n", "30. KAISER\n", "\n", "\n", "UNCLASSIFIED\n", "\n", "\n", "\n", "\n", "NNN\n" ] } ], "prompt_number": 27 },
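{ "cell_type": "markdown", "metadata": {}, "source": [ "* To make the \"any function\" point concrete, here is a minimal hand-rolled tokenizer (a sketch, not rosetta's implementation): lowercase, split on runs of non-letters, and drop very short words. A function like this could be passed via tokenizer_func, just as we do with nltk.word_tokenize below\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "import re\n", "\n", "def my_simple_tokenizer(text):\n", "    #lowercase, split on anything that isn't a letter, drop 1-2 letter words\n", "    words = re.split(r'[^a-z]+', text.lower())\n", "    return [w for w in words if len(w) > 2]\n", "\n", "my_simple_tokenizer(text)[:10]" ], "language": "python", "metadata": {}, "outputs": [] },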
{ "cell_type": "code", "collapsed": false, "input": [ "text_streamer.tokenizer.text_to_token_list(text)\n", "#text_streamer.tokenizer.text_to_counter(text)" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 28, "text": [ "['unclassified',\n", " 'page',\n", " 'zagreb',\n", " 'action',\n", " 'voe',\n", " 'info',\n", " 'oct',\n", " 'iso',\n", " 'jan',\n", " 'fm',\n", " 'amconsul',\n", " 'zagreb',\n", " 'secstate',\n", " 'washdc',\n", " 'unclas',\n", " 'zagreb',\n", " 'e.o',\n", " 'tags',\n", " 'cvis',\n", " 'yo',\n", " 'subject',\n", " 'fs',\n", " 'dec',\n", " 'please',\n", " 'correct',\n", " 'fs',\n", " 'dec',\n", " 'five',\n", " 'ir',\n", " 'issued',\n", " 'instead',\n", " 'three',\n", " 'reported',\n", " 'changes',\n", " 'total',\n", " 'instead',\n", " 'kaiser',\n", " 'unclassified',\n", " 'nnn']" ] } ], "prompt_number": 28 },
{ "cell_type": "code", "collapsed": false, "input": [ "#let's look at a few methods\n", "token_stream = text_streamer.token_stream() # returns a generator which yields one token list per document" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 41 },
{ "cell_type": "code", "collapsed": false, "input": [ "token_stream.next()[:10] # this is what our basic tokenizer returns (we are skipping stop words and numerics by default)" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 42, "text": [ "['unclassified',\n", " 'page',\n", " 'zagreb',\n", " 'action',\n", " 'scse',\n", " 'info',\n", " 'oct',\n", " 'iso',\n", " 'oct',\n", " 'fm']" ] } ], "prompt_number": 42 },
{ "cell_type": "markdown", "metadata": {}, "source": [ "`text_streamer.doc_id` returns a list of the retrieved doc ids, etc.\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "#if you want to use another tokenizer it's easy\n", "import nltk\n", "text_streamer_nltk = TextFileStreamer(text_base_path=RAW, file_type='*', \n", "        tokenizer_func=nltk.word_tokenize)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 43 },
{ "cell_type": "code", "collapsed": false, "input": [ "stream_nltk = text_streamer_nltk.token_stream()" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 44 },
{ "cell_type": "code", "collapsed": true, "input": [ "stream_nltk.next()[:10]" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 45, "text": [ "['CONFIDENTIAL',\n", " 'PAGE',\n", " '01',\n", " 'ZANZIB',\n", " '00033',\n", " '091100Z',\n", " '44',\n", " 'ACTION',\n", " 'AF-06',\n", " 'INFO']" ] } ], "prompt_number": 45 },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#Vowpal Wabbit for LDA Topic Modeling\n", "* LDA = Latent Dirichlet Allocation \n", "    * treats each document as a bag of words \n", "    * a topic is chosen from a topic distribution $p(k)$ where $k=1, \\dots , K$\n", "    * a word is chosen from the k'th topic distribution $p(w|k)$ and thrown into the bag\n", "    * the distribution $p(k)$ depends on $\\theta \\sim Dir(\\alpha)$, and $p(w|k)$ depends on $\\beta$, a $K\\times V$ matrix of word probabilities\n", "    * these 'latent' variables are chosen to maximize the probability of producing the observed documents, and in turn depend on user-chosen parameters $\\alpha$ and $\\eta$ (the full generative process is spelled out below)\n", "    * the model produces two important probability distributions:\n", "        * $p(w|k)$, the probability of $w$ being generated by topic $k$, and\n", "        * $p(k|d)$, the probability of topic $k$ being used to generate a randomly chosen word from document $d$ \n", "    * these topic and word weights can be used to understand the semantic structure of the documents as well as to generate document features\n", "    * for more details about LDA topic modeling see the wonderful Blei, Ng, and Jordan [paper](http://www.cs.princeton.edu/~blei/papers/BleiNgJordan2003.pdf)\n", "* Vowpal Wabbit\n", "    * optimized (very fast) C++ library http://hunch.net/~vw/\n", "    * you can find tutorials at https://github.com/JohnLangford/vowpal_wabbit/wiki/Tutorial and \n", "    https://github.com/columbia-applied-data-science/rosetta/blob/master/examples/vw_helpers.md\n" ] },
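{ "cell_type": "markdown", "metadata": {}, "source": [ "##Aside: the generative story, spelled out\n", "In the notation of the Blei, Ng, and Jordan paper (our $p(k|d)$ and $p(w|k)$ correspond to $\\theta_d$ and $\\beta_k$), smoothed LDA assumes each document is generated as follows:\n", "\n", "* for each topic $k$, draw a word distribution $\\beta_k \\sim Dir(\\eta)$ over the $V$-word vocabulary\n", "* for each document $d$, draw topic proportions $\\theta_d \\sim Dir(\\alpha)$\n", "* for each word slot in $d$: draw a topic $z \\sim Mult(\\theta_d)$, then draw the word $w \\sim Mult(\\beta_z)$\n", "\n", "Inference then works backwards from the observed words to estimates of $\\theta_d$ and $\\beta_k$.\n" ] },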
" * $p(w|k)$, the probability of $w$ bring generated by topic $k$ and\n", " * $p(k|d)$, the probabilty of topic $k$ being used to generate a randomly chosen word from document $d$ \n", " * these topic and word weights can be used to understand the semantic structure of the documents as well as generate document feature\n", " * for more details about LDA topic modeling see the wonderful Blei, Ng, and Jordan [paper](http://www.cs.princeton.edu/~blei/papers/BleiNgJordan2003.pdf)\n", "* Vowpal Wabbit\n", " * optimized (very fast) C++ library http://hunch.net/~vw/\n", " * can find tutorials at https://github.com/JohnLangford/vowpal_wabbit/wiki/Tutorial and \n", " https://github.com/columbia-applied-data-science/rosetta/blob/master/examples/vw_helpers.md\n", " \n", "\n", "\n" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from rosetta.text import text_processors, filefilter, streamers, vw_helpers" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 46 }, { "cell_type": "code", "collapsed": false, "input": [ "#create the VW format file \n", "my_tokenizer = text_processors.TokenizerBasic()\n", "stream = streamers.TextFileStreamer(text_base_path=RAW, tokenizer=my_tokenizer)\n", "stream.to_vw(sfile_path, n_jobs=-1, raise_on_bad_id=False)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 47 }, { "cell_type": "code", "collapsed": false, "input": [ "### somewhere here run (stick with 5 passes or so...)\n", "# rm -f *cache\n", "#vw --lda 20 --cache_file doc_tokens.cache --passes 5 -p prediction.dat --readable_model topics.dat --bit_precision 16 --lda_D 975 --lda_rho 0.1 --lda_alpha 1 ../sparse/cables-short.vw\n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "#load the sparse file \n", "formatter = text_processors.VWFormatter()\n", "sff = text_processors.SFileFilter(formatter)\n", "sff.load_sfile(sfile_path)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 48 }, { "cell_type": "code", "collapsed": false, "input": [ "#remove \"gaps\" in the sequence of numbers (ids)\n", "sff.compactify()\n", "sff.save(PROCESSED + '/sff_basic.pkl')" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Compactification done. self.bit_precision_required = 14\n", "collisions = 0, vocab_size = 14476\n", "All collisions resolved\n" ] } ], "prompt_number": 49 }, { "cell_type": "code", "collapsed": false, "input": [ "sff.to_frame().sort_index(by='doc_fraction', ascending=False).head(10)" ], "language": "python", "metadata": {}, "outputs": [ { "html": [ "
\n", " | doc_freq | \n", "token_score | \n", "doc_fraction | \n", "
---|---|---|---|
token | \n", "\n", " | \n", " | \n", " |
fm | \n", "829 | \n", "845 | \n", "0.849385 | \n", "
page | \n", "829 | \n", "1325 | \n", "0.849385 | \n", "
action | \n", "829 | \n", "907 | \n", "0.849385 | \n", "
info | \n", "829 | \n", "1432 | \n", "0.849385 | \n", "
oct | \n", "829 | \n", "973 | \n", "0.849385 | \n", "
iso | \n", "828 | \n", "852 | \n", "0.848361 | \n", "
secstate | \n", "827 | \n", "854 | \n", "0.847336 | \n", "
washdc | \n", "824 | \n", "907 | \n", "0.844262 | \n", "
nnn | \n", "819 | \n", "830 | \n", "0.839139 | \n", "
tags | \n", "782 | \n", "785 | \n", "0.801230 | \n", "
10 rows \u00d7 3 columns
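{ "cell_type": "markdown", "metadata": {}, "source": [ "Unsurprisingly, the most frequent tokens are cable boilerplate: header words like 'page', 'fm', 'action', and 'info' show up in roughly 85% of documents and carry little topical information. Before fitting a topic model it is usually worth checking how many tokens sit at the extremes; a quick look, using the doc_fraction and doc_freq columns of the frame above:\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "frame = sff.to_frame()\n", "#tokens appearing in more than 80% of documents are mostly header boilerplate\n", "print (frame.doc_fraction > 0.8).sum()\n", "#...and the long tail: tokens appearing in only a single document\n", "print (frame.doc_freq == 1).sum()" ], "language": "python", "metadata": {}, "outputs": [] },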
\n", "\n", " | doc_freq | \n", "token_score | \n", "doc_fraction | \n", "
---|---|---|---|
token | \n", "\n", " | \n", " | \n", " |
woulb | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
rescheduling | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
inherent | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
wise | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
flags | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
ekstrom | \n", "2 | \n", "2 | \n", "0.002049 | \n", "
letting | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
consultation | \n", "14 | \n", "17 | \n", "0.014344 | \n", "
miles | \n", "2 | \n", "2 | \n", "0.002049 | \n", "
frontmen | \n", "1 | \n", "1 | \n", "0.001025 | \n", "
10 rows \u00d7 3 columns
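{ "cell_type": "markdown", "metadata": {}, "source": [ "Once the vw command above has produced topics.dat and prediction.dat, rosetta's vw_helpers can join VW's output back up with the saved SFileFilter so topics are reported as words rather than hashed ids. The cell below is a sketch: it assumes the vw run succeeded and that LDAResults takes the topics file, the predictions file, the number of topics, and the saved sff pickle; see the vw_helpers.md example linked earlier for the authoritative usage.\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "#sketch: join VW's LDA output with our token mapping (check vw_helpers.md\n", "#for the exact LDAResults signature before running)\n", "num_topics = 20\n", "lda = vw_helpers.LDAResults('topics.dat', 'prediction.dat', num_topics,\n", "                            PROCESSED + '/sff_basic.pkl')\n", "lda.print_topics()" ], "language": "python", "metadata": {}, "outputs": [] }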
\n", "