{ "metadata": { "name": "", "signature": "sha256:d63323d7469e6d30f3a88590bd7c96376919cf170af2e29a5b176398c0875059" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Next-Word Predictor Development via Amazon Elastic MapReduce (EMR)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This script takes as input a pointer to an S3 bucket containing a set of text files, and outputs a set of JSON files, each of which contains a set of prior strings (e.g. 1, 2, 3 or 4 prior words separated by spaces) as keys, pointing to arrays of the most common occurrences of the next word in the sequence. The JSON files can be used in next-word prediction applications, as demonstrated in this [blog post](http://briancoffey.ca/blogpost6.html). \n", "\n", "Amazon Elastic MapReduce (EMR) is used to manage the Hadoop cluster for the calculations. Two MapReduce steps are carried out in sequence: \n", "\n", "1. The input text is mapped to a series of N-grams (filtered by a vocabulary list such that tokens (words) not on the list are overwritten with the value \"##unkn##\") and then these N-grams are aggregated by frequency\n", "2. The aggregated N-grams are then filtered and summarized such that each (N-1)-gram is coupled with an array of the top X most frequent following words\n", "\n", "These two steps are run for each value of N considered. (In the configuration below, N = [2,3,4,5].)\n", "\n", "The following are the main configuration parameters for this script. Note that the variables 'vocabSize' and 'occCutoff' can be tuned to trade off model size against performance.\n", "\n", "Also note that AWS credentials are required, as described below, along with a previously-computed list of 1-grams, sorted by frequency, that is used to produce the vocabulary list. (Note that the list of 1-grams can be computed by running this script with the same text input folder, the variable 'Nlist' set to [1], and only the phase_1 mapper and reducer.)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "inputfoldername = 's3://wordpredictor1/input/'   # S3 folder containing the input text files\n", "\n", "bucketname = 'wordpredictor2'   # S3 bucket used for scripts, intermediate output and final output\n", "\n", "vocabSize = 1000    # number of most frequent 1-grams kept in the vocabulary\n", "occCutoff = 10      # N-grams must occur more than this many times to be kept in phase 2\n", "numKeep = 5         # number of most frequent following words kept per (N-1)-gram\n", "\n", "Nlist = [2,3,4,5]   # N-gram levels to compute\n", "\n", "masterType = \"m3.xlarge\"\n", "workerType = \"m3.xlarge\"\n", "numWorkers = 2\n", "\n", "maxTime = 5 * 60 * 60   # maximum time (in seconds) to wait for the EMR job\n", "checkTime = 30          # polling interval (in seconds)\n" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 1 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Libraries, Keys" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Libraries: boto is used for AWS interaction, and pandas is used here only for nice table outputs in IPython (and could thus be cut out if desired)." ] }, { "cell_type": "code", "collapsed": false, "input": [ "import sys, os, time\n", "import ast, json\n", "import pandas as pd\n", "from boto.s3.connection import S3Connection\n", "from boto.s3.key import Key\n", "from boto.emr.connection import EmrConnection\n", "from boto.emr.instance_group import InstanceGroup\n", "from boto.emr.step import StreamingStep" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Access keys and passwords: these are kept in external files, formatted as lines of \"[name]=[value]\", as per the AWS rootkey.csv download."
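, "\n", "\n", "For example, the rootkey.csv file would contain two lines of the following form (the values shown here are the placeholder credentials from the AWS documentation, not real keys):\n", "\n", "    AWSAccessKeyId=AKIAIOSFODNN7EXAMPLE\n", "    AWSSecretKey=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\n", "\n", "and the passwords.csv file follows the same pattern, with one line each for sshKeyName, instancePass, mysqlPass and myIP (in that order, to match the unpacking in the cell below)."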
] }, { "cell_type": "code", "collapsed": false, "input": [ "AWSAccessKeyId, AWSSecretKey = ( line.strip().split('=')[1] for line in open('/Users/brian/rootkey.csv','r') )\n", "sshKeyName, instancePass, mysqlPass, myIP = ( line.strip().split('=')[1] for line in open('/Users/brian/passwords.csv','r') )" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 3 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Vocabulary List" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will pull our vocabulary list from the 'vocabSize' most frequent words in a previously-computed list of 1-grams." ] }, { "cell_type": "code", "collapsed": false, "input": [ "vocabList_df = pd.read_table(\"ngrams1.tsv\", nrows=vocabSize, names=[\"word\",\"occ\"])\n", "vocabList_df.head()" ], "language": "python", "metadata": {}, "outputs": [ {
"metadata": {}, "output_type": "pyout", "prompt_number": 4, "text": [ " word occ\n", "0 ##s## 9299843\n", "1 ##es## 9299843\n", "2 the 4751890\n", "3 to 2753081\n", "4 and 2411141" ] } ], "prompt_number": 4 }, { "cell_type": "code", "collapsed": false, "input": [ "vocabList = set(vocabList_df.word.tolist())" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 5 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "MapReduce Scripts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For each N (i.e. for each N-gram level), two MapReduce steps are required, hereafter referred to as 'phase_1' and 'phase_2'. The mapper and reducer Python scripts (Hadoop Streaming is used to allow Python) are copied below - they should be saved as files in the working directory, from which they will be copied (with modifications as necessary) to S3 to be used by EMR.\n", "\n", "For local testing, note that you can test a mapper outside of Hadoop with the following:\n", "\n", "    head -50 text.sample.txt > testfile\n", "    cat testfile | ./phase1_mapper2.py\n", "\n", "Or test both a mapper and a reducer:\n", "\n", "    cat testfile | ./phase1_mapper2.py | sort | ./phase1_reducer.py\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "phase1_mapper_template.py\n", "\n", "    #!/usr/bin/python\n", "    import sys, re\n", "    \n", "    n = $n$\n", "    \n", "    vocabSet = $vocabList$\n", "    \n", "    for line in sys.stdin:\n", "    \n", "        # tokenize string\n", "        s = re.sub(\"[.!?;]+\", \" ##s## \", line)\n", "        s = \"##s## \" + s\n", "        s = re.sub(\"##s##[\\s]+##s##\", \"\", s)\n", "        regex = \"[^\\s,:=<>/\\\\)\\\\(\\\"]+\"\n", "        tokens = re.findall(regex, s.lower())\n", "    \n", "        # replace non-vocab tokens with \"##unkn##\"\n", "        for i,t in enumerate(tokens):\n", "            if not t in vocabSet:\n", "                tokens[i] = \"##unkn##\"\n", "    \n", "        # find n-grams\n", "        ngrams = []\n", "        for i in range(n-1,len(tokens)):\n", "            new = \"\"\n", "            for j in range(n-1,-1,-1):\n", "                new += tokens[i-j] + \" \"\n", "            ngrams.append(new.strip())\n", "    \n", "        # output n-grams\n", "        for l in ngrams:\n", "            print \"{0}\\t{1}\".format(str(l).strip(), 1)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "phase1_reducer.py\n", "\n", "    #!/usr/bin/python\n", "    import sys\n", "    \n", "    oldKey = None\n", "    s = 0\n", "    \n", "    for line in sys.stdin:\n", "        data_mapped = line.strip().split(\"\\t\")\n", "        if len(data_mapped) != 2:\n", "            continue\n", "    \n", "        thisKey, thisVal = data_mapped\n", "    \n", "        if oldKey and oldKey != thisKey:\n", "            print str(oldKey).strip(), \"\\t\", str(s)\n", "            oldKey = thisKey\n", "            s = 0\n", "    \n", "        oldKey = thisKey\n", "        s += int(thisVal)\n", "    \n", "    if oldKey != None:\n", "        print str(oldKey).strip(), \"\\t\", str(s)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "phase2_mapper_template.py\n", "\n", "    #!/usr/bin/python\n", "    import sys, re\n", "    \n", "    n = $n$\n", "    occCutoff = $occCutoff$\n", "    \n", "    for line in sys.stdin:\n", "    \n", "        l = line.strip().split()\n", "        if len(l) != n + 1:\n", "            continue\n", "    \n", "        # filter lines to only include those with more than occCutoff occurrences\n", "        if int(l[n]) > occCutoff and not \"#\" in l[n-1]:\n", "    \n", "            # format output to be tab delimited as : preface - word - occurrences\n", "            preface = l[0]\n", "            for i in range(1,n-1):\n", "                preface += \" \" + l[i]\n", "            print preface + \"\\t\" + l[n-1] + \"\\t\" + l[n]\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "phase2_reducer_template.py\n", "\n",
"    #!/usr/bin/python\n", "    import sys\n", "    \n", "    numKeep = $numKeep$\n", "    \n", "    oldKey = None\n", "    topX = []\n", "    sum = 0\n", "    \n", "    for line in sys.stdin:\n", "        data_mapped = line.strip().split(\"\\t\")\n", "        if len(data_mapped) != 3:\n", "            continue\n", "    \n", "        thisKey, word, occ = data_mapped\n", "    \n", "        if oldKey and oldKey != thisKey:\n", "            print str(oldKey).strip() + \"\\t\" + str(sum) + \"\\t\" + str(topX)\n", "            topX = []\n", "            sum = 0\n", "    \n", "        topX.append((word,int(occ)))\n", "        if len(topX) > numKeep:\n", "            topX = sorted(topX,key=lambda x: x[1],reverse=True)\n", "            o = topX.pop()\n", "    \n", "        oldKey = thisKey\n", "        sum += int(occ)\n", "    \n", "    if oldKey != None:\n", "        print str(oldKey).strip() + \"\\t\" + str(sum) + \"\\t\" + str(topX)\n" ] }, { "cell_type": "code", "collapsed": false, "input": [ "phase1_mapper_template_file = 'phase1_mapper_template.py'\n", "phase1_reducer_file = 'phase1_reducer.py'\n", "phase2_mapper_template_file = 'phase2_mapper_template.py'\n", "phase2_reducer_template_file = 'phase2_reducer_template.py'" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 6 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Setting up S3 Folders and Files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section, we set up the necessary files and folders on S3 that will be referenced and used by EMR in the next section. (General notes on the use of boto for S3 can be found [here](http://boto.readthedocs.org/en/latest/s3_tut.html).)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "s3_conn = S3Connection(AWSAccessKeyId, AWSSecretKey)\n", "bucket = s3_conn.get_bucket(bucketname)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 7 }, { "cell_type": "code", "collapsed": false, "input": [ "# add mappers directory to local if it does not exist\n", "if not os.path.exists('mappers'):\n", "    os.makedirs('mappers')\n", "\n", "# write a separate phase1 mapper function for each value of n\n", "for n in Nlist:\n", "    mf = \"mappers/phase1_mapper\" + str(n) + \".py\"\n", "    with open(mf,'w') as mff:\n", "        with open(phase1_mapper_template_file,'r') as mtf:\n", "            mff.write(mtf.read().replace(\"$n$\", str(n)).replace(\"$vocabList$\", str(vocabList)))\n", "    k = bucket.new_key(\"phase1_mapper\" + str(n) + \".py\")\n", "    o = k.set_contents_from_filename(mf)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "k = bucket.new_key('phase1_reducer.py')\n", "o = k.set_contents_from_filename(phase1_reducer_file)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 }, { "cell_type": "code", "collapsed": false, "input": [ "# write a separate phase2 mapper function for each value of n\n", "for n in Nlist:\n", "    mf = \"mappers/phase2_mapper\" + str(n) + \".py\"\n", "    with open(mf,'w') as mff:\n", "        with open(phase2_mapper_template_file,'r') as mtf:\n", "            mff.write(mtf.read().replace(\"$n$\", str(n)).replace(\"$occCutoff$\", str(occCutoff)))\n", "    k = bucket.new_key(\"phase2_mapper\" + str(n) + \".py\")\n", "    o = k.set_contents_from_filename(mf)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 10 }, { "cell_type": "code", "collapsed": false, "input": [ "with open(\"phase2_reducer.py\",'w') as mff:\n", "    with open(phase2_reducer_template_file,'r') as mtf:\n", "        mff.write(mtf.read().replace(\"$numKeep$\", str(numKeep)))\n", "k = bucket.new_key(\"phase2_reducer.py\")\n", "o = k.set_contents_from_filename(\"phase2_reducer.py\")" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 11 }, { "cell_type": "code", "collapsed": false, "input": [ "k = bucket.new_key(\"phase1_output/\")\n", "o = k.set_contents_from_string('')\n", "k = bucket.new_key(\"phase2_output/\")\n", "o = k.set_contents_from_string('')" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 12 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Connecting to EMR, Configuring Jobs, Launching Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This section configures and launches the computations on EMR. (See notes on using boto to [configure jobs](http://boto.readthedocs.org/en/latest/emr_tut.html) and [connecting to EMR and launching jobs](http://stackoverflow.com/questions/26314316/how-to-launch-and-configure-an-emr-cluster-using-boto).)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "emr_conn = EmrConnection(AWSAccessKeyId, AWSSecretKey)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 13 }, { "cell_type": "code", "collapsed": false, "input": [ "instance_groups = []\n", "instance_groups.append(InstanceGroup(\n", "    num_instances = 1,\n", "    role = \"MASTER\",\n", "    type = masterType,\n", "    market = \"ON_DEMAND\",\n", "    name = \"Main node\"))\n", "instance_groups.append(InstanceGroup(\n", "    num_instances = numWorkers,\n", "    role = \"CORE\",\n", "    type = workerType,\n", "    market = \"ON_DEMAND\",\n", "    name = \"Worker nodes\"))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 14 }, { "cell_type": "code", "collapsed": false, "input": [ "steps = []\n", "for n in Nlist:\n", "    steps.append( StreamingStep(\n", "        name = \"phase1_\" + str(n),\n", "        mapper = \"s3://\" + bucketname + \"/phase1_mapper\" + str(n) + \".py\",\n", "        combiner = \"s3://\" + bucketname + \"/phase1_reducer.py\",\n", "        reducer = \"s3://\" + bucketname + \"/phase1_reducer.py\",\n", "        input = inputfoldername,\n", "        output = \"s3://\" + bucketname + \"/phase1_output/n\" + str(n) + \"/\") )\n", "    steps.append( StreamingStep(\n", "        name = \"phase2_\" + str(n),\n", "        mapper = \"s3://\" + bucketname + \"/phase2_mapper\" + str(n) + \".py\",\n", "        reducer = \"s3://\" + bucketname + \"/phase2_reducer.py\",\n", "        input = \"s3://\" + bucketname + \"/phase1_output/n\" + str(n) + \"/\",\n", "        output = \"s3://\" + bucketname + \"/phase2_output/n\" + str(n) + \"/\") )" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 15 }, { "cell_type": "code", "collapsed": false, "input": [ "cluster_id = emr_conn.run_jobflow(\n", "    name = \"ngramcalc\",\n", "    instance_groups = instance_groups,\n", "    log_uri = \"s3://\" + bucketname + \"/logs/\",\n", "    steps = steps,\n", "    ec2_keyname = sshKeyName,\n", "    ami_version = \"latest\")" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 16 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Periodically Checking for Job Completion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following periodically checks the status of the EMR job and waits for completion or failure before moving on. Note that with the current configuration of this script, the EMR job requires about 39 minutes."
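, "\n", "\n", "For finer-grained monitoring, the same describe_jobflow call could also be used to report the state of each individual step. A minimal sketch, assuming the JobFlow object returned by boto exposes a 'steps' list whose entries carry 'name' and 'state' attributes (as in boto 2 examples):\n", "\n", "    # hypothetical snippet: print the current state of each step in the jobflow\n", "    job_desc = emr_conn.describe_jobflow(cluster_id)\n", "    for step in job_desc.steps:\n", "        print step.name, step.state"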
] }, { "cell_type": "code", "collapsed": false, "input": [ "count = 0\n", "current_state = \"\"\n", "# poll the job status every checkTime seconds until it completes, fails or maxTime is reached\n", "while count < maxTime:\n", "    time.sleep(checkTime)\n", "    job_desc = emr_conn.describe_jobflow(cluster_id)\n", "    if job_desc.state != current_state:\n", "        current_state = job_desc.state\n", "        print current_state\n", "    if current_state == 'COMPLETED' or current_state == 'FAILED':\n", "        count = maxTime\n", "    else:\n", "        count += checkTime" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "STARTING\n", "BOOTSTRAPPING" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "RUNNING" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "SHUTTING_DOWN" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "COMPLETED" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n" ] } ], "prompt_number": 17 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Reading Results from S3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The results are then downloaded from S3." ] }, { "cell_type": "code", "collapsed": false, "input": [ "# make output directory if not already there\n", "if not os.path.exists('output'):\n", "    os.makedirs('output')\n", "\n", "# download all of the results to that directory\n", "for n in Nlist:\n", "    outfilelist = bucket.list(\"phase2_output/n\" + str(n) + \"/\")\n", "    for key in outfilelist:\n", "        key.get_contents_to_filename(\"output/\" + key.name.replace(\"/\",\".\"))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 18 }, { "cell_type": "markdown", "metadata": {}, "source": [ "These are then combined to produce a single file for each N-gram level." ] }, { "cell_type": "code", "collapsed": false, "input": [ "filenames = os.listdir('output')\n", "for n in Nlist:\n", "    catlist = \"cat\"\n", "    for f in filenames:\n", "        if (\"output.n\" + str(n) + \".part\") in f:\n", "            catlist += \" output/\" + f\n", "    catlist += \" > ngrams\" + str(n) + \".tsv\"\n", "    os.system(catlist)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 19 }, { "cell_type": "code", "collapsed": false, "input": [ "pd.read_table(\"ngrams\" + str(Nlist[0]) + \".tsv\", nrows=10, names=[\"preface\",\"sum\",\"output\"])" ], "language": "python", "metadata": {}, "outputs": [ {
"metadata": {}, "output_type": "pyout", "prompt_number": 20, "text": [ " preface sum output\n", "0 ' 5612 [('i', 656), ('and', 633), ('he', 464), ('said...\n", "1 -- 46879 [('and', 4705), ('the', 4027), ('a', 3080), ('...\n", "2 000 20160 [('in', 2426), ('to', 1862), ('people', 1619),...\n", "3 16 5032 [('years', 464), ('and', 431), ('percent', 278...\n", "4 2010 6638 [('and', 1028), ('the', 512), ('to', 334), ('w...\n", "5 30 21261 [('p', 4837), ('a', 2476), ('minutes', 2093), ...\n", "6 5 26158 [('million', 2562), ('percent', 2303), ('minut...\n", "7 according 24553 [('a', 13), ('to', 24479), ('the', 61)]\n", "8 added 10294 [('to', 1834), ('a', 1600), ('that', 1352), ('...\n", "9 alone 6199 [('in', 996), ('and', 620), ('with', 370), ('i..." ] } ], "prompt_number": 20 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Exporting to JSON" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The results are then output to JSON for use in this [blog post](http://briancoffey.ca/blogpost6.html) or in similar applications." ] }, { "cell_type": "code", "collapsed": false, "input": [ "for n in Nlist:\n", "    outobj = {}\n", "    with open(\"ngrams\" + str(n) + \".tsv\",\"r\") as f:\n", "        for line in f:\n", "            outobj[line.split(\"\\t\")[0]] = [ast.literal_eval(line.split(\"\\t\")[2].strip()), int(line.split(\"\\t\")[1].strip())]\n", "    with open(\"ngrams\" + str(n) + \".json\",\"w\") as f:\n", "        f.write(json.dumps(outobj))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 23 } ], "metadata": {} } ] }