# Word Count Example with Apache Spark (PySpark)

In this notebook we walk through the classic Word Count example, covering `map`, `flatMap`, `filter`, `count`, `reduceByKey` and `sortByKey`, and finish with an enhanced word count.

`
@author: Anindya Saha  
@email: mail.anindya@gmail.com
`

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master('local[*]').appName('wordcount-pyspark').getOrCreate()

In [3]:
spark

In [4]:
infile = 'data/wordcount.txt'

**Read the file:**

In [5]:
# read the file
word_rdd = spark.sparkContext.textFile(infile).cache()

**1. Read the file, print every line:**

In [6]:
data = word_rdd

In [7]:
# print each line of the file
for line in data.collect():
    print(line)

Management (or managing) is the administration of an organization, whether it be a business, a not-for-profit organization, or government body. Management includes the activities of setting the strategy of an organization and coordinating the efforts of its employees or volunteers to accomplish its objectives through the application of available resources, such as financial, natural, technological, and human resources. The term "management" may also refer to the people who manage an organization.
Management is also an academic discipline, a social science whose objective is to study social organization and organizational leadership. Management is studied at colleges and universities; some important degrees in management are the Bachelor of Commerce (B.Com.) and Master of Business Administration (M.B.A.) and, for the public sector, the Master of Public Administration (MPA) degree. Individuals who aim at becoming management researchers or professors may complete the Doctor of Business Ad

**2. Read the file, print every word:**

In [8]:
data = word_rdd.flatMap(lambda line: line.split(" "))

In [9]:
# print each and every word
print(data.collect())

['Management', '(or', 'managing)', 'is', 'the', 'administration', 'of', 'an', 'organization,', 'whether', 'it', 'be', 'a', 'business,', 'a', 'not-for-profit', 'organization,', 'or', 'government', 'body.', 'Management', 'includes', 'the', 'activities', 'of', 'setting', 'the', 'strategy', 'of', 'an', 'organization', 'and', 'coordinating', 'the', 'efforts', 'of', 'its', 'employees', 'or', 'volunteers', 'to', 'accomplish', 'its', 'objectives', 'through', 'the', 'application', 'of', 'available', 'resources,', 'such', 'as', 'financial,', 'natural,', 'technological,', 'and', 'human', 'resources.', 'The', 'term', '"management"', 'may', 'also', 'refer', 'to', 'the', 'people', 'who', 'manage', 'an', 'organization.', 'Management', 'is', 'also', 'an', 'academic', 'discipline,', 'a', 'social', 'science', 'whose', 'objective', 'is', 'to', 'study', 'social', 'organization', 'and', 'organizational', 'leadership.', 'Management', 'is', 'studied', 'at', 'colleges', 'and', 'universities;', 'some', 'import
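To make the difference between `map` and `flatMap` concrete without a Spark session, here is a plain-Python sketch of the two semantics. The helper names `rdd_map` and `rdd_flat_map` and the two-line `SAMPLE_LINES` input are hypothetical stand-ins, not PySpark APIs: `map` keeps one output per line (a list of word lists), while `flatMap` concatenates the per-line results into one list of words, which is what the cell above prints.

```python
from itertools import chain

# Hypothetical two-line stand-in for data/wordcount.txt
SAMPLE_LINES = [
    "Management (or managing) is the administration",
    "Management is also an academic discipline",
]

def rdd_map(func, items):
    """Like RDD.map: exactly one output element per input element."""
    return [func(x) for x in items]

def rdd_flat_map(func, items):
    """Like RDD.flatMap: each input yields an iterable; the results are concatenated."""
    return list(chain.from_iterable(func(x) for x in items))

mapped = rdd_map(lambda line: line.split(" "), SAMPLE_LINES)
flat = rdd_flat_map(lambda line: line.split(" "), SAMPLE_LINES)

print(mapped)  # a list of lists: one inner list of words per line
print(flat)    # a single flat list of 12 words
```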

**3. Read the file, generate words, filter out empty words and print each word:**

In [10]:
data = (word_rdd
        .flatMap(lambda line: line.split(" "))
        .filter(lambda word: len(word) > 0)
       )

In [11]:
# print each and every word
print(data.collect())

['Management', '(or', 'managing)', 'is', 'the', 'administration', 'of', 'an', 'organization,', 'whether', 'it', 'be', 'a', 'business,', 'a', 'not-for-profit', 'organization,', 'or', 'government', 'body.', 'Management', 'includes', 'the', 'activities', 'of', 'setting', 'the', 'strategy', 'of', 'an', 'organization', 'and', 'coordinating', 'the', 'efforts', 'of', 'its', 'employees', 'or', 'volunteers', 'to', 'accomplish', 'its', 'objectives', 'through', 'the', 'application', 'of', 'available', 'resources,', 'such', 'as', 'financial,', 'natural,', 'technological,', 'and', 'human', 'resources.', 'The', 'term', '"management"', 'may', 'also', 'refer', 'to', 'the', 'people', 'who', 'manage', 'an', 'organization.', 'Management', 'is', 'also', 'an', 'academic', 'discipline,', 'a', 'social', 'science', 'whose', 'objective', 'is', 'to', 'study', 'social', 'organization', 'and', 'organizational', 'leadership.', 'Management', 'is', 'studied', 'at', 'colleges', 'and', 'universities;', 'some', 'import
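The `filter(lambda word: len(word) > 0)` step is needed because `str.split(" ")` splits on every single space, so doubled, leading or trailing spaces yield empty strings. A small plain-Python check on a hypothetical line shows this; calling `split()` with no argument splits on runs of whitespace and drops the empty strings itself, which could serve as an alternative inside `flatMap`.

```python
# Hypothetical line with a doubled space and a trailing space
line = "Management  includes the activities "

tokens = line.split(" ")                        # splits on every single space
non_empty = [w for w in tokens if len(w) > 0]   # same predicate as the Spark filter

print(tokens)        # ['Management', '', 'includes', 'the', 'activities', '']
print(non_empty)     # ['Management', 'includes', 'the', 'activities']
print(line.split())  # ['Management', 'includes', 'the', 'activities']
```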

**4. Read the file, generate words, trim each word, convert to lowercase, remove special characters, filter out empty and short words (two characters or fewer) and print each word:**

In [12]:
import re
data = (word_rdd
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: word.strip().lower())
        .map(lambda word: re.sub("[,.:;'\"\\?\\-!\\(\\)]", "", word))
        .filter(lambda word: len(word) > 2)
       )

In [13]:
# print each and every word
print(data.collect())

['management', 'managing', 'the', 'administration', 'organization', 'whether', 'business', 'notforprofit', 'organization', 'government', 'body', 'management', 'includes', 'the', 'activities', 'setting', 'the', 'strategy', 'organization', 'and', 'coordinating', 'the', 'efforts', 'its', 'employees', 'volunteers', 'accomplish', 'its', 'objectives', 'through', 'the', 'application', 'available', 'resources', 'such', 'financial', 'natural', 'technological', 'and', 'human', 'resources', 'the', 'term', 'management', 'may', 'also', 'refer', 'the', 'people', 'who', 'manage', 'organization', 'management', 'also', 'academic', 'discipline', 'social', 'science', 'whose', 'objective', 'study', 'social', 'organization', 'and', 'organizational', 'leadership', 'management', 'studied', 'colleges', 'and', 'universities', 'some', 'important', 'degrees', 'management', 'are', 'the', 'bachelor', 'commerce', 'bcom', 'and', 'master', 'business', 'administration', 'mba', 'and', 'for', 'the', 'public', 'sector', 
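The normalization chain can be tried on individual tokens in plain Python. This sketch reuses the notebook's character class but precompiles it with `re.compile` (a choice of this sketch, not of the notebook); the sample tokens are taken from the text. Note that the `len(word) > 2` filter drops short words such as "or", "of" and "a" as well as empty strings, while common words like "the" and "and" survive.

```python
import re

# Same character class as the notebook: removes , . : ; ' " ? - ! ( )
SPECIAL_CHARS = re.compile("[,.:;'\"\\?\\-!\\(\\)]")

def normalize(word):
    """Trim, lowercase, then strip special characters (same order as the notebook)."""
    return SPECIAL_CHARS.sub("", word.strip().lower())

raw = ["(or", "managing)", "not-for-profit", '"management"', "(B.Com.)", "of", "a"]
cleaned = [normalize(w) for w in raw]
kept = [w for w in cleaned if len(w) > 2]  # the notebook's length filter

print(cleaned)  # ['or', 'managing', 'notforprofit', 'management', 'bcom', 'of', 'a']
print(kept)     # ['managing', 'notforprofit', 'management', 'bcom']
```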

**5. Read the file, generate words, trim each word, convert to lowercase, remove special characters, filter out empty and short words, count each word and print each word with its count:**

In [14]:
import re
data = (word_rdd
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: word.strip().lower())
        .map(lambda word: re.sub("[,.:;'\"\\?\\-!\\(\\)]", "", word))
        .filter(lambda word: len(word) > 2)
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
       )

In [15]:
# print each word with its count
print(data.collect())

[('leadership', 1), ('human', 1), ('bcom', 1), ('provide', 3), ('even', 1), ('perform', 1), ('set', 1), ('than', 1), ('several', 1), ('levels', 1), ('science', 1), ('officer', 1), ('ceo', 1), ('typically', 1), ('large', 1), ('becoming', 1), ('commerce', 1), ('resources', 2), ('pyramid', 1), ('examples', 1), ('voluntary', 1), ('objectives', 1), ('larger', 2), ('may', 2), ('managers', 13), ('leaders', 1), ('make', 1), ('manager', 1), ('middle', 3), ('there', 2), ('strategic', 2), ('financial', 1), ('chief', 1), ('overall', 1), ('researchers', 1), ('have', 1), ('volunteers', 2), ('social', 2), ('more', 1), ('roles', 3), ('work', 2), ('efforts', 1), ('operate', 1), ('degrees', 1), ('communicate', 1), ('term', 1), ('professors', 1), ('would', 1), ('three', 1), ('include', 1), ('refer', 1), ('team', 1), ('bachelor', 1), ('master', 2), ('doctor', 1), ('whose', 1), ('individuals', 1), ('sector', 1), ('academic', 1), ('discipline', 1), ('public', 2), ('management', 9), ('organizations', 4), ('l
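`reduceByKey` merges all values that share a key using the supplied function. The plain-Python analogue below (the helper `reduce_by_key` and the sample words are hypothetical) groups the `(word, 1)` pairs by word and folds each group with `a + b`. In Spark the merging also happens locally within each partition before the shuffle, which is why the function should be associative and commutative; note too that the collected result above is in no particular order.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(func, pairs):
    """Plain-Python analogue of RDD.reduceByKey: group values by key, fold each group with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

words = ["management", "the", "managers", "the", "management", "the"]  # hypothetical sample
pairs = [(w, 1) for w in words]  # same as .map(lambda word: (word, 1))
counts = reduce_by_key(lambda a, b: a + b, pairs)

print(counts)  # {'management': 2, 'the': 3, 'managers': 1}
```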

**6. Read the file, generate words, trim each word, convert to lowercase, remove special characters, filter out empty and short words, count each word, sort by count DESC and print the count and the word:**

In [16]:
import re
data = (word_rdd
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: word.strip().lower())
        .map(lambda word: re.sub("[,.:;'\"\\?\\-!\\(\\)]", "", word))
        .filter(lambda word: len(word) > 2)
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda wc: (wc[1], wc[0]))
        .sortByKey(False)
       )

In [17]:
# print each (count, word) pair, most frequent first
print(data.collect())

[(22, 'the'), (13, 'managers'), (10, 'and'), (9, 'management'), (9, 'organization'), (5, 'administration'), (4, 'organizations'), (4, 'are'), (4, 'business'), (3, 'provide'), (3, 'middle'), (3, 'roles'), (3, 'senior'), (3, 'direction'), (3, 'frontline'), (3, 'who'), (3, 'such'), (2, 'resources'), (2, 'larger'), (2, 'may'), (2, 'there'), (2, 'strategic'), (2, 'volunteers'), (2, 'social'), (2, 'work'), (2, 'master'), (2, 'public'), (2, 'smaller'), (2, 'its'), (2, 'some'), (2, 'which'), (2, 'goals'), (2, 'also'), (2, 'employees'), (1, 'leadership'), (1, 'human'), (1, 'bcom'), (1, 'even'), (1, 'perform'), (1, 'set'), (1, 'than'), (1, 'several'), (1, 'levels'), (1, 'science'), (1, 'officer'), (1, 'ceo'), (1, 'typically'), (1, 'large'), (1, 'becoming'), (1, 'commerce'), (1, 'pyramid'), (1, 'examples'), (1, 'voluntary'), (1, 'objectives'), (1, 'leaders'), (1, 'make'), (1, 'manager'), (1, 'financial'), (1, 'chief'), (1, 'overall'), (1, 'researchers'), (1, 'have'), (1, 'more'), (1, 'efforts'), 
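Swapping each pair to `(count, word)` lets `sortByKey(False)` sort by count in descending order. The plain-Python sketch below, using a few pairs copied from the output above, mirrors that with `sorted(..., reverse=True)`. PySpark's `RDD.sortBy(keyfunc, ascending=False)` can sort on the count directly without the swap; the second `sorted` call shows the same idea in plain Python.

```python
# A few (word, count) pairs copied from the notebook output above
counts = [("management", 9), ("the", 22), ("and", 10), ("managers", 13)]

# Notebook approach: swap to (count, word), then sort descending on the count
swapped = [(c, w) for w, c in counts]
by_count_desc = sorted(swapped, key=lambda cw: cw[0], reverse=True)
print(by_count_desc)  # [(22, 'the'), (13, 'managers'), (10, 'and'), (9, 'management')]

# Alternative without the swap: sort the (word, count) pairs on the count directly
by_count_sortby = sorted(counts, key=lambda wc: wc[1], reverse=True)
print(by_count_sortby)  # [('the', 22), ('managers', 13), ('and', 10), ('management', 9)]
```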

**7.  
a. Read the file, generate words, trim and lowercase, remove special characters, filter out empty and short words, count each word, sort by count DESC  
b. Store the result in memory  
c. Print the word and the count  
d. Print the word only  
e. For words with more than 5 occurrences, print the total number of occurrences of words starting with each character, sorted by count ASC**

In [18]:
# a. generate words, trim and lowercase, remove special characters, filter out empty and short words, count each word, sort by count DESC
import re
data = (word_rdd
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: word.strip().lower())
        .map(lambda word: re.sub("[,.:;'\"\\?\\-!\\(\\)]", "", word))
        .filter(lambda word: len(word) > 2)
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda wc: (wc[1], wc[0]))
        .sortByKey(False)
       )

In [19]:
# b. store the result in memory
from pyspark import StorageLevel
data.persist(StorageLevel.MEMORY_ONLY)

PythonRDD[31] at RDD at PythonRDD.scala:48

In [20]:
# c. print the word and the count
print(data.map(lambda wc: (wc[1], wc[0])).collect())

[('the', 22), ('managers', 13), ('and', 10), ('management', 9), ('organization', 9), ('administration', 5), ('organizations', 4), ('are', 4), ('business', 4), ('provide', 3), ('middle', 3), ('roles', 3), ('senior', 3), ('direction', 3), ('frontline', 3), ('who', 3), ('such', 3), ('resources', 2), ('larger', 2), ('may', 2), ('there', 2), ('strategic', 2), ('volunteers', 2), ('social', 2), ('work', 2), ('master', 2), ('public', 2), ('smaller', 2), ('its', 2), ('some', 2), ('which', 2), ('goals', 2), ('also', 2), ('employees', 2), ('leadership', 1), ('human', 1), ('bcom', 1), ('even', 1), ('perform', 1), ('set', 1), ('than', 1), ('several', 1), ('levels', 1), ('science', 1), ('officer', 1), ('ceo', 1), ('typically', 1), ('large', 1), ('becoming', 1), ('commerce', 1), ('pyramid', 1), ('examples', 1), ('voluntary', 1), ('objectives', 1), ('leaders', 1), ('make', 1), ('manager', 1), ('financial', 1), ('chief', 1), ('overall', 1), ('researchers', 1), ('have', 1), ('more', 1), ('efforts', 1), 

In [21]:
# d. print the word only
print(data.map(lambda wc: wc[1]).collect())

['the', 'managers', 'and', 'management', 'organization', 'administration', 'organizations', 'are', 'business', 'provide', 'middle', 'roles', 'senior', 'direction', 'frontline', 'who', 'such', 'resources', 'larger', 'may', 'there', 'strategic', 'volunteers', 'social', 'work', 'master', 'public', 'smaller', 'its', 'some', 'which', 'goals', 'also', 'employees', 'leadership', 'human', 'bcom', 'even', 'perform', 'set', 'than', 'several', 'levels', 'science', 'officer', 'ceo', 'typically', 'large', 'becoming', 'commerce', 'pyramid', 'examples', 'voluntary', 'objectives', 'leaders', 'make', 'manager', 'financial', 'chief', 'overall', 'researchers', 'have', 'more', 'efforts', 'operate', 'degrees', 'communicate', 'term', 'professors', 'would', 'three', 'include', 'refer', 'team', 'bachelor', 'doctor', 'whose', 'individuals', 'sector', 'academic', 'discipline', 'lower', 'scopes', 'board', 'mpa', 'regional', 'them', 'executive', 'organizational', 'regular', 'coordinating', 'oversee', 'activities'

In [22]:
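# e. for words with more than 5 occurrences, sum the occurrences of the words
#    starting with each character; the result is sorted by count ASC
(data
    .filter(lambda cw: cw[0] > 5)          # keep (count, word) pairs with count > 5
    .map(lambda cw: (cw[1][0], cw[0]))     # re-key by first character: (char, count)
    .reduceByKey(lambda a, b: a + b)       # total count per first character
    .map(lambda cw: (cw[1], cw[0]))        # swap to (total, char)
    .sortByKey()                           # ascending by total
    .collect())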

[(9, 'o'), (10, 'a'), (22, 'm'), (22, 't')]
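To check the logic of step e, the same aggregation can be replayed in plain Python on the most frequent `(count, word)` pairs copied from the step-6 output (every other word occurs 5 times or fewer, so these are the only candidates). Only words occurring more than 5 times contribute, and their counts are summed by first character. The sketch sorts on the whole `(total, letter)` tuple, so the tie at 22 is broken alphabetically; Spark's `sortByKey()` sorts on the total only and makes no such promise for ties.

```python
from collections import defaultdict

# (count, word) pairs for the most frequent words, copied from the step-6 output
top = [(22, "the"), (13, "managers"), (10, "and"), (9, "management"),
       (9, "organization"), (5, "administration")]

totals = defaultdict(int)
for count, word in top:
    if count > 5:                 # same threshold as data.filter(lambda cw: cw[0] > 5)
        totals[word[0]] += count  # sum occurrences per first letter (the reduceByKey step)

# Swap to (total, letter) and sort ascending, as in the notebook
result = sorted((total, letter) for letter, total in totals.items())
print(result)  # [(9, 'o'), (10, 'a'), (22, 'm'), (22, 't')]
```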

**8.  
a. Read the file, generate words, pair each word with a count of 1  
b. Store the word splits in a file  
c. Count each word and store the word counts in a file**

In [23]:
# a. generate words, trim and lowercase, remove special characters, filter out empty and short words, pair each word with 1
import re
splits = (word_rdd
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: word.strip().lower())
        .map(lambda word: re.sub("[,.:;'\"\\?\\-!\\(\\)]", "", word))
        .filter(lambda word: len(word) > 2)
        .map(lambda word: (word, 1))
       )

In [24]:
# b. store the word splits in a file
splits.coalesce(1).saveAsTextFile('splitoutput')

In [25]:
# sum the 1s per word to get each word's count
counts = splits.reduceByKey(lambda a, b: a + b)

In [26]:
# c. store the word counts in a file
counts.coalesce(1).saveAsTextFile('countoutput')
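`saveAsTextFile` writes a directory rather than a single file: with `coalesce(1)` it typically contains one `part-00000` file plus a `_SUCCESS` marker, and the call fails if the directory already exists. Each element is written as its string representation, so the `(word, count)` tuples appear as Python literals such as `('management', 9)`. The plain-Python sketch below assumes that layout: it writes a hypothetical part file to a temporary directory and parses it back with `ast.literal_eval`.

```python
import ast
import os
import tempfile

# Hypothetical (word, count) pairs standing in for the contents of countoutput/
pairs = [("management", 9), ("managers", 13), ("bcom", 1)]

with tempfile.TemporaryDirectory() as outdir:
    part_file = os.path.join(outdir, "part-00000")  # assumed part-file name
    with open(part_file, "w") as f:
        for pair in pairs:
            f.write(str(pair) + "\n")  # mimic the string form saveAsTextFile writes

    # Read it back: each non-empty line is parsed as a Python literal
    with open(part_file) as f:
        restored = [ast.literal_eval(line) for line in f if line.strip()]

print(restored)  # [('management', 9), ('managers', 13), ('bcom', 1)]
```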

In [27]:
spark.stop()