{"cells":[{"cell_type":"code","source":["# Replace with your container and storage account: \"wasbs://@.blob.core.windows.net/\"\npathPrefix = \"wasbs://ted@vpldb.blob.core.windows.net/\"\npath = pathPrefix + \"ted_main.csv\""],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"markdown","source":["# Ted Talks Data set\n\nTaken from https://www.kaggle.com/rounakbanik/ted-talks/data"],"metadata":{}},{"cell_type":"code","source":["import csv\nimport StringIO\n\n# Load the data as one big string\n# We do this because Spark is unable to parse the CSV correctly due to some escaping\ntext = sc.wholeTextFiles(path).take(1)[0][1]\n# Use Python's csv module to parse the content\nlines = [v for v in csv.reader(StringIO.StringIO(text.encode('utf8', 'ignore')))]\n# Take the first row as column names\ncolumnNames = lines[0]\n# Take the rest of the rows as content\ncontent = sc.parallelize(lines[1:])\n# Filter out rows that wouldn't have the right number of columns\ncompliant = content.filter(lambda v: len(v)==len(columnNames))\n# Map list-rows to dictionaries using the column names\ntalkDict = compliant.map(lambda r: dict(zip(columnNames, r)))"],"metadata":{},"outputs":[],"execution_count":3},{"cell_type":"code","source":["def parse(singleQuotedJson):\n import ast\n \n return ast.literal_eval(singleQuotedJson)\n \ndef reworkFields(d):\n # Parse integers since Python's CSV parser only parse strings\n d['comments'] = int(d['comments'])\n d['duration'] = int(d['duration'])\n d['film_date'] = int(d['film_date'])\n d['num_speaker'] = int(d['num_speaker'])\n d['published_date'] = int(d['published_date'])\n d['views'] = int(d['views'])\n \n # Parse json columns (into dictionaries)\n d['ratings'] = parse(d['ratings'])\n d['related_talks'] = parse(d['related_talks'])\n d['tags'] = parse(d['tags'])\n\n return d\n\ndef cleanDenormalizedAttributes(dict):\n # Remove denormalized properties\n del(dict['ratings'])\n del(dict['related_talks'])\n del(dict['tags'])\n \n return dict"],"metadata":{},"outputs":[],"execution_count":4},{"cell_type":"code","source":["# Rework some fields\ncleanFields = talkDict.map(lambda r: reworkFields(r))\n# Extract ratings as a separate RDD linked to the talks one with the talk name\nratings = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'id':r['id'], 'name':r['name'], 'count':r['count']} for r in d['ratings']])\n# Extract related talks, similarly linked to talk name\nrelatedTalks = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'relatedTalkName':r['title']} for r in d['related_talks']])\n# Extract tags, similarly linked to talk name\ntags = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'tag':t} for t in d['tags']])\n# Normalize the talkDict by removing denormalized attributes\nnormalizedTalks = cleanFields.map(lambda d: cleanDenormalizedAttributes(d))"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"code","source":["from pyspark.sql import Row\n\n# Create data frames, cache them and register them as temp views\nnormalizedTalksDf = spark.createDataFrame(normalizedTalks.map(lambda d: Row(**d)))\nnormalizedTalksDf.cache()\nnormalizedTalksDf.createOrReplaceTempView(\"talks\")\n\nratingsDf = spark.createDataFrame(ratings.map(lambda d: Row(**d)))\nratingsDf.cache()\nratingsDf.createOrReplaceTempView(\"ratings\")\n\nrelatedTalksDf = spark.createDataFrame(relatedTalks.map(lambda d: Row(**d)))\nrelatedTalksDf.cache()\nrelatedTalksDf.createOrReplaceTempView(\"relatedTalks\")\n\ntagsDf = spark.createDataFrame(tags.map(lambda d: 
{"cell_type":"code","source":["%sql\n\nSELECT\n(\n  SELECT COUNT(*)\n  FROM talks\n) AS talkCount,\n(\n  SELECT COUNT(*)\n  FROM ratings\n) AS ratingCount,\n(\n  SELECT COUNT(*)\n  FROM relatedTalks\n) AS relatedTalkCount,\n(\n  SELECT COUNT(*)\n  FROM tags\n) AS tagCount\n"],"metadata":{},"outputs":[],"execution_count":7},
{"cell_type":"code","source":["%sql\n\nSELECT title, main_speaker, views, ROUND(1000000 * comments / views, 1) AS commentsPerMillionViews\nFROM talks\nORDER BY views DESC\nLIMIT 10"],"metadata":{},"outputs":[],"execution_count":8},
{"cell_type":"code","source":["%sql\n\nSELECT ROUND(AVG(t.views)) AS avgViews, tg.tag\nFROM talks AS t\nINNER JOIN tags AS tg ON tg.talkName = t.name\nGROUP BY tg.tag\nORDER BY avgViews DESC\n"],"metadata":{},"outputs":[],"execution_count":9},
{"cell_type":"code","source":["%sql\n\nSELECT name, SUM(count) AS ratingCount\nFROM ratings\nGROUP BY name\nORDER BY ratingCount DESC\n"],"metadata":{},"outputs":[],"execution_count":10},
{"cell_type":"code","source":["%sql\n\nSELECT t.title, t.main_speaker, t.views, r.count\nFROM talks AS t\nINNER JOIN ratings AS r ON r.talkName = t.name AND r.name = 'Inspiring'\nORDER BY r.count DESC\n"],"metadata":{},"outputs":[],"execution_count":11}],"metadata":{"name":"ted","notebookId":4192747449792805},"nbformat":4,"nbformat_minor":0}