# MapReduce

## Intuition

In [1]:
a = [[1,2,1], [3,2], [4,9,1,0,2]]

In [2]:
sums = map(sum, a)

In [3]:
sums = [] 
for sublist in a: 
 results = sum(sublist) 
 sums.append(results)

In [5]:
def add(a, b):
 return a + b 

In [6]:
from functools import reduce
print(reduce(add, sums, 0))

25


In [7]:
initial = 0
current_result = initial
for element in sums:
 current_result = add(current_result, element)

## Basic Example

In [8]:
from collections import defaultdict

def map_word_count(document_id, document):
 counts = defaultdict(int)
 for word in document.split():
 counts[word] += 1
 for word in counts:
 yield (word, counts[word])

In [9]:
def shuffle_words(results_generators):
 records = defaultdict(list)
 for results in results_generators:
 for word, count in results:
 records[word].append(count)
 for word in records:
 yield (word, records[word])

In [10]:
def reduce_counts(word, list_of_counts):
 return (word, sum(list_of_counts))

In [11]:
from sklearn.datasets import fetch_20newsgroups
dataset = fetch_20newsgroups(subset='train')
documents = dataset.data[:50]

In [12]:
map_results = map(map_word_count, range(len(documents)), documents)

In [13]:
shuffle_results = shuffle_words(map_results)

In [14]:
reduce_results = [reduce_counts(word, list_of_counts) for word, list_of_counts in shuffle_results]

In [15]:
print(reduce_results[:5])
print(len(reduce_results))

[('coming', 1), ("couldn't", 4), ('Jose,', 1), ('{As', 1), ('185c', 1)]
5036


In [16]:
from joblib import Parallel, delayed

In [17]:
def map_word_count(document_id, document):
 counts = defaultdict(int)
 for word in document.split():
 counts[word] += 1
 return list(counts.items())

In [18]:
map_results = Parallel(n_jobs=2)(delayed(map_word_count)(i, document)
 for i, document in enumerate(documents))

In [19]:
shuffle_results = shuffle_words(map_results)

In [21]:
list(shuffle_results)

