{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "#Mining the Social Web, 2nd Edition\n", "\n", "##Chapter 6: Mining Mailboxes: Analyzing Who's Talking To Whom About What, How Often, and More\n", "\n", "This IPython Notebook provides an interactive way to follow along with and explore the numbered examples from [_Mining the Social Web (2nd Edition)_](http://bit.ly/135dHfs). The intent behind this notebook is to reinforce the concepts from the sample code in a fun, convenient, and effective way. This notebook assumes that you are reading along with the book and have the context of the discussion as you work through these exercises.\n", "\n", "In the somewhat unlikely event that you've somehow stumbled across this notebook outside of its context on GitHub, [you can find the full source code repository here](http://bit.ly/16kGNyb).\n", "\n", "## Copyright and Licensing\n", "\n", "You are free to use or adapt this notebook for any purpose you'd like. However, please respect the [Simplified BSD License](https://github.com/ptwobrussell/Mining-the-Social-Web-2nd-Edition/blob/master/LICENSE.txt) that governs its use." ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 1. 
Converting a toy mailbox to JSON" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import mailbox\n", "import email\n", "import json\n", "\n", "MBOX = 'resources/ch06-mailboxes/data/northpole.mbox'\n", "\n", "# A routine that makes a ton of simplifying assumptions\n", "# about converting an mbox message into a Python object\n", "# given the nature of the northpole.mbox file in order\n", "# to demonstrate the basic parsing of an mbox with mail\n", "# utilities\n", "\n", "def objectify_message(msg):\n", " \n", " # Map in fields from the message\n", " o_msg = dict([ (k, v) for (k,v) in msg.items() ])\n", " \n", " # Assume one part to the message and get its content\n", " # and its content type\n", " \n", " part = [p for p in msg.walk()][0]\n", " o_msg['contentType'] = part.get_content_type()\n", " o_msg['content'] = part.get_payload()\n", " \n", " return o_msg\n", "\n", "# Create an mbox that can be iterated over and transform each of its\n", "# messages to a convenient JSON representation\n", "\n", "mbox = mailbox.UnixMailbox(open(MBOX, 'rb'), email.message_from_file)\n", "\n", "messages = []\n", "\n", "while 1:\n", " msg = mbox.next()\n", " \n", " if msg is None: break\n", " \n", " messages.append(objectify_message(msg))\n", " \n", "print json.dumps(messages, indent=1)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**The following script downloads the Enron corpus and extracts it to the proper working location**
\n", "This script adapts a function from http://stackoverflow.com/a/22776/2292400 to download a file and extract it to the proper working location. It's a lot of code just to download and extract a file, but it is included to minimize the configuration burden and to demonstrate some of the vast possibilities of IPython Notebook. The file is about 450MB, so it may take a while to download; conveniently, the script displays an updated download status every 5 seconds.\n", "\n", "Alternatively, you could download the file yourself and place it in the _ipynb/resources/ch06-mailboxes/data_ directory, or use %%bash cell magic as discussed in the _Chapter0_ notebook to retrieve the file with a Bash script that would be just a few lines long.\n", "\n", "**Warning** - Unfortunately, downloading and decompressing the file is relatively fast compared to the time it takes Vagrant to synchronize the large number of decompressed files with the host machine, and there is currently no known workaround that speeds this up on all platforms. _It may take longer than an hour for Vagrant to synchronize the thousands of files that decompress._ It is recommended that you run this code at an opportune time, such as when you step out for lunch or overnight.\n", "\n", "**Alternative Workaround** - The GitHub repository's _ipynb/resources/ch06-mailboxes/data_ directory contains a highly compressed version of the output (_enron.mbox.json.bz2_) that you'll get from *Example 3*. You can follow the instructions in the note after *Example 3* to import this data directly into MongoDB if the time it takes to download the corpus and let Vagrant synchronize it is too much of an inconvenience for you."
] }, { "cell_type": "code", "collapsed": false, "input": [ "import sys\n", "import urllib2\n", "import time\n", "import os\n", "import envoy # pip install envoy\n", "\n", "URL = \"http://www.cs.cmu.edu/~enron/enron_mail_20110402.tgz\"\n", "DOWNLOAD_DIR = \"resources/ch06-mailboxes/data\"\n", "\n", "# Downloads a file and displays a download status every 5 seconds\n", "\n", "def download(url, download_dir): \n", " file_name = url.split('/')[-1]\n", " u = urllib2.urlopen(url)\n", " f = open(os.path.join(download_dir, file_name), 'wb')\n", " meta = u.info()\n", " file_size = int(meta.getheaders(\"Content-Length\")[0])\n", " print \"Downloading: %s Bytes: %s\" % (file_name, file_size)\n", "\n", " file_size_dl = 0\n", " block_sz = 8192\n", " last_update = time.time()\n", " while True:\n", " buffer = u.read(block_sz)\n", " if not buffer:\n", " break\n", "\n", " file_size_dl += len(buffer)\n", " f.write(buffer)\n", " download_status = r\"%10d MB [%3.2f%%]\" % (file_size_dl / 1000000.0, file_size_dl * 100.0 / file_size)\n", " download_status = download_status + chr(8)*(len(download_status)+1)\n", " if time.time() - last_update > 5:\n", " print download_status,\n", " sys.stdout.flush()\n", " last_update = time.time()\n", " f.close()\n", " return f.name\n", "\n", "# Extracts a gzipped tarfile. e.g. \"$ tar xzf filename.tgz\"\n", "\n", "def tar_xzf(f):\n", " # Call out to the shell for a faster decompression.\n", " # This will still take a while because Vagrant synchronizes\n", " # thousands of files that are extracted to the host machine\n", " r = envoy.run(\"tar xzf %s -C %s\" % (f, DOWNLOAD_DIR))\n", " print r.std_out\n", " print r.std_err\n", "\n", "f = download(URL, DOWNLOAD_DIR)\n", "print \"Download complete: %s\" % (f,)\n", "tar_xzf(f)\n", "print \"Decompression complete\"\n", "print \"Data is ready\"" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 2. 
Converting the Enron corpus to a standardized mbox format" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import re\n", "import email\n", "from time import asctime\n", "import os\n", "import sys\n", "from dateutil.parser import parse # pip install python_dateutil\n", "\n", "# XXX: Download the Enron corpus to resources/ch06-mailboxes/data\n", "# and unarchive it there.\n", "\n", "MAILDIR = 'resources/ch06-mailboxes/data/enron_mail_20110402/' + \\\n", " 'enron_data/maildir' \n", "\n", "# Where to write the converted mbox\n", "MBOX = 'resources/ch06-mailboxes/data/enron.mbox'\n", "\n", "# Create a file handle that we'll be writing into...\n", "mbox = open(MBOX, 'w')\n", "\n", "# Walk the directories and process any folder named 'inbox'\n", "\n", "for (root, dirs, file_names) in os.walk(MAILDIR):\n", "\n", " if root.split(os.sep)[-1].lower() != 'inbox':\n", " continue\n", "\n", " # Process each message in 'inbox'\n", "\n", " for file_name in file_names:\n", " file_path = os.path.join(root, file_name)\n", " message_text = open(file_path).read()\n", "\n", " # Compute fields for the From_ line in a traditional mbox message\n", "\n", " _from = re.search(r\"From: ([^\\r]+)\", message_text).groups()[0]\n", " _date = re.search(r\"Date: ([^\\r]+)\", message_text).groups()[0]\n", "\n", " # Convert _date to the asctime representation for the From_ line\n", "\n", " _date = asctime(parse(_date).timetuple())\n", "\n", " msg = email.message_from_string(message_text)\n", " msg.set_unixfrom('From %s %s' % (_from, _date))\n", "\n", " mbox.write(msg.as_string(unixfrom=True) + \"\\n\\n\")\n", " \n", "mbox.close()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 3. 
Converting an mbox to a JSON structure suitable for import into MongoDB" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import sys\n", "import mailbox\n", "import email\n", "import quopri\n", "import json\n", "import time\n", "from BeautifulSoup import BeautifulSoup\n", "from dateutil.parser import parse\n", "\n", "MBOX = 'resources/ch06-mailboxes/data/enron.mbox'\n", "OUT_FILE = 'resources/ch06-mailboxes/data/enron.mbox.json'\n", "\n", "def cleanContent(msg):\n", "\n", " # Decode message from \"quoted printable\" format\n", " msg = quopri.decodestring(msg)\n", " \n", " # Strip out HTML tags, if any are present.\n", " # Bail on unknown encodings if errors happen in BeautifulSoup.\n", " try:\n", " soup = BeautifulSoup(msg)\n", " except:\n", " return ''\n", " return ''.join(soup.findAll(text=True))\n", "\n", "# There's a lot of data to process, and the Pythonic way to do it is with a \n", "# generator. See http://wiki.python.org/moin/Generators.\n", "# Using a generator requires a trivial encoder to be passed to json for object \n", "# serialization.\n", "\n", "class Encoder(json.JSONEncoder):\n", " def default(self, o): return list(o)\n", "\n", "# The generator itself...\n", "def gen_json_msgs(mb):\n", " while 1:\n", " msg = mb.next()\n", " if msg is None:\n", " break\n", " yield jsonifyMessage(msg)\n", " \n", "def jsonifyMessage(msg):\n", " json_msg = {'parts': []}\n", " for (k, v) in msg.items():\n", " json_msg[k] = v.decode('utf-8', 'ignore')\n", "\n", " # The To, Cc, and Bcc fields, if present, could have multiple items.\n", " # Note that not all of these fields are necessarily defined.\n", "\n", " for k in ['To', 'Cc', 'Bcc']:\n", " if not json_msg.get(k):\n", " continue\n", " json_msg[k] = json_msg[k].replace('\\n', '').replace('\\t', '').replace('\\r', '')\\\n", " .replace(' ', '').decode('utf-8', 'ignore').split(',')\n", "\n", " for part in msg.walk():\n", " json_part = {}\n", " if part.get_content_maintype() == 'multipart':\n", " continue\n", " 
\n", " json_part['contentType'] = part.get_content_type()\n", " content = part.get_payload(decode=False).decode('utf-8', 'ignore')\n", " json_part['content'] = cleanContent(content)\n", " \n", " json_msg['parts'].append(json_part)\n", " \n", " # Finally, convert the date string to milliseconds since epoch using the\n", " # $date descriptor so it imports \"natively\" as an ISODate object in MongoDB\n", " then = parse(json_msg['Date'])\n", " millis = int(time.mktime(then.timetuple())*1000 + then.microsecond/1000)\n", " json_msg['Date'] = {'$date' : millis}\n", "\n", " return json_msg\n", "\n", "mbox = mailbox.UnixMailbox(open(MBOX, 'rb'), email.message_from_file)\n", "\n", "# Write each message out as a JSON object on a separate line\n", "# for easy import into MongoDB via mongoimport\n", "\n", "f = open(OUT_FILE, 'w')\n", "for msg in gen_json_msgs(mbox):\n", " if msg is not None:\n", " f.write(json.dumps(msg, cls=Encoder) + '\\n')\n", "f.close()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Workaround for obtaining output from Example 3.**\n", "\n", "If you have opted to use the compressed version of the output from *Example 3* that is checked in to this GitHub repository, just execute the following cell, and you'll be able to proceed with *Example 4* as usual. See the previous note after *Example 1* for details on why this workaround may be helpful." ] }, { "cell_type": "code", "collapsed": false, "input": [ "import envoy\n", "\n", "# This data is checked in to the repository and is a compressed \n", "# version of the output from Example 3\n", "\n", "F = 'resources/ch06-mailboxes/data/enron.mbox.json.bz2'\n", "\n", "r = envoy.run(\"bunzip2 %s\" % (F,))\n", "print r.std_out\n", "print r.std_err" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 4. 
Getting the options for the mongoimport command from IPython Notebook" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import envoy # pip install envoy\n", "\n", "r = envoy.run('mongoimport')\n", "print r.std_out\n", "print r.std_err" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 5. Using mongoimport to load data into MongoDB from IPython Notebook" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import os\n", "import sys\n", "import envoy\n", "\n", "data_file = os.path.join(os.getcwd(), 'resources/ch06-mailboxes/data/enron.mbox.json')\n", "\n", "# Run a command just as you would in a terminal on the virtual machine to \n", "# import the data file into MongoDB.\n", "r = envoy.run('mongoimport --db enron --collection mbox ' + \\\n", " '--file %s' % data_file)\n", "\n", "# Print its standard output and standard error\n", "print r.std_out\n", "sys.stderr.write(r.std_err)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 6. Simulating a MongoDB shell that you can run from within IPython Notebook" ] }, { "cell_type": "code", "collapsed": false, "input": [ "# We can even simulate a MongoDB shell using envoy to execute commands.\n", "# For example, let's get some stats out of MongoDB just as though we were working \n", "# in a shell by passing it the command and wrapping it in a printjson function to \n", "# display it for us.\n", "\n", "def mongo(db, cmd):\n", " r = envoy.run(\"mongo %s --eval 'printjson(%s)'\" % (db, cmd,))\n", " print r.std_out\n", " if r.std_err: print r.std_err\n", " \n", "mongo('enron', 'db.mbox.stats()')" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 7. 
Using PyMongo to access MongoDB from Python" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "\n", "# Connects to the MongoDB server running on \n", "# localhost:27017 by default\n", "\n", "client = pymongo.MongoClient()\n", "\n", "# Get a reference to the enron database\n", "\n", "db = client.enron\n", "\n", "# Reference the mbox collection in the Enron database\n", "\n", "mbox = db.mbox\n", "\n", "# The number of messages in the collection\n", "\n", "print \"Number of messages in mbox:\"\n", "print mbox.count()\n", "print\n", "\n", "# Pick a message to look at...\n", "\n", "msg = mbox.find_one()\n", "\n", "# Display the message as pretty-printed JSON. The use of\n", "# the custom serializer supplied by PyMongo is necessary in order\n", "# to handle the _id field (an ObjectId) and the date field, which\n", "# is provided as a datetime.datetime object.\n", "\n", "print \"A message:\"\n", "print json.dumps(msg, indent=1, default=json_util.default)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 8. 
Querying MongoDB by date/time range" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "from datetime import datetime as dt\n", "\n", "client = pymongo.MongoClient()\n", "\n", "db = client.enron\n", "\n", "mbox = db.mbox\n", "\n", "# Create a date range spanning one day\n", "\n", "start_date = dt(2001, 4, 1) # Year, Month, Day\n", "end_date = dt(2001, 4, 2) # Year, Month, Day\n", "\n", "# Query the database with the highly versatile \"find\" command,\n", "# just like in the MongoDB shell.\n", "\n", "msgs = [ msg \n", " for msg in mbox.find({\"Date\" : \n", " {\n", " \"$lt\" : end_date, \n", " \"$gt\" : start_date\n", " }\n", " }).sort(\"Date\")]\n", "\n", "# Create a convenience function to make pretty-printing JSON a little\n", "# less cumbersome\n", "\n", "def pp(o, indent=1):\n", " print json.dumps(o, indent=indent, default=json_util.default)\n", "\n", "print \"Messages from a query by date range:\"\n", "pp(msgs)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 9. 
Enumerating senders and receivers of messages" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "\n", "client = pymongo.MongoClient()\n", "db = client.enron\n", "mbox = db.mbox\n", "\n", "senders = [ i for i in mbox.distinct(\"From\") ]\n", "\n", "receivers = [ i for i in mbox.distinct(\"To\") ]\n", "\n", "cc_receivers = [ i for i in mbox.distinct(\"Cc\") ]\n", "\n", "bcc_receivers = [ i for i in mbox.distinct(\"Bcc\") ]\n", "\n", "print \"Num Senders:\", len(senders)\n", "print \"Num Receivers:\", len(receivers)\n", "print \"Num CC Receivers:\", len(cc_receivers)\n", "print \"Num BCC Receivers:\", len(bcc_receivers)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 10. Analyzing senders and receivers with set operations" ] }, { "cell_type": "code", "collapsed": false, "input": [ "senders = set(senders)\n", "receivers = set(receivers)\n", "cc_receivers = set(cc_receivers)\n", "bcc_receivers = set(bcc_receivers)\n", "\n", "# Find the senders who were also direct (To) receivers\n", "\n", "senders_intersect_receivers = senders.intersection(receivers)\n", "\n", "# Find the senders who never appear as a direct (To) receiver\n", "\n", "senders_diff_receivers = senders.difference(receivers)\n", "\n", "# Find the receivers that didn't send any messages\n", "\n", "receivers_diff_senders = receivers.difference(senders)\n", "\n", "# Find the senders who were any kind of receiver by\n", "# first computing the union of all types of receivers\n", "\n", "all_receivers = receivers.union(cc_receivers, bcc_receivers)\n", "senders_all_receivers = senders.intersection(all_receivers)\n", "\n", "print \"Num senders in common with receivers:\", len(senders_intersect_receivers)\n", "print \"Num senders who didn't receive:\", len(senders_diff_receivers)\n", "print \"Num receivers who didn't 
send:\", len(receivers_diff_senders)\n", "print \"Num senders in common with *all* receivers:\", len(senders_all_receivers)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 11. Finding senders and receivers of messages who were Enron employees" ] }, { "cell_type": "code", "collapsed": false, "input": [ "# In a Mongo shell, you could try this query for the same effect:\n", "# db.mbox.find({\"To\" : {\"$regex\" : /.*enron.com.*/i} }, \n", "# {\"To\" : 1, \"_id\" : 0})\n", "\n", "senders = [ i \n", " for i in mbox.distinct(\"From\") \n", " if i.lower().find(\"@enron.com\") > -1 ]\n", "\n", "receivers = [ i \n", " for i in mbox.distinct(\"To\") \n", " if i.lower().find(\"@enron.com\") > -1 ]\n", "\n", "cc_receivers = [ i \n", " for i in mbox.distinct(\"Cc\") \n", " if i.lower().find(\"@enron.com\") > -1 ]\n", "\n", "bcc_receivers = [ i \n", " for i in mbox.distinct(\"Bcc\") \n", " if i.lower().find(\"@enron.com\") > -1 ]\n", "\n", "print \"Num Senders:\", len(senders)\n", "print \"Num Receivers:\", len(receivers)\n", "print \"Num CC Receivers:\", len(cc_receivers)\n", "print \"Num BCC Receivers:\", len(bcc_receivers)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 12. 
Counting sent/received messages for particular email addresses" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "\n", "client = pymongo.MongoClient()\n", "db = client.enron\n", "mbox = db.mbox\n", "\n", "aliases = [\"kenneth.lay@enron.com\", \"ken_lay@enron.com\", \"ken.lay@enron.com\", \n", " \"kenneth_lay@enron.net\", \"klay@enron.com\"] # More possibilities?\n", "\n", "to_msgs = [ msg \n", " for msg in mbox.find({\"To\" : { \"$in\" : aliases } })]\n", "\n", "from_msgs = [ msg \n", " for msg in mbox.find({\"From\" : { \"$in\" : aliases } })]\n", "\n", "print \"Number of messages sent to:\", len(to_msgs)\n", "print \"Number of messages sent from:\", len(from_msgs)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 13. Using MongoDB's data aggregation framework" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "\n", "# The basis of our query\n", "FROM = \"kenneth.lay@enron.com\"\n", "\n", "client = pymongo.MongoClient()\n", "db = client.enron\n", "mbox = db.mbox\n", "\n", "# Get the recipient lists for each message\n", "\n", "recipients_per_message = db.mbox.aggregate([\n", " {\"$match\" : {\"From\" : FROM} }, \n", " {\"$project\" : {\"From\" : 1, \"To\" : 1} }, \n", " {\"$group\" : {\"_id\" : \"$From\", \"recipients\" : {\"$addToSet\" : \"$To\" } } } \n", "])['result'][0]['recipients']\n", "\n", "# Collapse the lists of recipients into a single list\n", "\n", "all_recipients = [recipient\n", " for message in recipients_per_message\n", " for recipient in message]\n", "\n", "# Calculate the number of recipients per sent message and sort\n", "\n", "recipients_per_message_totals = \\\n", " sorted([len(recipients) \n", " for recipients in 
recipients_per_message])\n", "\n", "# Demonstrate how to use $unwind followed by $group to collapse\n", "# the recipient lists into a single list (with no duplicates\n", "# per the $addToSet operator)\n", " \n", "unique_recipients = db.mbox.aggregate([\n", " {\"$match\" : {\"From\" : FROM} }, \n", " {\"$project\" : {\"From\" : 1, \"To\" : 1} }, \n", " {\"$unwind\" : \"$To\"}, \n", " {\"$group\" : {\"_id\" : \"$From\", \"recipients\" : {\"$addToSet\" : \"$To\"}} }\n", "])['result'][0]['recipients']\n", "\n", "print \"Num total recipients on all messages:\", len(all_recipients)\n", "print \"Num recipients for each message:\", recipients_per_message_totals\n", "print \"Num unique recipients:\", len(unique_recipients)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 14. Creating a text index on MongoDB documents with PyMongo" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "\n", "client = pymongo.MongoClient()\n", "db = client.enron\n", "mbox = db.mbox\n", "\n", "# Create a text index over all string fields (\"$**\") if it doesn't already exist\n", "mbox.ensure_index([(\"$**\", \"text\")], name=\"TextIndex\")\n", "\n", "# Get the collection stats (collstats) on a collection\n", "# named \"mbox\"\n", "print json.dumps(db.command(\"collstats\", \"mbox\"), indent=1)\n", "\n", "# Use the db.command method to issue a \"text\" command\n", "# on collection \"mbox\" with parameters, remembering that\n", "# we need to use json_util to handle serialization of our JSON\n", "print json.dumps(db.command(\"text\", \"mbox\", \n", " search=\"raptor\", \n", " limit=1), \n", " indent=1, default=json_util.default)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 15. 
Aggregate querying for counts of messages by date/time range" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import json\n", "import pymongo # pip install pymongo\n", "from bson import json_util # Comes with pymongo\n", "from bson.son import SON # An ordered dict; key order matters for a multi-key $sort\n", "\n", "client = pymongo.MongoClient()\n", "db = client.enron\n", "mbox = db.mbox\n", "\n", "results = mbox.aggregate([\n", "{\n", " # Create a subdocument called DateBucket with each date component projected\n", " # so that these fields can be grouped on in the next stage of the pipeline\n", " \"$project\" :\n", " {\n", " \"_id\" : 0,\n", " \"DateBucket\" : \n", " {\n", " \"year\" : {\"$year\" : \"$Date\"}, \n", " \"month\" : {\"$month\" : \"$Date\"}, \n", " \"day\" : {\"$dayOfMonth\" : \"$Date\"},\n", " \"hour\" : {\"$hour\" : \"$Date\"},\n", " }\n", " }\n", "},\n", "{\n", " \"$group\" : \n", " {\n", " # Group by year and month by using these fields for the key.\n", " \"_id\" : {\"year\" : \"$DateBucket.year\", \"month\" : \"$DateBucket.month\"},\n", " \n", " # Increment the sum for each group by 1 for every document that's in it\n", " \"num_msgs\" : {\"$sum\" : 1}\n", " }\n", "},\n", "{\n", " # Sort by year first, then by month\n", " \"$sort\" : SON([(\"_id.year\", 1), (\"_id.month\", 1)])\n", "}\n", "])\n", "\n", "print results" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 16. Rendering time series results as a nicely displayed table" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from prettytable import PrettyTable # pip install prettytable\n", "\n", "pt = PrettyTable(field_names=['Year', 'Month', 'Num Msgs'])\n", "pt.align['Num Msgs'], pt.align['Month'] = 'r', 'r'\n", "\n", "# Add one row per (year, month) bucket from the results of Example 15\n", "for result in results['result']:\n", " pt.add_row([ result['_id']['year'], result['_id']['month'], result['num_msgs'] ])\n", "\n", "print pt" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 17. 
Connecting to Gmail with Xoauth" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import sys\n", "import oauth2 as oauth\n", "import oauth2.clients.imap as imaplib\n", "\n", "# See http://code.google.com/p/google-mail-xoauth-tools/wiki/\n", "# XoauthDotPyRunThrough for details on obtaining and\n", "# running xoauth.py to get the credentials\n", "\n", "OAUTH_TOKEN = '' # XXX: Obtained with xoauth.py\n", "OAUTH_TOKEN_SECRET = '' # XXX: Obtained with xoauth.py\n", "GMAIL_ACCOUNT = '' # XXX: Your Gmail address - example@gmail.com\n", "\n", "url = 'https://mail.google.com/mail/b/%s/imap/' % (GMAIL_ACCOUNT, )\n", "\n", "# Standard values for Gmail's Xoauth\n", "consumer = oauth.Consumer('anonymous', 'anonymous') \n", "token = oauth.Token(OAUTH_TOKEN, OAUTH_TOKEN_SECRET)\n", "\n", "conn = imaplib.IMAP4_SSL('imap.googlemail.com')\n", "conn.debug = 4 # Set to the desired debug level\n", "conn.authenticate(url, consumer, token)\n", "\n", "conn.select('INBOX')\n", "\n", "# Access your INBOX data" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Example 18. 
Query your Gmail inbox and store the results as JSON" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import os\n", "import sys\n", "import mailbox\n", "import email\n", "import quopri\n", "import json\n", "import time\n", "from BeautifulSoup import BeautifulSoup\n", "from dateutil.parser import parse\n", "\n", "# What you'd like to search for in the subject of your mail.\n", "# See Section 6.4.4 of http://www.faqs.org/rfcs/rfc3501.html\n", "# for more SEARCH options.\n", "\n", "Q = \"Alaska\" # XXX\n", "\n", "# Recycle some routines from Example 3 so that you arrive at the\n", "# very same data structure you've been using throughout this chapter\n", "\n", "def cleanContent(msg):\n", "\n", " # Decode message from \"quoted printable\" format\n", " msg = quopri.decodestring(msg)\n", " \n", " # Strip out HTML tags, if any are present.\n", " # Bail on unknown encodings if errors happen in BeautifulSoup.\n", " try:\n", " soup = BeautifulSoup(msg)\n", " except:\n", " return ''\n", " return ''.join(soup.findAll(text=True))\n", "\n", "def jsonifyMessage(msg):\n", " json_msg = {'parts': []}\n", " for (k, v) in msg.items():\n", " json_msg[k] = v.decode('utf-8', 'ignore')\n", "\n", " # The To, Cc, and Bcc fields, if present, could have multiple items.\n", " # Note that not all of these fields are necessarily defined.\n", "\n", " for k in ['To', 'Cc', 'Bcc']:\n", " if not json_msg.get(k):\n", " continue\n", " json_msg[k] = json_msg[k].replace('\\n', '').replace('\\t', '')\\\n", " .replace('\\r', '').replace(' ', '')\\\n", " .decode('utf-8', 'ignore').split(',')\n", "\n", " for part in msg.walk():\n", " json_part = {}\n", " if part.get_content_maintype() == 'multipart':\n", " continue\n", " \n", " json_part['contentType'] = part.get_content_type()\n", " content = part.get_payload(decode=False).decode('utf-8', 'ignore')\n", " json_part['content'] = cleanContent(content)\n", " \n", " json_msg['parts'].append(json_part)\n", " \n", " # Finally, convert the date string to 
milliseconds since epoch using the\n", " # $date descriptor so it imports \"natively\" as an ISODate object in MongoDB.\n", " then = parse(json_msg['Date'])\n", " millis = int(time.mktime(then.timetuple())*1000 + then.microsecond/1000)\n", " json_msg['Date'] = {'$date' : millis}\n", "\n", " return json_msg\n", "\n", "# Search for messages whose subject matches Q, using the authenticated\n", "# IMAP connection (conn) from Example 17.\n", "\n", "(status, data) = conn.search(None, '(SUBJECT \"%s\")' % (Q, ))\n", "ids = data[0].split()\n", "\n", "messages = []\n", "for i in ids:\n", " try:\n", " (status, data) = conn.fetch(i, '(RFC822)')\n", " messages.append(email.message_from_string(data[0][1]))\n", " except Exception, e:\n", " print e\n", " print 'Error fetching message %s. Skipping it.' % (i, )\n", "\n", "print \"Number of messages fetched:\", len(messages)\n", "jsonified_messages = [jsonifyMessage(m) for m in messages]\n", "\n", "# Separate out the text content from each message so that it can be analyzed.\n", "\n", "content = [p['content'] for m in jsonified_messages for p in m['parts']]\n", "\n", "# Content can still be quite messy and contain line breaks and other quirks.\n", "\n", "filename = os.path.join('resources/ch06-mailboxes/data', \n", " GMAIL_ACCOUNT.split(\"@\")[0] + '.gmail.json')\n", "f = open(filename, 'w')\n", "f.write(json.dumps(jsonified_messages))\n", "f.close()\n", "\n", "print >> sys.stderr, \"Data written out to\", f.name" ], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }