{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 1.3 RDD Text Processing\n", "\n", "This notebook demonstrates some of the more advanced uses of Spark RDD API for text processing.\n", "\n", "We will analyze the text of 'The Art of War' by Sun Tzu trying to find words that the best characterise each of its chapters.\n", "\n", "Let's have look at the data first (`data/artwar.1b.txt` file):" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "BEGINNING:\n", "\n", "Translated by Lionel Giles\n", "\n", "I. Laying Plans\n", "\n", "1. Sun Tzu said: The art of war is of vital importance to the State.\n", "\n", "2. It is a matter of life and death, a road either to safety or to\n", "ruin. Hence it is a subject of inquiry which can on no account be\n", "neglected. \n", "\n", "ENDING:\n", "\n", "27. Hence it is only the enlightened ruler and the wise general who\n", "will use the highest intelligence of the army for purposes of spying\n", "and thereby they achieve great results. Spies are a most important\n", "element in water, because on them depends an army's ability to move.\n", "\n", "THE END\n", "\n", "----------------------------------------------------------------------\n", "\n", "Copyright statement:\n", "The Internet Classics Archive by Daniel C. Stevenson, Web Atomics.\n" ] } ], "source": [ "%%sh\n", "\n", "echo \"BEGINNING:\"\n", "head -n 22 'data/artwar.1b.txt' | tail -n 10\n", "echo \n", "echo \"ENDING:\"\n", "tail -n 20 'data/artwar.1b.txt' | head -n 12" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It's a text file with some intro content at the beginnig, the chapters are numbered with Roman numbers the paragraphs with Arabic numbers and there is the 'THE END' text as the end of the last chapter. " ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "Out first task is to split the document by chapters. \n", "\n", "As the first step let's find the starting line of each of the chapters:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(u'I. Laying Plans', 15),\n", " (u'II. Waging War', 106),\n", " (u'III. Attack by Stratagem', 184),\n", " (u'IV. Tactical Dispositions', 266),\n", " (u'V. Energy', 337),\n", " (u'VI. Weak Points and Strong', 426),\n", " (u'VII. Maneuvering', 562),\n", " (u'VIII. Variation in Tactics', 692),\n", " (u'IX. The Army on the March', 748),\n", " (u'X. Terrain', 908),\n", " (u'XI. The Nine Situations', 1033),\n", " (u'XII. The Attack by Fire', 1287),\n", " (u'XIII. 
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(u'I. Laying Plans', 15),\n", " (u'II. Waging War', 106),\n", " (u'III. Attack by Stratagem', 184),\n", " (u'IV. Tactical Dispositions', 266),\n", " (u'V. Energy', 337),\n", " (u'VI. Weak Points and Strong', 426),\n", " (u'VII. Maneuvering', 562),\n", " (u'VIII. Variation in Tactics', 692),\n", " (u'IX. The Army on the March', 748),\n", " (u'X. Terrain', 908),\n", " (u'XI. The Nine Situations', 1033),\n", " (u'XII. The Attack by Fire', 1287),\n", " (u'XIII. The Use of Spies', 1365),\n", " (u'THE END', 1467)]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import re\n", "\n", "# returns True if the `line` is a heading (starts with a Roman numeral followed by a dot)\n", "# or the \"THE END\" marker\n", "def is_heading(line):\n", " return re.match(r'^[IVX]+\\..*', line) or line.startswith(\"THE END\")\n", "\n", "\n", "# create an RDD with the lines of the text\n", "aowTextRDD = sc.textFile('data/artwar.1b.txt')\n", "\n", "# number each line using `zipWithIndex`\n", "# and select the heading lines together with their indexes\n", "\n", "chaptersAndBeginnings = aowTextRDD.zipWithIndex() \\\n", " .filter(lambda (line, index): is_heading(line)).collect()\n", " \n", "display(chaptersAndBeginnings)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of chapters: 13\n", "Chapter titles: [u'I. Laying Plans', u'II. Waging War', u'III. Attack by Stratagem', u'IV. Tactical Dispositions', u'V. Energy', u'VI. Weak Points and Strong', u'VII. Maneuvering', u'VIII. Variation in Tactics', u'IX. The Army on the March', u'X. Terrain', u'XI. The Nine Situations', u'XII. The Attack by Fire', u'XIII. The Use of Spies']\n", "Chapter start lines: [15, 106, 184, 266, 337, 426, 562, 692, 748, 908, 1033, 1287, 1365, 1467] \n" ] } ], "source": [ "# let's create some useful intermediate variables\n", "\n", "chapterBeginnings = [ i for (title,i) in chaptersAndBeginnings]\n", "chapterTitles = [ title for (title,i) in chaptersAndBeginnings[0:-1]]\n", "noOfChapters = len(chapterTitles)\n", "\n", "print(\"Number of chapters: %s\" % noOfChapters)\n", "print(\"Chapter titles: %s\" % chapterTitles)\n", "print(\"Chapter start lines: %s \" % chapterBeginnings)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's assign each line its chapter number and filter out the lines before the first and after the last chapter. \n", "\n", "We will use the Python `bisect.bisect_left()` function, which finds the insertion point (index) for a value in a sorted list that keeps the list sorted. \n", "\n", "In our case the sorted list is `chapterBeginnings` from above, and `bisect_left` with a line number will return the 1-based chapter number the line belongs to; 0 for the lines before the first chapter and 14 for the lines after the last one." ] },
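{ "cell_type": "markdown", "metadata": {}, "source": [ "Before wiring `bisect` into the RDD pipeline, the small sketch below (not part of the original analysis) shows how `bisect_left` on `chapterBeginnings`, minus one, maps a few hand-picked line numbers to 0-based chapter indexes, with out-of-range indexes for the lines outside the chapters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# illustrative sketch only: map a few sample line numbers to chapter indexes with bisect\n", "import bisect\n", "\n", "for sampleLineNo in [10, 20, 200, 1500]:\n", "    chapterIdx = bisect.bisect_left(chapterBeginnings, sampleLineNo) - 1\n", "    if 0 <= chapterIdx < noOfChapters:\n", "        print(\"line %4d -> chapter index %2d (%s)\" % (sampleLineNo, chapterIdx, chapterTitles[chapterIdx]))\n", "    else:\n", "        print(\"line %4d -> outside the chapters (index %d)\" % (sampleLineNo, chapterIdx))" ] },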
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, u'1. Sun Tzu said: The art of war is of vital importance to the State.'),\n", " (0, u'2. It is a matter of life and death, a road either to safety or to'),\n", " (0, u'ruin. Hence it is a subject of inquiry which can on no account be'),\n", " (0, u'neglected. '),\n", " (0, u'3. The art of war, then, is governed by five constant factors, to')]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import bisect\n", "\n", "# we can use sc.broadcast() to publish a large object to all executors and improve performance\n", "\n", "chapterBeginningsBR = sc.broadcast([ i for (title, i) in chaptersAndBeginnings])\n", "\n", "# to refer to the value of the broadcast variable in the computation use the `value` property\n", "\n", "linesWithChapterNoRDD = aowTextRDD.zipWithIndex() \\\n", " .filter(lambda (line,index): line and not is_heading(line)) \\\n", " .map(lambda (line, index): (bisect.bisect_left(chapterBeginningsBR.value, index) - 1, line)) \\\n", " .filter(lambda (chapterNo, line): chapterNo >= 0 and chapterNo < noOfChapters) \n", " \n", "linesWithChapterNoRDD.take(5) " ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Result as dict: [(0, [(u'the', 35), (u'to', 20), (u'and', 19), (u'of', 19), (u'is', 17), (u'be', 12), (u'in', 11), (u'are', 9), (u'a', 8), (u'which', 8)]), (1, [(u'the', 38), (u'of', 26), (u'and', 18), (u'be', 17), (u'to', 16), (u'a', 12), (u'will', 12), (u'in', 11), (u'is', 11), (u'your', 7)])]\n" ] } ], "source": [ "import operator\n", "from itertools import islice\n", "\n", "# find the top 10 words by count in each chapter\n", "\n", "topFrequentWordsByChapterRDD = linesWithChapterNoRDD\\\n", " .flatMap(lambda (chapterNo, line): ((chapterNo, word) for word in re.split('[^a-z\\']+', line.lower()) if word)) \\\n", " .map(lambda chapterNoAndWord: (chapterNoAndWord, 1)) \\\n", " .reduceByKey(operator.add) \\\n", " .map(lambda ((chapterNo, word), count): (chapterNo, (word, count))) \\\n", " .sortBy(lambda (chapterNo, (word, count)): count, False) \\\n", " .groupByKey() \\\n", " .map(lambda (chapterNo, wordAndCountIterator): (chapterNo, list(islice(wordAndCountIterator, 10))))\n", "\n", "\n", "# `collectAsMap` can be used to collect a pair RDD to a python `dict`\n", "\n", "topFrequentWordsByChapter = topFrequentWordsByChapterRDD.collectAsMap()\n", "print(\"Result as dict: %s\" % topFrequentWordsByChapter.items()[0:2])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And now we can format it nicely:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Top frequent words by chapter:\n", "I. Laying Plans: the to and of is be in are a which\n", "II. Waging War: the of and be to a will in is your\n", "III. Attack by Stratagem: the to is of in will and army if a\n", "IV. Tactical Dispositions: the of to is a in and victory enemy defeat\n", "V. Energy: the of a to and is in be more it\n", "VI. Weak Points and Strong: the to be and in of he his is can\n", "VII. Maneuvering: the of and to is a in an be army\n", "VIII. Variation in Tactics: the of to be in not which his must and\n", "IX. The Army on the March: the to and is of a in are if that\n", "X. Terrain: the to and is a of are if you in\n", "XI. The Nine Situations: the of to and ground is a on in be\n", "XII. The Attack by Fire: the to is of an fire a in not be\n",
"XIII. The Use of Spies: the of and to spies be is a in it\n" ] } ], "source": [ "print(\"Top frequent words by chapter:\")\n", "for chapterNo, title in enumerate(chapterTitles):\n", " print(\"%s: %s\" % (title, \" \".join([ word for word,_ in topFrequentWordsByChapter[chapterNo] ]) ))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unfortunately, the most frequent words in each chapter do not provide a good summary, because many of them are common to all chapters. \n", "\n", "To address this issue we need to take into account how specific each word is to each chapter.\n", "\n", "One of the common approaches in text analysis is to use the *tf-idf* statistic ([Term Frequency - Inverse Document Frequency](https://en.wikipedia.org/wiki/Tf%E2%80%93idf)).\n", "\n", "In this approach the frequency of a term within a document (TF) is weighted by the `log` of its inverse document frequency (IDF); here IDF = `log(number of chapters / number of chapters containing the word)`.\n", "\n", "For example in our case:\n", "\n", " * a word that appears in all 13 chapters will have IDF = log(13/13) = 0 \n", " * a word that appears in only one chapter will have IDF = log(13/1), i.e. about 2.56\n", "\n", "Let's see if this approach works:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(u'army', 13),\n", " (u'said', 13),\n", " (u'tzu', 13),\n", " (u'is', 13),\n", " (u'in', 13),\n", " (u'if', 13),\n", " (u'the', 13),\n", " (u'for', 13),\n", " (u'may', 13),\n", " (u'that', 13)]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# calculate the chapter count for each word (the number of chapters the word appears in)\n", "chapterCounts = linesWithChapterNoRDD \\\n", " .flatMap(lambda (chapterNo, line): ((word, chapterNo) for word in re.split('[^a-z\\']+', line.lower()) if word)) \\\n", " .distinct() \\\n", " .countByKey()\n", "\n", "sorted(chapterCounts.items(), key=lambda kv:kv[1], reverse=True)[0:10]" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(u'secondly', 2.5649493574615367),\n", " (u'dynasty', 2.5649493574615367),\n", " (u'rapid', 2.5649493574615367),\n", " (u'woods', 2.5649493574615367),\n", " (u'aides', 2.5649493574615367)]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from math import log\n", "\n", "# calculate the IDF (in plain python)\n", "\n", "idf = dict( (w, log(float(noOfChapters)/c)) for w,c in chapterCounts.items())\n", "sorted(idf.items(), key=lambda kv:kv[1], reverse=True)[0:5]" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# find the top 10 words by TF-IDF in each chapter\n", "idfBR = sc.broadcast(idf)\n", "\n", "topTFIDFWordsByChapter = linesWithChapterNoRDD\\\n", " .flatMap(lambda (chapterNo,line): ((chapterNo, word) for word in re.split('[^a-z\\']+', line.lower()) if word)) \\\n", " .map(lambda chapterNoAndWord: (chapterNoAndWord, 1)) \\\n", " .reduceByKey(operator.add) \\\n", " .map(lambda ((chapterNo, word), count): (chapterNo, (word, count * idfBR.value[word]))) \\\n", " .sortBy(lambda (chapterNo, (word, tfidf)): tfidf, False) \\\n", " .groupByKey() \\\n", " .map(lambda (chapterNo, wordAndTfidfIterator): (chapterNo, list(islice(wordAndTfidfIterator, 10)))) \\\n", " .collectAsMap()\n" ] },
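{ "cell_type": "markdown", "metadata": {}, "source": [ "Before formatting the result, a quick sanity check (an illustrative sketch, not part of the original analysis): a word that occurs in every chapter, such as 'the', should have an IDF of log(13/13) = 0 and therefore contribute nothing to the ranking, while a word confined to few chapters, such as 'spies', should get a noticeably higher weight." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# illustrative sketch only: compare the IDF weight of a ubiquitous word with a chapter-specific one\n", "for sampleWord in ['the', 'spies']:\n", "    print(\"IDF of %-5s = %.4f (appears in %d of %d chapters)\" % (sampleWord, idf[sampleWord], chapterCounts[sampleWord], noOfChapters))" ] },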
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Top TF-IDF words by chapter:\n", "I. Laying Plans: calculations counsel moral law seem deliberations determine believe seeking hearkens\n", "II. Waging War: chariots substance thousand wagons people's ardor spent twenty damped tenths\n", "III. Attack by Stratagem: bulwark win cities ignorant entire next besiege months walled destroy\n", "IV. Tactical Dispositions: estimation quantity measurement balancing defeating defeat sign fighter skilled mistakes\n", "V. Energy: energy indirect yet postulates decision combination effected combined simulated direct\n", "VI. Weak Points and Strong: weaken strengthen van water left rear what places hasten succor\n", "VII. Maneuvering: maneuvering lost drums goal banners fall art deviation refrain alone\n", "VIII. Variation in Tactics: leads dangerous faults roads trouble schemes sensitive varying solicitude affect\n", "IX. The Army on the March: get places river dust sent advancing about grass sign shows\n", "X. Terrain: result six gone towards halfway responsible garrisoned unaware regard ground\n", "XI. The Nine Situations: ground would i facile intersecting serious they desperate dispersive contentious\n", "XII. The Attack by Fire: fire burn days season aid stay developments special proper wind\n", "XIII. The Use of Spies: spies spy converted doomed surviving purposes information inward subtle was\n" ] } ], "source": [ "# format the output nicely\n", "print(\"Top TF-IDF words by chapter:\")\n", "for chapterNo, title in enumerate(chapterTitles):\n", " print(\"%s: %s\" % (title, \" \".join([ word for word,_ in topTFIDFWordsByChapter[chapterNo] ]) ))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*This (arguably) is a much better summary.*\n", "\n", "You can now play around, modifying pieces of the code.\n", "\n", "When you are done, and if you are running the notebook on your local machine, remember to *close the notebook* with `File/Close and Halt`." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 1 }