{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 3.2 Machine Learnign - Text Classification\n", "\n", "This notebook demonstated building a simple classifier for text data. \n", "\n", "We will use part of the Enron email SPAM dataset and build a SPAM/HAM classifier." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As the first step we need to preprocess the data to convert them to a format suitable for distributed processing.\n", "\n", "The original data comes in two directories `ham` and `spam` each contaning many small files - an email per file.\n", "\n", "We will convert these to Hadoop Sequence Files." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import zipfile\n", "from os import path" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def zip2text(filename):\n", " def do(zipinfoItertor):\n", " with zipfile.ZipFile(filename, 'r') as zf:\n", " for zi in zipinfoItertor:\n", " with zf.open(zi) as zfe:\n", " yield (zi.filename, zfe.read())\n", " return do" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "with zipfile.ZipFile('data/ham.zip', 'r') as zf:\n", " ham = sc.parallelize(zf.infolist()) \\\n", " .mapPartitions(zip2text('data/ham.zip'))\n", "\n", "with zipfile.ZipFile('data/spam.zip', 'r') as zf:\n", " spam = sc.parallelize(zf.infolist()) \\\n", " .mapPartitions(zip2text('data/spam.zip'))" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "%%sh\n", "rm -rf output/ham.seq output/spam.seq" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": true }, "outputs": [], "source": [ "ham.saveAsSequenceFile('output/ham.seq')\n", "spam.saveAsSequenceFile('output/spam.seq')" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "_SUCCESS\n", "part-00000\n", "part-00001\n", "part-00002\n", "part-00003\n" ] } ], "source": [ "%%sh \n", "ls output/ham.seq" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can load the data to a DataFrame" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>filename</th>\n", " <th>label</th>\n", " <th>text</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>0001.1999-12-10.farmer.ham.txt</td>\n", " <td>0.0</td>\n", " <td>Subject: christmas tree farm pictures\\r\\n</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>0002.1999-12-13.farmer.ham.txt</td>\n", " <td>0.0</td>\n", " <td>Subject: vastar resources , inc .\\r\\ngary , pr...</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>0003.1999-12-14.farmer.ham.txt</td>\n", " <td>0.0</td>\n", " <td>Subject: calpine daily gas nomination\\r\\n- cal...</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>0004.1999-12-14.farmer.ham.txt</td>\n", " <td>0.0</td>\n", " <td>Subject: re : issue\\r\\nfyi - see note below - ...</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>0005.1999-12-14.farmer.ham.txt</td>\n", " <td>0.0</td>\n", " <td>Subject: meter 7268 nov allocation\\r\\nfyi .\\r\\...</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[filename: string, 
label: double, text: string]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql import *\n", "\n", "# load the 'ham' data\n", "df_ham = sc.sequenceFile('output/ham.seq') \\\n", " .map(lambda (f,t):Row(label=0.0, filename=f, text=t)).toDF()\n", "\n", "# load the 'spam' data\n", "df_spam = sc.sequenceFile('output/spam.seq') \\\n", " .map(lambda (f,t):Row(label=1.0, filename=f, text=t)).toDF()\n", "\n", "# combine the two datasets \n", "df_data = df_ham.union(df_spam).cache()\n", "display(df_data.limit(5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's build our feature extraction pipeline, which involves:\n", "\n", "* tokenizing each email\n", "* creating a token frequency vector for each eamil\n", "* applying the IDF (inverse document requency)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>tokens</th>\n", " <th>tf</th>\n", " <th>idf</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>[subject:, christmas, tree, farm, pictures]</td>\n", " <td>(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...</td>\n", " <td>(0.0, 0.0, 0.0, 0.85227187019, 0.0, 0.0, 0.0, ...</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>[subject:, vastar, resources, ,, inc, ., , gar...</td>\n", " <td>(0.0, 0.0, 6.0, 0.0, 5.0, 12.0, 3.0, 33.0, 0.0...</td>\n", " <td>(0.0, 0.0, 4.14382370679, 0.0, 3.33880857933, ...</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>[subject:, calpine, daily, gas, nomination, , ...</td>\n", " <td>(0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...</td>\n", " <td>(0.0, 1.74576342699, 0.0, 0.0, 0.0, 0.0, 0.0, ...</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>[subject:, re, :, issue, , fyi, -, see, note, ...</td>\n", " <td>(6.0, 3.0, 2.0, 0.0, 0.0, 6.0, 3.0, 34.0, 0.0,...</td>\n", " <td>(4.65055555743, 2.61864514049, 1.38127456893, ...</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>[subject:, meter, 7268, nov, allocation, , fyi...</td>\n", " <td>(0.0, 0.0, 2.0, 1.0, 3.0, 4.0, 3.0, 26.0, 0.0,...</td>\n", " <td>(0.0, 0.0, 1.38127456893, 0.85227187019, 2.003...</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[tokens: array<string>, tf: vector, idf: vector]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.ml.feature import HashingTF, IDF, Tokenizer\n", "from pyspark.ml import Pipeline\n", "\n", "tokenizer = Tokenizer(inputCol='text', outputCol='tokens')\n", "hashingTF = HashingTF(numFeatures=100, inputCol='tokens', outputCol='tf')\n", "idf = IDF(minDocFreq=3, inputCol='tf', outputCol='idf')\n", "\n", "# build the feature extraction pipeline\n", "pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])\n", "\n", "pipelineModel = pipeline.fit(df_data)\n", "df_data_tf = pipelineModel.transform(df_data)\n", "display(df_data_tf.select('tokens', 'tf','idf').limit(5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will use logistic regression to train the classification model" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.ml.classification import LogisticRegression\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "\n", "# split the data into the traning and testing sets\n", "(trainData, testData) = 
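{ "cell_type": "markdown", "metadata": {}, "source": [ "Before training a model it is worth checking how balanced the two classes are. This is an extra, optional check added here for context; the actual counts depend on the subset of the Enron data used." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# optional: how many ham (label 0.0) and spam (label 1.0) emails do we have?\n", "df_data.groupBy('label').count().show()" ] },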
{ "cell_type": "markdown", "metadata": {}, "source": [ "We will use logistic regression to train the classification model." ] },
{ "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.ml.classification import LogisticRegression\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "\n", "# split the data into the training and testing sets\n", "(trainData, testData) = df_data.randomSplit([0.7, 0.3])" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# construct and train the logistic regression pipeline\n", "\n", "lr = LogisticRegression(featuresCol='idf', labelCol='label')\n", "lrPipeline = Pipeline(stages=[pipeline, lr])\n", "lrPipelineModel = lrPipeline.fit(trainData)" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Logistic regression AUC: 0.922094149048\n" ] } ], "source": [ "# evaluate the logistic regression model (the default metric is 'AUC')\n", "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")\n", "print(\"Logistic regression AUC: %s\" % evaluator.evaluate(lrPipelineModel.transform(testData)))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Let's see if a more complex model (random forest) does better:" ] },
{ "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.ml.classification import RandomForestClassifier\n", "\n", "# construct and train the random forest pipeline\n", "rf = RandomForestClassifier(featuresCol='idf', labelCol='label')\n", "rfPipeline = Pipeline(stages=[pipeline, rf])\n", "rfPipelineModel = rfPipeline.fit(trainData)" ] },
{ "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Random Forest AUC: 0.918341357895\n" ] } ], "source": [ "# evaluate the random forest model with the same AUC metric\n", "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")\n", "print(\"Random Forest AUC: %s\" % evaluator.evaluate(rfPipelineModel.transform(testData)))" ] },
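{ "cell_type": "markdown", "metadata": {}, "source": [ "Finally, as an illustration (this step is an addition to the original notebook), we can apply the trained logistic regression pipeline to new, hand-written messages. The texts below are made up; the pipeline only needs a `text` column, and in the output `prediction` is 1.0 for SPAM and 0.0 for HAM." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# illustrative only: classify a couple of hand-written messages with the\n", "# trained logistic regression pipeline (the texts below are made up)\n", "new_emails = sc.parallelize([\n", "    Row(text='Subject: you have won a free prize , claim your money now !'),\n", "    Row(text='Subject: gas nomination for tomorrow , please see the attached schedule .')\n", "]).toDF()\n", "\n", "lrPipelineModel.transform(new_emails).select('prediction', 'probability', 'text').show()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 1 }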