{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 3.2 Machine Learnign - Text Classification\n",
    "\n",
    "This notebook demonstated building a simple classifier for text data. \n",
    "\n",
    "We will use part of the Enron email SPAM dataset and build a SPAM/HAM classifier."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As the first step we need to preprocess the data to convert them to a format suitable for distributed processing.\n",
    "\n",
    "The original data comes in two directories `ham` and `spam` each contaning many small files - an email per file.\n",
    "\n",
    "We will convert these to Hadoop Sequence Files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import zipfile\n",
    "from os import path"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def zip2text(filename):\n",
    "    def do(zipinfoItertor):\n",
    "        with zipfile.ZipFile(filename, 'r') as zf:\n",
    "            for zi in zipinfoItertor:\n",
    "                with zf.open(zi) as zfe:\n",
    "                    yield (zi.filename, zfe.read())\n",
    "    return do"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "with zipfile.ZipFile('data/ham.zip', 'r') as zf:\n",
    "    ham = sc.parallelize(zf.infolist()) \\\n",
    "        .mapPartitions(zip2text('data/ham.zip'))\n",
    "\n",
    "with zipfile.ZipFile('data/spam.zip', 'r') as zf:\n",
    "    spam = sc.parallelize(zf.infolist()) \\\n",
    "        .mapPartitions(zip2text('data/spam.zip'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%%sh\n",
    "rm -rf output/ham.seq output/spam.seq"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "ham.saveAsSequenceFile('output/ham.seq')\n",
    "spam.saveAsSequenceFile('output/spam.seq')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "_SUCCESS\n",
      "part-00000\n",
      "part-00001\n",
      "part-00002\n",
      "part-00003\n"
     ]
    }
   ],
   "source": [
    "%%sh \n",
    "ls output/ham.seq"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can load the data to a DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>filename</th>\n",
       "      <th>label</th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>0001.1999-12-10.farmer.ham.txt</td>\n",
       "      <td>0.0</td>\n",
       "      <td>Subject: christmas tree farm pictures\\r\\n</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>0002.1999-12-13.farmer.ham.txt</td>\n",
       "      <td>0.0</td>\n",
       "      <td>Subject: vastar resources , inc .\\r\\ngary , pr...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>0003.1999-12-14.farmer.ham.txt</td>\n",
       "      <td>0.0</td>\n",
       "      <td>Subject: calpine daily gas nomination\\r\\n- cal...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>0004.1999-12-14.farmer.ham.txt</td>\n",
       "      <td>0.0</td>\n",
       "      <td>Subject: re : issue\\r\\nfyi - see note below - ...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>0005.1999-12-14.farmer.ham.txt</td>\n",
       "      <td>0.0</td>\n",
       "      <td>Subject: meter 7268 nov allocation\\r\\nfyi .\\r\\...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[filename: string, label: double, text: string]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from pyspark.sql import *\n",
    "\n",
    "# load the 'ham' data\n",
    "df_ham = sc.sequenceFile('output/ham.seq') \\\n",
    "    .map(lambda (f,t):Row(label=0.0, filename=f, text=t)).toDF()\n",
    "\n",
    "# load the 'spam' data\n",
    "df_spam = sc.sequenceFile('output/spam.seq') \\\n",
    "    .map(lambda (f,t):Row(label=1.0, filename=f, text=t)).toDF()\n",
    "\n",
    "# combine the two datasets    \n",
    "df_data = df_ham.union(df_spam).cache()\n",
    "display(df_data.limit(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's build our feature extraction pipeline, which involves:\n",
    "\n",
    "* tokenizing each email\n",
    "* creating a token frequency vector for each eamil\n",
    "* applying the IDF (inverse document requency)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>tokens</th>\n",
       "      <th>tf</th>\n",
       "      <th>idf</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>[subject:, christmas, tree, farm, pictures]</td>\n",
       "      <td>(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...</td>\n",
       "      <td>(0.0, 0.0, 0.0, 0.85227187019, 0.0, 0.0, 0.0, ...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>[subject:, vastar, resources, ,, inc, ., , gar...</td>\n",
       "      <td>(0.0, 0.0, 6.0, 0.0, 5.0, 12.0, 3.0, 33.0, 0.0...</td>\n",
       "      <td>(0.0, 0.0, 4.14382370679, 0.0, 3.33880857933, ...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>[subject:, calpine, daily, gas, nomination, , ...</td>\n",
       "      <td>(0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...</td>\n",
       "      <td>(0.0, 1.74576342699, 0.0, 0.0, 0.0, 0.0, 0.0, ...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>[subject:, re, :, issue, , fyi, -, see, note, ...</td>\n",
       "      <td>(6.0, 3.0, 2.0, 0.0, 0.0, 6.0, 3.0, 34.0, 0.0,...</td>\n",
       "      <td>(4.65055555743, 2.61864514049, 1.38127456893, ...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>[subject:, meter, 7268, nov, allocation, , fyi...</td>\n",
       "      <td>(0.0, 0.0, 2.0, 1.0, 3.0, 4.0, 3.0, 26.0, 0.0,...</td>\n",
       "      <td>(0.0, 0.0, 1.38127456893, 0.85227187019, 2.003...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[tokens: array<string>, tf: vector, idf: vector]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from pyspark.ml.feature import HashingTF, IDF, Tokenizer\n",
    "from pyspark.ml import Pipeline\n",
    "\n",
    "tokenizer = Tokenizer(inputCol='text', outputCol='tokens')\n",
    "hashingTF = HashingTF(numFeatures=100, inputCol='tokens', outputCol='tf')\n",
    "idf = IDF(minDocFreq=3, inputCol='tf', outputCol='idf')\n",
    "\n",
    "# build the feature extraction pipeline\n",
    "pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])\n",
    "\n",
    "pipelineModel  = pipeline.fit(df_data)\n",
    "df_data_tf = pipelineModel.transform(df_data)\n",
    "display(df_data_tf.select('tokens', 'tf','idf').limit(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will use logistic regression to train the classification model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression\n",
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
    "\n",
    "# split the data into the traning and testing sets\n",
    "(trainData, testData) = df_data.randomSplit([0.7, 0.3])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# construct and train the logistic regression pipeline\n",
    "\n",
    "lr = LogisticRegression(featuresCol='idf', labelCol='label')\n",
    "lrPipeline = Pipeline(stages= [pipeline, lr])\n",
    "lrPipelineModel = lrPipeline.fit(trainData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Logistic regression AUC: 0.922094149048\n"
     ]
    }
   ],
   "source": [
    "# evaluate the logistic regression model (the default metric is 'AUC')\n",
    "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")\n",
    "print(\"Logistic regression AUC: %s\" % evaluator.evaluate(lrPipelineModel.transform(testData)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's see if a more complex model (random forest) does better:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "\n",
    "# construct and train the random forest pipeline\n",
    "rf = RandomForestClassifier(featuresCol = 'idf', labelCol='label')\n",
    "rfPipeline = Pipeline(stages= [pipeline, rf])\n",
    "rfPipelineModel = rfPipeline.fit(trainData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Random Forest AUC: 0.918341357895\n"
     ]
    }
   ],
   "source": [
    "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")\n",
    "print(\"Random Forest AUC: %s\" % evaluator.evaluate(rfPipelineModel.transform(testData)))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}