{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 3.2 Machine Learnign - Text Classification\n",
"\n",
"This notebook demonstated building a simple classifier for text data. \n",
"\n",
"We will use part of the Enron email SPAM dataset and build a SPAM/HAM classifier."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As the first step we need to preprocess the data to convert them to a format suitable for distributed processing.\n",
"\n",
"The original data comes in two directories `ham` and `spam` each contaning many small files - an email per file.\n",
"\n",
"We will convert these to Hadoop Sequence Files."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import zipfile\n",
"from os import path"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def zip2text(filename):\n",
" def do(zipinfoItertor):\n",
" with zipfile.ZipFile(filename, 'r') as zf:\n",
" for zi in zipinfoItertor:\n",
" with zf.open(zi) as zfe:\n",
" yield (zi.filename, zfe.read())\n",
" return do"
]
},
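{
"cell_type": "markdown",
"metadata": {},
"source": [
"`zip2text` returns a generator function meant for `RDD.mapPartitions`: the archive is opened once per partition, and each zip member is emitted as a `(filename, contents)` pair. As a minimal local sketch (no Spark involved, and assuming the `data/ham.zip` archive used below is present), the same generator can be exercised directly:\n",
"\n",
"```python\n",
"# read the first two archive members through zip2text, outside of Spark\n",
"with zipfile.ZipFile('data/ham.zip', 'r') as zf:\n",
"    members = zf.infolist()[:2]\n",
"first_two = list(zip2text('data/ham.zip')(members))\n",
"print([name for name, _ in first_two])\n",
"```"
]
},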
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"with zipfile.ZipFile('data/ham.zip', 'r') as zf:\n",
" ham = sc.parallelize(zf.infolist()) \\\n",
" .mapPartitions(zip2text('data/ham.zip'))\n",
"\n",
"with zipfile.ZipFile('data/spam.zip', 'r') as zf:\n",
" spam = sc.parallelize(zf.infolist()) \\\n",
" .mapPartitions(zip2text('data/spam.zip'))"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%sh\n",
"rm -rf output/ham.seq output/spam.seq"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ham.saveAsSequenceFile('output/ham.seq')\n",
"spam.saveAsSequenceFile('output/spam.seq')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"_SUCCESS\n",
"part-00000\n",
"part-00001\n",
"part-00002\n",
"part-00003\n"
]
}
],
"source": [
"%%sh \n",
"ls output/ham.seq"
]
},
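{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output directory contains one `part-*` file per partition plus a `_SUCCESS` marker. As an optional sanity check (a minimal sketch, assuming the files above were written successfully), one record can be read back before moving on:\n",
"\n",
"```python\n",
"# read back a single (filename, text) pair from the sequence file\n",
"print(sc.sequenceFile('output/ham.seq').first()[0])\n",
"```"
]
},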
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can load the data to a DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"
\n",
" \n",
" \n",
" | \n",
" filename | \n",
" label | \n",
" text | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 0001.1999-12-10.farmer.ham.txt | \n",
" 0.0 | \n",
" Subject: christmas tree farm pictures\\r\\n | \n",
"
\n",
" \n",
" | 1 | \n",
" 0002.1999-12-13.farmer.ham.txt | \n",
" 0.0 | \n",
" Subject: vastar resources , inc .\\r\\ngary , pr... | \n",
"
\n",
" \n",
" | 2 | \n",
" 0003.1999-12-14.farmer.ham.txt | \n",
" 0.0 | \n",
" Subject: calpine daily gas nomination\\r\\n- cal... | \n",
"
\n",
" \n",
" | 3 | \n",
" 0004.1999-12-14.farmer.ham.txt | \n",
" 0.0 | \n",
" Subject: re : issue\\r\\nfyi - see note below - ... | \n",
"
\n",
" \n",
" | 4 | \n",
" 0005.1999-12-14.farmer.ham.txt | \n",
" 0.0 | \n",
" Subject: meter 7268 nov allocation\\r\\nfyi .\\r\\... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[filename: string, label: double, text: string]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from pyspark.sql import *\n",
"\n",
"# load the 'ham' data\n",
"df_ham = sc.sequenceFile('output/ham.seq') \\\n",
" .map(lambda (f,t):Row(label=0.0, filename=f, text=t)).toDF()\n",
"\n",
"# load the 'spam' data\n",
"df_spam = sc.sequenceFile('output/spam.seq') \\\n",
" .map(lambda (f,t):Row(label=1.0, filename=f, text=t)).toDF()\n",
"\n",
"# combine the two datasets \n",
"df_data = df_ham.union(df_spam).cache()\n",
"display(df_data.limit(5))"
]
},
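{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, we can check the class balance of the combined dataset (a quick sketch; the exact counts depend on which subset of the Enron corpus is used, so they are not shown here):\n",
"\n",
"```python\n",
"# number of ham (label 0.0) and spam (label 1.0) emails\n",
"df_data.groupBy('label').count().show()\n",
"```"
]
},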
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's build our feature extraction pipeline, which involves:\n",
"\n",
"* tokenizing each email\n",
"* creating a token frequency vector for each eamil\n",
"* applying the IDF (inverse document requency)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
" \n",
" \n",
" | \n",
" tokens | \n",
" tf | \n",
" idf | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" [subject:, christmas, tree, farm, pictures] | \n",
" (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | \n",
" (0.0, 0.0, 0.0, 0.85227187019, 0.0, 0.0, 0.0, ... | \n",
"
\n",
" \n",
" | 1 | \n",
" [subject:, vastar, resources, ,, inc, ., , gar... | \n",
" (0.0, 0.0, 6.0, 0.0, 5.0, 12.0, 3.0, 33.0, 0.0... | \n",
" (0.0, 0.0, 4.14382370679, 0.0, 3.33880857933, ... | \n",
"
\n",
" \n",
" | 2 | \n",
" [subject:, calpine, daily, gas, nomination, , ... | \n",
" (0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | \n",
" (0.0, 1.74576342699, 0.0, 0.0, 0.0, 0.0, 0.0, ... | \n",
"
\n",
" \n",
" | 3 | \n",
" [subject:, re, :, issue, , fyi, -, see, note, ... | \n",
" (6.0, 3.0, 2.0, 0.0, 0.0, 6.0, 3.0, 34.0, 0.0,... | \n",
" (4.65055555743, 2.61864514049, 1.38127456893, ... | \n",
"
\n",
" \n",
" | 4 | \n",
" [subject:, meter, 7268, nov, allocation, , fyi... | \n",
" (0.0, 0.0, 2.0, 1.0, 3.0, 4.0, 3.0, 26.0, 0.0,... | \n",
" (0.0, 0.0, 1.38127456893, 0.85227187019, 2.003... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[tokens: array, tf: vector, idf: vector]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from pyspark.ml.feature import HashingTF, IDF, Tokenizer\n",
"from pyspark.ml import Pipeline\n",
"\n",
"tokenizer = Tokenizer(inputCol='text', outputCol='tokens')\n",
"hashingTF = HashingTF(numFeatures=100, inputCol='tokens', outputCol='tf')\n",
"idf = IDF(minDocFreq=3, inputCol='tf', outputCol='idf')\n",
"\n",
"# build the feature extraction pipeline\n",
"pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])\n",
"\n",
"pipelineModel = pipeline.fit(df_data)\n",
"df_data_tf = pipelineModel.transform(df_data)\n",
"display(df_data_tf.select('tokens', 'tf','idf').limit(5))"
]
},
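{
"cell_type": "markdown",
"metadata": {},
"source": [
"With only `numFeatures=100`, many distinct tokens necessarily share a hash bucket. A quick sketch for inspecting how sparse the resulting vectors are (the variable name `row` is just illustrative):\n",
"\n",
"```python\n",
"# how many of the 100 hash buckets does a single email's token set occupy?\n",
"row = df_data_tf.select('tf').first()\n",
"print('%d of %d buckets used' % (row.tf.numNonzeros(), hashingTF.getNumFeatures()))\n",
"```"
]
},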
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will use logistic regression to train the classification model"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.ml.classification import LogisticRegression\n",
"from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
"\n",
"# split the data into the traning and testing sets\n",
"(trainData, testData) = df_data.randomSplit([0.7, 0.3])"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# construct and train the logistic regression pipeline\n",
"\n",
"lr = LogisticRegression(featuresCol='idf', labelCol='label')\n",
"lrPipeline = Pipeline(stages= [pipeline, lr])\n",
"lrPipelineModel = lrPipeline.fit(trainData)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Logistic regression AUC: 0.922094149048\n"
]
}
],
"source": [
"# evaluate the logistic regression model (the default metric is 'AUC')\n",
"evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")\n",
"print(\"Logistic regression AUC: %s\" % evaluator.evaluate(lrPipelineModel.transform(testData)))"
]
},
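{
"cell_type": "markdown",
"metadata": {},
"source": [
"The AUC reported above is the area under the ROC curve. The same evaluator can also report the area under the precision-recall curve, which is sometimes more informative for imbalanced data - a sketch (the `pr_evaluator` name is just illustrative):\n",
"\n",
"```python\n",
"pr_evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', metricName='areaUnderPR')\n",
"print('Logistic regression AUPRC: %s' % pr_evaluator.evaluate(lrPipelineModel.transform(testData)))\n",
"```"
]
},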
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's see if a more complex model (random forest) does better:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.ml.classification import RandomForestClassifier\n",
"\n",
"# construct and train the random forest pipeline\n",
"rf = RandomForestClassifier(featuresCol = 'idf', labelCol='label')\n",
"rfPipeline = Pipeline(stages= [pipeline, rf])\n",
"rfPipelineModel = rfPipeline.fit(trainData)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Random Forest AUC: 0.918341357895\n"
]
}
],
"source": [
"evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")\n",
"print(\"Random Forest AUC: %s\" % evaluator.evaluate(rfPipelineModel.transform(testData)))"
]
}
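,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, a minimal sketch of how either fitted pipeline could be used to score a new email; the message text below is made up purely for illustration, and the trained pipeline handles tokenization, hashing and IDF weighting before applying the classifier:\n",
"\n",
"```python\n",
"new_email = sc.parallelize([Row(filename='example.txt', label=0.0, text='Subject: claim your free prize now !')]).toDF()\n",
"lrPipelineModel.transform(new_email).select('text', 'probability', 'prediction').show(truncate=False)\n",
"```"
]
}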
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "python",
"name": "pyspark"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 1
}