In [1]:
# Azure Blob Storage location of the dataset. Point this at your own container
# and storage account: "wasbs://<container>@<storage account>.blob.core.windows.net/"
pathPrefix = "wasbs://ted@vpldb.blob.core.windows.net/"
dataFileName = "ted_main.csv"
path = "".join([pathPrefix, dataFileName])

# TED Talks Dataset

Source: [TED Talks dataset on Kaggle](https://www.kaggle.com/rounakbanik/ted-talks/data). This notebook reads its `ted_main.csv` file.

In [3]:
import csv
import StringIO

# Spark's own CSV reader mis-parses this file's escaping, so the whole file is
# read as a single string and handed to Python's csv module instead.
# NOTE: this cell targets Python 2 (StringIO module; csv is fed UTF-8 byte strings).
firstFile = sc.wholeTextFiles(path).take(1)[0]   # (filePath, fileContent) pair
text = firstFile[1]
utf8Buffer = StringIO.StringIO(text.encode('utf8', 'ignore'))
lines = list(csv.reader(utf8Buffer))

# Header row -> column names; every remaining row is data.
columnNames = lines[0]
content = sc.parallelize(lines[1:])

# Drop rows whose field count disagrees with the header, then key each
# surviving row by column name.
expectedWidth = len(columnNames)
compliant = content.filter(lambda row: len(row) == expectedWidth)
talkDict = compliant.map(lambda row: dict(zip(columnNames, row)))

In [4]:
def parse(singleQuotedJson):
  """Evaluate a Python-literal string such as "[{'id': 7, 'name': 'Funny'}]".

  The nested columns are written with single quotes, which json.loads
  rejects; ast.literal_eval accepts any Python literal and evaluates only
  literals (no arbitrary code execution).

  Args:
    singleQuotedJson: string holding a Python literal (list/dict/str/number...).

  Returns:
    The evaluated Python object.
  """
  from ast import literal_eval
  return literal_eval(singleQuotedJson)
  
def reworkFields(d):
  """Return a typed copy of one raw talk row.

  Python's csv module yields every cell as a string, so the numeric columns
  are converted with int() and the nested, single-quoted columns are
  evaluated with `parse`.

  Args:
    d: dict mapping column name -> raw string cell for one talk.

  Returns:
    A new dict with the converted values. `d` itself is not modified, so the
    function is safe to use as a Spark map function.

  Raises:
    KeyError if an expected column is missing, ValueError if a numeric cell is
    not an integer literal, or whatever `parse` raises on a malformed literal.
  """
  intColumns = ('comments', 'duration', 'film_date', 'num_speaker', 'published_date', 'views')
  # The downstream extraction iterates these as sequences (of dicts for
  # ratings/related_talks, of tag strings for tags), not as dictionaries.
  nestedColumns = ('ratings', 'related_talks', 'tags')

  # Work on a shallow copy instead of mutating the incoming record.
  reworked = dict(d)
  for column in intColumns:
    reworked[column] = int(d[column])
  for column in nestedColumns:
    reworked[column] = parse(d[column])
  return reworked

def cleanDenormalizedAttributes(dict):
  """Return a copy of a talk record without its nested (denormalized) attributes.

  'ratings', 'related_talks' and 'tags' are extracted into separate RDDs, so
  the flat talk row drops them. The input record is left unmodified.

  Args:
    dict: talk record that contains 'ratings', 'related_talks' and 'tags'.
      (The name shadows the builtin; it is kept so keyword callers still work.)

  Returns:
    A new dict with those three keys removed.

  Raises:
    KeyError if any of the three keys is missing.
  """
  cleaned = dict.copy()
  for key in ('ratings', 'related_talks', 'tags'):
    del cleaned[key]
  return cleaned

In [5]:
# Type the raw rows once and cache them: the four RDDs below (and the four
# DataFrames built from them) all derive from cleanFields, so without cache()
# the CSV-derived parsing and literal_eval work would be redone for each.
# cache() must be called before the derived RDDs are created, because PySpark
# decides whether to pipeline a map onto its parent at construction time.
cleanFields = talkDict.map(reworkFields)
cleanFields.cache()

# Each nested attribute becomes its own RDD, linked back to its talk via the talk 'name'.
ratings = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'id':r['id'], 'name':r['name'], 'count':r['count']} for r in d['ratings']])
relatedTalks = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'relatedTalkName':r['title']} for r in d['related_talks']])
tags = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'tag':t} for t in d['tags']])

# Flat talk rows with the nested attributes removed.
normalizedTalks = cleanFields.map(cleanDenormalizedAttributes)

In [6]:
from pyspark.sql import Row

def toCachedTempView(rdd, viewName):
  """Build a DataFrame from an RDD of dicts, cache it and register it as a temp view.

  Args:
    rdd: RDD whose elements are dicts; their keys become the column names (via Row(**d)).
    viewName: name under which %sql cells can query the DataFrame.

  Returns:
    The cached DataFrame.
  """
  df = spark.createDataFrame(rdd.map(lambda d: Row(**d)))
  df.cache()
  df.createOrReplaceTempView(viewName)
  return df

# One cached DataFrame + temp view per normalized RDD; the SQL cells query the views.
normalizedTalksDf = toCachedTempView(normalizedTalks, "talks")
ratingsDf = toCachedTempView(ratings, "ratings")
relatedTalksDf = toCachedTempView(relatedTalks, "relatedTalks")
tagsDf = toCachedTempView(tags, "tags")

In [7]:
%sql

-- Row counts of the four temp views, as a sanity check of the load.
-- NOTE: ratingTalkCount counts rating rows (one per talk x rating entry), not talks.
SELECT
  (SELECT COUNT(*) FROM talks)        AS talkCount,
  (SELECT COUNT(*) FROM ratings)      AS ratingTalkCount,
  (SELECT COUNT(*) FROM relatedTalks) AS relatedTalkCount,
  (SELECT COUNT(*) FROM tags)         AS tagCount


In [8]:
%sql

-- Ten most-viewed talks, with comment volume normalised per million views.
SELECT
  t.title,
  t.main_speaker,
  t.views,
  ROUND(1000000 * t.comments / t.views, 1) AS commentsPerMillionViews
FROM talks AS t
ORDER BY t.views DESC
LIMIT 10

In [9]:
%sql

SELECT ROUND(AVG(t.views)) as avgViews, tg.tag
FROM talks AS t
INNER JOIN tags tg ON tg.talkName=t.name
GROUP BY tg.tag
ORDER BY avgViews DESC


In [10]:
%sql

SELECT name, SUM(count) AS ratingCount
FROM ratings
GROUP BY name
ORDER BY ratingCount DESC


In [11]:
%sql

SELECT t.title, t.main_speaker, t.views, r.count
FROM talks AS t
INNER JOIN ratings AS r ON r.talkName = t.name AND r.name="Inspiring"
ORDER BY r.count DESC
