" ], "metadata": {}, "output_type": "pyout", "prompt_number": 4, "text": [ " word occ\n", "0 ##s## 9299843\n", "1 ##es## 9299843\n", "2 the 4751890\n", "3 to 2753081\n", "4 and 2411141" ] } ], "prompt_number": 4 }, { "cell_type": "code", "collapsed": false, "input": [ "vocabList = set(vocabList_df.word.tolist())" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 5 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "MapReduce Scripts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For each N (i.e. for each N-gram level), two MapReduce steps are required, hereafter referred to as 'phase_1' and 'phase_2'. The mapper and reducer scripts are written in Python and run through Hadoop Streaming. They are copied below and should be saved as files in the working directory, from which they will be copied (with modifications as necessary) to S3 for use by EMR.\n", "\n", "For local testing, a mapper can be run outside of Hadoop as follows (the mappers/phase1_mapperN.py files are generated from the template further down in this notebook, and the scripts must first be made executable with chmod +x):\n", "\n", "    head -50 text.sample.txt > testfile\n", "    cat testfile | ./mappers/phase1_mapper2.py\n", "\n", "Or test a mapper and a reducer together:\n", "\n", "    cat testfile | ./mappers/phase1_mapper2.py | sort | ./phase1_reducer.py\n", "\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "phase1_mapper_template.py\n", "\n", "    #!/usr/bin/python\n", "    import sys, re\n", "    \n", "    n = $n$\n", "    \n", "    vocabSet = $vocabList$\n", "    \n", "    for line in sys.stdin:\n", "    \n", "        # tokenize string, marking sentence boundaries with ##s##\n", "        s = re.sub(\"[.!?;]+\", \" ##s## \", line)\n", "        s = \"##s## \" + s\n", "        # collapse runs of consecutive ##s## markers into a single marker\n", "        s = re.sub(\"(##s##\\s+)+\", \"##s## \", s)\n", "        regex = \"[^\\s,:=<>/\\\\)\\\\(\\\"]+\"\n", "        tokens = re.findall(regex, s.lower())\n", "    \n", "        # replace non-vocab tokens with \"##unkn##\"\n", "        for i,t in enumerate(tokens):\n", "            if t not in vocabSet:\n", "                tokens[i] = \"##unkn##\"\n", "    \n", "        # find n-grams\n", "        ngrams = []\n", "        for i in range(n-1,len(tokens)):\n", "            new = \"\"\n", "            for j in range(n-1,-1,-1):\n", "                new += tokens[i-j] + \" \"\n", "            ngrams.append(new.strip())\n", "    \n", "        # output n-grams\n", "        for l in ngrams:\n", "            print \"{0}\\t{1}\".format(str(l).strip(), 1)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "phase1_reducer.py\n", "\n", "    #!/usr/bin/python\n", "    import sys\n", "    \n", "    oldKey = None\n", "    s = 0\n", "    \n", "    for line in sys.stdin:\n", "        data_mapped = line.strip().split(\"\\t\")\n", "        if len(data_mapped) != 2:\n", "            continue\n", "    \n", "        # strip both fields so that keys coming straight from the mapper and keys\n", "        # already summed by this same script running as the combiner compare equal\n", "        thisKey, thisVal = [x.strip() for x in data_mapped]\n", "    \n", "        # a new key has started: emit the total for the previous one\n", "        if oldKey is not None and oldKey != thisKey:\n", "            print \"{0}\\t{1}\".format(oldKey, s)\n", "            s = 0\n", "    \n", "        oldKey = thisKey\n", "        s += int(thisVal)\n", "    \n", "    if oldKey is not None:\n", "        print \"{0}\\t{1}\".format(oldKey, s)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "phase2_mapper_template.py\n", "\n", "    #!/usr/bin/python\n", "    import sys\n", "    \n", "    n = $n$\n", "    occCutoff = $occCutoff$\n", "    \n", "    for line in sys.stdin:\n", "    \n", "        l = line.strip().split()\n", "        if len(l) != n + 1:\n", "            continue\n", "    \n", "        # keep only n-grams with more than occCutoff occurrences whose last word\n", "        # contains no '#' (which excludes the ##s## and ##unkn## markers)\n", "        if int(l[n]) > occCutoff and \"#\" not in l[n-1]:\n", "    \n", "            # format output as tab-delimited: preface - word - occurrences\n", "            preface = l[0]\n", "            for i in range(1,n-1):\n", "                preface += \" \" + l[i]\n", "            print preface + \"\\t\" + l[n-1] + \"\\t\" + l[n]\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "phase2_reducer_template.py\n", "\n", "    #!/usr/bin/python\n", "    import sys\n", "    \n", "    numKeep = $numKeep$\n", "    \n", "    oldKey = None\n", "    topX = []\n", "    sum = 0\n", "    \n", "    for line in sys.stdin:\n", "        data_mapped = line.strip().split(\"\\t\")\n", "        if len(data_mapped) != 3:\n", "            continue\n", "    \n", "        thisKey, word, occ = data_mapped\n", "    \n", "        # a new preface has started: emit the total and top words for the previous one\n", "        if oldKey and oldKey != thisKey:\n", "            print str(oldKey).strip() + \"\\t\" + str(sum) + \"\\t\" + str(topX)\n", "            topX = []\n", "            sum = 0\n", "    \n",
"        topX.append((word,int(occ)))\n", "        # keep only the numKeep most frequent words: sort by count\n", "        # (descending) and drop the least frequent one\n", "        if len(topX) > numKeep:\n", "            topX = sorted(topX,key=lambda x: x[1],reverse=True)\n", "            topX.pop()\n", "    \n", "        oldKey = thisKey\n", "        sum += int(occ)\n", "    \n", "    if oldKey is not None:\n", "        print str(oldKey).strip() + \"\\t\" + str(sum) + \"\\t\" + str(topX)\n" ] },
{ "cell_type": "code", "collapsed": false, "input": [ "phase1_mapper_template_file = 'phase1_mapper_template.py'\n", "phase1_reducer_file = 'phase1_reducer.py'\n", "phase2_mapper_template_file = 'phase2_mapper_template.py'\n", "phase2_reducer_template_file = 'phase2_reducer_template.py'" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 6 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Setting up S3 Folders and Files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section, we set up the files and folders on S3 that EMR will use in the next section. (General notes on using boto with S3 can be found in the [boto S3 tutorial](http://boto.readthedocs.org/en/latest/s3_tut.html).)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "s3_conn = S3Connection(AWSAccessKeyId, AWSSecretKey)\n", "bucket = s3_conn.get_bucket(bucketname)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 7 },
{ "cell_type": "code", "collapsed": false, "input": [ "# create a local mappers directory if it does not exist\n", "if not os.path.exists('mappers'):\n", "    os.makedirs('mappers')\n", "\n", "# write a separate phase1 mapper script for each value of n, then upload it to S3\n", "for n in Nlist:\n", "    mf = \"mappers/phase1_mapper\" + str(n) + \".py\"\n", "    with open(mf,'w') as mff:\n", "        with open(phase1_mapper_template_file,'r') as mtf:\n", "            mff.write(mtf.read().replace(\"$n$\", str(n)).replace(\"$vocabList$\", str(vocabList)))\n", "    k = bucket.new_key(\"phase1_mapper\" + str(n) + \".py\")\n", "    o = k.set_contents_from_filename(mf)" ], "language": "python", "metadata": {}, "outputs": [],
"prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "k = bucket.new_key('phase1_reducer.py')\n", "o = k.set_contents_from_filename(phase1_reducer_file)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 },
{ "cell_type": "code", "collapsed": false, "input": [ "# write a separate phase2 mapper script for each value of n, then upload it to S3\n", "for n in Nlist:\n", "    mf = \"mappers/phase2_mapper\" + str(n) + \".py\"\n", "    with open(mf,'w') as mff:\n", "        with open(phase2_mapper_template_file,'r') as mtf:\n", "            mff.write(mtf.read().replace(\"$n$\", str(n)).replace(\"$occCutoff$\", str(occCutoff)))\n", "    k = bucket.new_key(\"phase2_mapper\" + str(n) + \".py\")\n", "    o = k.set_contents_from_filename(mf)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 10 },
{ "cell_type": "code", "collapsed": false, "input": [ "with open(\"phase2_reducer.py\",'w') as mff:\n", "    with open(phase2_reducer_template_file,'r') as mtf:\n", "        mff.write(mtf.read().replace(\"$numKeep$\", str(numKeep)))\n", "k = bucket.new_key(\"phase2_reducer.py\")\n", "o = k.set_contents_from_filename(\"phase2_reducer.py\")" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 11 },
{ "cell_type": "code", "collapsed": false, "input": [ "# create empty placeholder 'folders' for the step outputs\n", "k = bucket.new_key(\"phase1_output/\")\n", "o = k.set_contents_from_string('')\n", "k = bucket.new_key(\"phase2_output/\")\n", "o = k.set_contents_from_string('')" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 12 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Connecting to EMR, Configuring Jobs, Launching Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This section configures and launches the computations on EMR. (See the boto notes on [configuring jobs](http://boto.readthedocs.org/en/latest/emr_tut.html) and this discussion of [connecting to EMR and launching jobs](http://stackoverflow.com/questions/26314316/how-to-launch-and-configure-an-emr-cluster-using-boto).)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "emr_conn = EmrConnection(AWSAccessKeyId, AWSSecretKey)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 13 },
{ "cell_type": "code", "collapsed": false, "input": [ "instance_groups = []\n", "instance_groups.append(InstanceGroup(\n", "    num_instances = 1,\n", "    role = \"MASTER\",\n", "    type = masterType,\n", "    market = \"ON_DEMAND\",\n", "    name = \"Main node\"))\n", "instance_groups.append(InstanceGroup(\n", "    num_instances = numWorkers,\n", "    role = \"CORE\",\n", "    type = workerType,\n", "    market = \"ON_DEMAND\",\n", "    name = \"Worker nodes\"))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 14 },
{ "cell_type": "code", "collapsed": false, "input": [ "steps = []\n", "for n in Nlist:\n", "    # phase 1: count every n-gram (the reducer doubles as the combiner)\n", "    steps.append( StreamingStep(\n", "        name = \"phase1_\" + str(n),\n", "        mapper = \"s3://\" + bucketname + \"/phase1_mapper\" + str(n) + \".py\",\n", "        combiner = \"s3://\" + bucketname + \"/phase1_reducer.py\",\n", "        reducer = \"s3://\" + bucketname + \"/phase1_reducer.py\",\n", "        input = inputfoldername,\n", "        output = \"s3://\" + bucketname + \"/phase1_output/n\" + str(n) + \"/\") )\n", "    # phase 2: group the counts by preface and keep the most frequent next words\n", "    steps.append( StreamingStep(\n", "        name = \"phase2_\" + str(n),\n", "        mapper = \"s3://\" + bucketname + \"/phase2_mapper\" + str(n) + \".py\",\n", "        reducer = \"s3://\" + bucketname + \"/phase2_reducer.py\",\n", "        input = \"s3://\" + bucketname + \"/phase1_output/n\" + str(n) + \"/\",\n", "        output = \"s3://\" + bucketname + \"/phase2_output/n\" + str(n) + \"/\") )" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 15 },
{ "cell_type": "code", "collapsed": false, "input": [ "cluster_id = emr_conn.run_jobflow(\n", "    name = \"ngramcalc\",\n", "    instance_groups = instance_groups,\n", "    log_uri = \"s3://\" + bucketname + \"/logs/\",\n", "    steps = steps,\n", "    ec2_keyname = sshKeyName,\n", "    ami_version = \"latest\")" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 16 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Periodically Checking for Job Completion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following cell polls the status of the EMR job flow every checkTime seconds and waits until it completes, fails or is terminated (giving up after maxTime seconds) before moving on. With the current configuration of this notebook, the EMR job takes about 39 minutes." ] },
{ "cell_type": "code", "collapsed": false, "input": [ "count = 0\n", "current_state = \"\"\n", "while count < maxTime:\n", "    time.sleep(checkTime)\n", "    job_desc = emr_conn.describe_jobflow(cluster_id)\n", "    if job_desc.state != current_state:\n", "        current_state = job_desc.state\n", "        print current_state\n", "    # stop polling once the job flow has finished, successfully or not\n", "    if current_state in ('COMPLETED', 'FAILED', 'TERMINATED'):\n", "        break\n", "    count += checkTime" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "STARTING\n", "BOOTSTRAPPING" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "RUNNING" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "SHUTTING_DOWN" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "COMPLETED" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n" ] } ], "prompt_number": 17 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Reading Results from S3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The results are then downloaded from S3." 
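, "\n", "\n", "Each downloaded part file holds tab-separated lines written by phase2_reducer.py: the preface, the total count of the words observed after it, and a Python-literal list of at most numKeep (word, count) pairs. Only n-grams that occur more than occCutoff times and whose last word contains no '#' contribute to these counts. Because the reducer re-sorts the list only when it grows past numKeep, the pairs are not guaranteed to be in descending order of count. A minimal sketch of parsing one such line, assuming exactly this three-column layout (parse_phase2_line is a hypothetical helper, and the sample line is the 'according' row of the results displayed further down):\n", "\n", "    import ast\n", "    \n", "    def parse_phase2_line(line):\n", "        # split into preface, total count and the literal list of (word, count) pairs\n", "        preface, total, top = line.rstrip(\"\\n\").split(\"\\t\")\n", "        # ast.literal_eval safely converts the list's text into Python tuples\n", "        pairs = ast.literal_eval(top)\n", "        # order by count, most frequent word first\n", "        pairs.sort(key=lambda p: p[1], reverse=True)\n", "        return preface, int(total), pairs\n", "    \n", "    print(parse_phase2_line(\"according\\t24553\\t[('a', 13), ('to', 24479), ('the', 61)]\\n\"))\n", "    # -> ('according', 24553, [('to', 24479), ('the', 61), ('a', 13)])"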
] }, { "cell_type": "code", "collapsed": false, "input": [ "# make output directory if not already there\n", "if not os.path.exists('output'):\n", "    os.makedirs('output')\n", "\n", "# download all of the phase 2 results to that directory\n", "for n in Nlist:\n", "    outfilelist = bucket.list(\"phase2_output/n\" + str(n) + \"/\")\n", "    for key in outfilelist:\n", "        key.get_contents_to_filename(\"output/\" + key.name.replace(\"/\",\".\"))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 18 }, { "cell_type": "markdown", "metadata": {}, "source": [ "The downloaded files are then concatenated into a single file for each N-gram level." ] },
{ "cell_type": "code", "collapsed": false, "input": [ "filenames = sorted(os.listdir('output'))\n", "for n in Nlist:\n", "    # concatenate the downloaded part files for this n into ngramsN.tsv\n", "    catlist = \"cat\"\n", "    for f in filenames:\n", "        if (\"output.n\" + str(n) + \".part\") in f:\n", "            catlist += \" output/\" + f\n", "    catlist += \" > ngrams\" + str(n) + \".tsv\"\n", "    os.system(catlist)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 19 },
{ "cell_type": "code", "collapsed": false, "input": [ "pd.read_table(\"ngrams\" + str(Nlist[0]) + \".tsv\", nrows=10, names=[\"preface\",\"sum\",\"output\"])" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 20, "text": [ " preface sum output\n", "0 ' 5612 [('i', 656), ('and', 633), ('he', 464), ('said...\n", "1 -- 46879 [('and', 4705), ('the', 4027), ('a', 3080), ('...\n", "2 000 20160 [('in', 2426), ('to', 1862), ('people', 1619),...\n", "3 16 5032 [('years', 464), ('and', 431), ('percent', 278...\n", "4 2010 6638 [('and', 1028), ('the', 512), ('to', 334), ('w...\n", "5 30 21261 [('p', 4837), ('a', 2476), ('minutes', 2093), ...\n", "6 5 26158 [('million', 2562), ('percent', 2303), ('minut...\n", "7 according 24553 [('a', 13), ('to', 24479), ('the', 61)]\n", "8 added 10294 [('to', 1834), ('a', 1600), ('that', 1352), ('...\n", "9 alone 6199 [('in', 996), ('and', 620), ('with', 370), ('i..." ] } ], "prompt_number": 20 }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Exporting to JSON" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The results are then exported to JSON for use in this [blog post](http://briancoffey.ca/blogpost6.html) or in similar applications." ] },
{ "cell_type": "code", "collapsed": false, "input": [ "for n in Nlist:\n", "    outobj = {}\n", "    with open(\"ngrams\" + str(n) + \".tsv\",\"r\") as f:\n", "        for line in f:\n", "            # each line: preface <tab> total count <tab> list of (word, count) pairs\n", "            preface, total, top = line.split(\"\\t\")\n", "            outobj[preface] = [ast.literal_eval(top.strip()), int(total.strip())]\n", "    with open(\"ngrams\" + str(n) + \".json\",\"w\") as f:\n", "        f.write(json.dumps(outobj))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 23 } ], "metadata": {} } ] }