[('coming', [1]),
 ("couldn't", [1, 1, 1, 1]),
 ('Jose,', [1]),
 ('{As', [1]),
 ('185c', [1]),
 ('burst', [5]),
 ('context.', [1]),
 ('copy,', [1]),
 ('**********************************************************************',
 [1]),
 ('Modular', [1]),
 ('Yeah,', [1]),
 ('parking', [1]),
 ('Prices!', [1]),
 ('em', [1]),
 ('record,', [1]),
 ('program', [1]),
 ('>philosophically<', [1]),
 ('kind', [1, 1]),
 ('opinions', [2, 1, 1]),
 ('cubic', [1]),
 ('vision', [1]),
 ('later', [1, 1, 1]),
 ('$3495,', [1]),
 ('she', [2, 1]),
 ('xray@is.rice.edu', [1]),
 ('up', [2, 2, 1, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1, 3]),
 ('Callison', [1]),
 ('v8', [1]),
 ('No', [6, 1]),
 ('disobeys', [1]),
 ('term?', [1]),
 ('login', [1]),
 ('Most', [1, 1, 1, 3, 1]),
 ('kept', [1]),
 ('(Repost)', [1]),
 ('mean', [1, 1, 1]),
 ('luck,', [1]),
 ('punisher.caltech.edu', [1]),
 ('nCUBE', [1]),
 ('result', [1]),
 ('Problems???', [1]),
 ('(I', [2, 1]),
 ('Grow', [1]),
 ('Goalie', [1]),
 ('Binoculars', [1]),
 ('boots),', [1]

# NB Predict

In [1]:
import os
import re
import numpy as np
from collections import defaultdict
from operator import itemgetter

In [2]:
word_search_re = re.compile(r"[\w']+")

In [3]:
def load_model(model_filename):
 model = defaultdict(lambda: defaultdict(float))
 with open(model_filename) as inf:
 for line in inf:
 word, values = line.split(maxsplit=1)
 word = eval(word)
 values = eval(values)
 model[word] = values
 return model

In [4]:
model_filename = os.path.join(os.path.expanduser("~"), "models", "part-00000")
model = load_model(model_filename)

In [5]:
model["i"]["male"], model["i"]["female"]

(409.7987003114851, 513.3231594734408)

In [6]:
def nb_predict(model, document):
 words = word_search_re.findall(document)
 probabilities = defaultdict(lambda : 0)
 for word in set(words):
 probabilities["male"] += np.log(model[word].get("male", 1e-5))
 probabilities["female"] += np.log(model[word].get("female", 1e-5))
 # Now find the most likely gender
 most_likely_genders = sorted(probabilities.items(), key=itemgetter(1), reverse=True)
 return most_likely_genders[0][0]

In [7]:
new_post = """ Every day should be a half day. Took the afternoon off to hit the dentist, and while I was out I managed to get my oil changed, too. Remember that business with my car dealership this winter? Well, consider this the epilogue. The friendly fellas at the Valvoline Instant Oil Change on Snelling were nice enough to notice that my dipstick was broken, and the metal piece was too far down in its little dipstick tube to pull out. Looks like I'm going to need a magnet. Damn you, Kline Nissan, daaaaaaammmnnn yooouuuu.... Today I let my boss know that I've submitted my Corps application. The news has been greeted by everyone in the company with a level of enthusiasm that really floors me. The back deck has finally been cleared off by the construction company working on the place. This company, for anyone who's interested, consists mainly of one guy who spends his days cursing at his crew of Spanish-speaking laborers. Construction of my deck began around the time Nixon was getting out of office.
"""

In [8]:
nb_predict(model, new_post)

'male'

In [9]:
testing_folder = os.path.join(os.path.expanduser("~"), "Data", "blogposts_testing")
testing_filenames = []
for filename in os.listdir(testing_folder):
 testing_filenames.append(os.path.join(testing_folder, filename))

In [10]:
def nb_predict_many(model, input_filename):
 with open(input_filename) as inf:
 # remove leading and trailing whitespace
 for line in inf:
 tokens = line.split()
 actual_gender = eval(tokens[0])
 blog_post = eval(" ".join(tokens[1:]))
 yield actual_gender, nb_predict(model, blog_post)

In [11]:
def nb_predict(model, document):
 words = word_search_re.findall(document)
 probabilities = defaultdict(lambda : 1)
 for word in set(words):
 probabilities["male"] += np.log(model[word].get("male", 1e-15))
 probabilities["female"] += np.log(model[word].get("female", 1e-15))
 # Now find the most likely gender
 most_likely_genders = sorted(probabilities.items(), key=itemgetter(1), reverse=True)
 return most_likely_genders

In [13]:
y_true = []
y_pred = []
for testing_filename in testing_filenames:
 for actual_gender, ratios in nb_predict_many(model, testing_filename):
 predicted_gender = ratios[0][0]
 y_true.append(actual_gender == "female")
 y_pred.append(predicted_gender == "female")
y_true = np.array(y_true, dtype='int')
y_pred = np.array(y_pred, dtype='int')

In [14]:
from sklearn.metrics import f1_score
print("f1={:.4f}".format(f1_score(y_true, y_pred, pos_label=None)))
print("acc={:.4f}".format(np.mean(y_true == y_pred)))
 


f1=0.5540
acc=0.5765


In [15]:
aws_model_filename = os.path.join(os.path.expanduser("~"), "models", "model_aws")
aws_model = load_model(aws_model_filename)

In [16]:
y_true = []
y_pred = []
for testing_filename in testing_filenames:
 for actual_gender, predicted_gender in nb_predict_many(aws_model, testing_filename):
 predicted_gender = ratios[0][0]
 y_true.append(actual_gender == "female")
 y_pred.append(predicted_gender == "female")
 #print("Actual: {0}\tPredicted: {1}".format(actual_gender, predicted_gender))
 if len(y_true) > 500:
 break
y_true = np.array(y_true, dtype='int')
y_pred = np.array(y_pred, dtype='int')

In [17]:
print("f1={:.4f}".format(f1_score(y_true, y_pred, pos_label=None)))
print("acc={:.4f}".format(np.mean(y_true == y_pred)))

f1=0.8144
acc=0.8734


 'precision', 'predicted', average, warn_for)


In [18]:
print(list(zip(y_true, y_pred))[:10])

[(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)]


In [19]:
from sklearn.metrics import confusion_matrix

In [20]:
confusion_matrix(y_true, y_pred)

array([[614, 0],
 [ 89, 0]])

# Test load

In [1]:
import os
filename = os.path.join(os.path.expanduser("~"), "Data", "blogs", "1005545.male.25.Engineering.Sagittarius.xml")

In [3]:
all_posts = []
with open(filename) as inf:
 # remove leading and trailing whitespace
 post_start = False
 post = []
 for line in inf:
 line = line.strip()
 if line == "":
 post_start = True
 elif line == "":
 post_start = False
 all_posts.append("\n".join(post))
 post = []
 elif post_start:
 post.append(line)

In [4]:
len(all_posts)

80