{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 2019-05-31\n",
    "Copy of Tiziano Piccardi's [DataSetAnonymization](https://github.com/epfl-dlab/WikipediaCitationUsage/blob/master/DatasetAnonymization.ipynb), altered to preserve `useragent.is_bot` in both citationusage and citationusagepageload tables."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "import re\n",
    "import pyspark.sql\n",
    "from pyspark.sql import *\n",
    "import pandas as pd\n",
    "import matplotlib.pyplot as plt\n",
    "import hashlib\n",
    "import os.path\n",
    "from pyspark.sql.functions import *\n",
    "from datetime import timedelta, date\n",
    "\n",
    "\n",
    "%matplotlib inline\n",
    "spark_hive = pyspark.sql.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create alternative session ID to replace session_token"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+--------------------+\n",
      "|session_id|       session_token|\n",
      "+----------+--------------------+\n",
      "|         0|e7e800a1b82909b4c037|\n",
      "|         1|77f3df30d8f9c579d6b9|\n",
      "|         2|3de8636a9aaff633c5f2|\n",
      "|         3|f1829efe15aa1e26d59e|\n",
      "|         4|9b7ebe75853b1d8b75f4|\n",
      "|         5|6362ed76a063f821ed33|\n",
      "|         6|3e4e49cc8cf0d800587b|\n",
      "|         7|529ba0f5bd46fb0f51eb|\n",
      "|         8|926758e0d73fbcaf2bbb|\n",
      "|         9|ed34a5bcac1b29495f70|\n",
      "|        10|489e7f3c58cc2a6227bc|\n",
      "|        11|bcf4b3768079ed4ce754|\n",
      "|        12|2863173fc6f780634769|\n",
      "|        13|5759f8e9386d19ff5782|\n",
      "|        14|8466e9dfc2a38e7adde6|\n",
      "|        15|9a77459c516a1792a4a8|\n",
      "|        16|e77c252e139768583f51|\n",
      "|        17|264ee2575f6f4e4255e2|\n",
      "|        18|578fccd38d53a295f59f|\n",
      "|        19|cb014c9b031a633a776e|\n",
      "+----------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pageloads = spark.sql(\"select event.session_token from event.citationusagepageload\").distinct()\n",
    "citationusage = spark.sql(\"select event.session_token from event.citationusage\").distinct()\n",
    "\n",
    "all_tokens = pageloads.union(citationusage).distinct().rdd.zipWithIndex()\n",
    "session_ids = sqlContext.createDataFrame(all_tokens.map(lambda r: \n",
    "                                                         Row(session_token=r[0].session_token, session_id=r[1])))\n",
    "\n",
    "session_ids.cache().write.parquet(\"session_ids.parquet\")\n",
    "session_ids.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get anonymous edits"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[page_id: bigint, edit_year: int, edit_month: int, edit_day: int, edit_hour: int, ip: string]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "query = \"\"\"\n",
    "SELECT page_id, year(event_timestamp) edit_year, month(event_timestamp) edit_month, \n",
    "        dayofmonth(event_timestamp) edit_day, hour(event_timestamp) edit_hour,\n",
    "        event_user_text_historical ip\n",
    "FROM wmf.mediawiki_history \n",
    "WHERE wiki_db = 'enwiki'\n",
    "AND event_user_is_anonymous = TRUE\n",
    "AND to_timestamp(event_timestamp) > '2019-03-01'\n",
    "AND page_namespace = 0\n",
    "AND page_is_redirect = FALSE\n",
    "\"\"\"\n",
    "\n",
    "anonymous_edits = spark.sql(query).distinct()\n",
    "anonymous_edits"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get page loads and clicks events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pageloads = spark.sql(\"select * from event.citationusagepageload\")\n",
    "pageloads"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Total pageload events:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1829735489"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pageloads_original = pageloads.count()\n",
    "pageloads_original"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Total unique sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "956199928"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pageloads_original_sessions = pageloads.select(\"event.session_token\").distinct().count()\n",
    "pageloads_original_sessions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[dt: string, event: struct<action:string,citation_in_text_refs:bigint,dom_interactive_time:bigint,event_offset_time:bigint,ext_position:bigint,footnote_number:bigint,freely_accessible:boolean,in_infobox:boolean,link_occurrence:bigint,link_text:string,link_url:string,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,section_id:string,session_token:string,skin:string,citation_identifier_label:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "citationusage = spark.sql(\"select * from event.citationusage\")\n",
    "citationusage"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Total click events:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "117186592"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "citationusage_original = citationusage.count()\n",
    "citationusage_original"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Unique sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "75305966"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "citationusage_original_sessions = citationusage.select(\"event.session_token\").distinct().count()\n",
    "citationusage_original_sessions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----\n",
    "## Get the session tokens with an edit"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[session_token: string]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sessions_with_edits = pageloads.join(anonymous_edits, pageloads.ip == anonymous_edits.ip)\\\n",
    "                                .where(pageloads.year == anonymous_edits.edit_year)\\\n",
    "                                .where(pageloads.month == anonymous_edits.edit_month)\\\n",
    "                                .where(pageloads.day == anonymous_edits.edit_day)\\\n",
    "                                .where(pageloads.hour == anonymous_edits.edit_hour)\\\n",
    "                                .where(pageloads.event.page_id == anonymous_edits.page_id)\\\n",
    "                                .select(\"event.session_token\").distinct()\n",
    "\n",
    "sessions_with_edits"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Count the sessions to excude:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "118919"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sessions_with_edits.cache().count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Left join to keep only the pageloads of the sessions without edits:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sessions_with_edits.registerTempTable(\"sessions_with_edits\")\n",
    "\n",
    "query = \"\"\"\n",
    "SELECT p.*\n",
    "FROM event.citationusagepageload p\n",
    "LEFT JOIN sessions_with_edits s\n",
    "ON p.event.session_token = s.session_token\n",
    "WHERE s.session_token IS NULL\n",
    "\"\"\"\n",
    "\n",
    "pageloads_clean = spark.sql(query)\n",
    "pageloads_clean"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the number of pageload events after the cleaning:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "pageloads_anonymized = pageloads_clean.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the number of individual sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "pageloads_anonymized_sessions = pageloads_clean.select(\"event.session_token\").distinct().count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add the session id and drop the critical fields:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- dt: string (nullable = true)\n",
      " |-- event: struct (nullable = true)\n",
      " |    |-- action: string (nullable = true)\n",
      " |    |-- dom_interactive_time: long (nullable = true)\n",
      " |    |-- event_offset_time: long (nullable = true)\n",
      " |    |-- mode: string (nullable = true)\n",
      " |    |-- namespace_id: long (nullable = true)\n",
      " |    |-- page_id: long (nullable = true)\n",
      " |    |-- page_title: string (nullable = true)\n",
      " |    |-- page_token: string (nullable = true)\n",
      " |    |-- referrer: string (nullable = true)\n",
      " |    |-- revision_id: long (nullable = true)\n",
      " |    |-- session_token: string (nullable = true)\n",
      " |    |-- skin: string (nullable = true)\n",
      " |-- ip: string (nullable = true)\n",
      " |-- recvfrom: string (nullable = true)\n",
      " |-- revision: long (nullable = true)\n",
      " |-- schema: string (nullable = true)\n",
      " |-- seqid: long (nullable = true)\n",
      " |-- useragent: struct (nullable = true)\n",
      " |    |-- browser_family: string (nullable = true)\n",
      " |    |-- browser_major: string (nullable = true)\n",
      " |    |-- browser_minor: string (nullable = true)\n",
      " |    |-- device_family: string (nullable = true)\n",
      " |    |-- is_bot: boolean (nullable = true)\n",
      " |    |-- is_mediawiki: boolean (nullable = true)\n",
      " |    |-- os_family: string (nullable = true)\n",
      " |    |-- os_major: string (nullable = true)\n",
      " |    |-- os_minor: string (nullable = true)\n",
      " |    |-- wmf_app_version: string (nullable = true)\n",
      " |-- uuid: string (nullable = true)\n",
      " |-- webhost: string (nullable = true)\n",
      " |-- wiki: string (nullable = true)\n",
      " |-- geocoded_data: map (nullable = true)\n",
      " |    |-- key: string\n",
      " |    |-- value: string (valueContainsNull = true)\n",
      " |-- topic: string (nullable = true)\n",
      " |-- year: long (nullable = true)\n",
      " |-- month: long (nullable = true)\n",
      " |-- day: long (nullable = true)\n",
      " |-- hour: long (nullable = true)\n",
      " |-- session_id: long (nullable = true)\n",
      " |-- event_time: timestamp (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "anonymous_pageloads_nested = pageloads_clean\\\n",
    "        .alias(\"pl\")\\\n",
    "        .join(session_ids.alias(\"ids\"), pageloads_clean.event.session_token == session_ids.session_token)\\\n",
    "        .select([\"pl.*\", \"ids.session_id\", to_timestamp(\"pl.dt\").alias(\"event_time\")])\n",
    "\n",
    "anonymous_pageloads_nested.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- session_id: long (nullable = true)\n",
      " |-- event_time: timestamp (nullable = true)\n",
      " |-- action: string (nullable = true)\n",
      " |-- dom_interactive_time: long (nullable = true)\n",
      " |-- event_offset_time: long (nullable = true)\n",
      " |-- mode: string (nullable = true)\n",
      " |-- namespace_id: long (nullable = true)\n",
      " |-- page_id: long (nullable = true)\n",
      " |-- page_title: string (nullable = true)\n",
      " |-- page_token: string (nullable = true)\n",
      " |-- referrer: string (nullable = true)\n",
      " |-- revision_id: long (nullable = true)\n",
      " |-- skin: string (nullable = true)\n",
      " |-- useragent_is_bot: boolean (nullable = true)\n",
      " |-- recvfrom: string (nullable = true)\n",
      " |-- revision: long (nullable = true)\n",
      " |-- schema: string (nullable = true)\n",
      " |-- seqid: long (nullable = true)\n",
      " |-- uuid: string (nullable = true)\n",
      " |-- webhost: string (nullable = true)\n",
      " |-- wiki: string (nullable = true)\n",
      " |-- geocoded_data: map (nullable = true)\n",
      " |    |-- key: string\n",
      " |    |-- value: string (valueContainsNull = true)\n",
      " |-- year: long (nullable = true)\n",
      " |-- month: long (nullable = true)\n",
      " |-- day: long (nullable = true)\n",
      " |-- hour: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "anonymous_pageloads_nested.registerTempTable('anonymous_pageloads_nested')\n",
    "\n",
    "query = \"\"\"\n",
    "SELECT session_id, event_time, event.*, useragent.is_bot as useragent_is_bot,\n",
    "        recvfrom, revision, schema, seqid, \n",
    "        uuid, webhost, wiki, geocoded_data, \n",
    "        year, month, day, hour \n",
    "FROM anonymous_pageloads_nested\n",
    "\"\"\"\n",
    "\n",
    "anonymous_pageloads = spark.sql(query).drop('session_token')\n",
    "anonymous_pageloads.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Sanity check:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1828725320"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "anonymous_pageloads_count = anonymous_pageloads.count()\n",
    "anonymous_pageloads_count"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Sanity check, number of unique sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "956081009"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "anonymous_pageloads_sessions_count = anonymous_pageloads.select(\"session_id\").distinct().count()\n",
    "anonymous_pageloads_sessions_count"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "anonymous_pageloads.write.parquet(\"anonymous_pageloads_april.parquet\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Anonymize the citation usage table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- dt: string (nullable = true)\n",
      " |-- event: struct (nullable = true)\n",
      " |    |-- action: string (nullable = true)\n",
      " |    |-- citation_in_text_refs: long (nullable = true)\n",
      " |    |-- dom_interactive_time: long (nullable = true)\n",
      " |    |-- event_offset_time: long (nullable = true)\n",
      " |    |-- ext_position: long (nullable = true)\n",
      " |    |-- footnote_number: long (nullable = true)\n",
      " |    |-- freely_accessible: boolean (nullable = true)\n",
      " |    |-- in_infobox: boolean (nullable = true)\n",
      " |    |-- link_occurrence: long (nullable = true)\n",
      " |    |-- link_text: string (nullable = true)\n",
      " |    |-- link_url: string (nullable = true)\n",
      " |    |-- mode: string (nullable = true)\n",
      " |    |-- namespace_id: long (nullable = true)\n",
      " |    |-- page_id: long (nullable = true)\n",
      " |    |-- page_title: string (nullable = true)\n",
      " |    |-- page_token: string (nullable = true)\n",
      " |    |-- referrer: string (nullable = true)\n",
      " |    |-- revision_id: long (nullable = true)\n",
      " |    |-- section_id: string (nullable = true)\n",
      " |    |-- session_token: string (nullable = true)\n",
      " |    |-- skin: string (nullable = true)\n",
      " |    |-- citation_identifier_label: string (nullable = true)\n",
      " |-- ip: string (nullable = true)\n",
      " |-- recvfrom: string (nullable = true)\n",
      " |-- revision: long (nullable = true)\n",
      " |-- schema: string (nullable = true)\n",
      " |-- seqid: long (nullable = true)\n",
      " |-- useragent: struct (nullable = true)\n",
      " |    |-- browser_family: string (nullable = true)\n",
      " |    |-- browser_major: string (nullable = true)\n",
      " |    |-- browser_minor: string (nullable = true)\n",
      " |    |-- device_family: string (nullable = true)\n",
      " |    |-- is_bot: boolean (nullable = true)\n",
      " |    |-- is_mediawiki: boolean (nullable = true)\n",
      " |    |-- os_family: string (nullable = true)\n",
      " |    |-- os_major: string (nullable = true)\n",
      " |    |-- os_minor: string (nullable = true)\n",
      " |    |-- wmf_app_version: string (nullable = true)\n",
      " |-- uuid: string (nullable = true)\n",
      " |-- webhost: string (nullable = true)\n",
      " |-- wiki: string (nullable = true)\n",
      " |-- geocoded_data: map (nullable = true)\n",
      " |    |-- key: string\n",
      " |    |-- value: string (valueContainsNull = true)\n",
      " |-- topic: string (nullable = true)\n",
      " |-- year: long (nullable = true)\n",
      " |-- month: long (nullable = true)\n",
      " |-- day: long (nullable = true)\n",
      " |-- hour: long (nullable = true)\n",
      " |-- session_id: long (nullable = true)\n",
      " |-- event_time: timestamp (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "anonymous_citationusage_nested = citationusage\\\n",
    "        .alias(\"cit\")\\\n",
    "        .join(session_ids.alias(\"ids\"), citationusage.event.session_token == session_ids.session_token)\\\n",
    "        .select([\"cit.*\", \"ids.session_id\", to_timestamp(\"cit.dt\").alias(\"event_time\")])\n",
    "\n",
    "anonymous_citationusage_nested.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- session_id: long (nullable = true)\n",
      " |-- event_time: timestamp (nullable = true)\n",
      " |-- action: string (nullable = true)\n",
      " |-- citation_in_text_refs: long (nullable = true)\n",
      " |-- dom_interactive_time: long (nullable = true)\n",
      " |-- event_offset_time: long (nullable = true)\n",
      " |-- ext_position: long (nullable = true)\n",
      " |-- footnote_number: long (nullable = true)\n",
      " |-- freely_accessible: boolean (nullable = true)\n",
      " |-- in_infobox: boolean (nullable = true)\n",
      " |-- link_occurrence: long (nullable = true)\n",
      " |-- link_text: string (nullable = true)\n",
      " |-- link_url: string (nullable = true)\n",
      " |-- mode: string (nullable = true)\n",
      " |-- namespace_id: long (nullable = true)\n",
      " |-- page_id: long (nullable = true)\n",
      " |-- page_title: string (nullable = true)\n",
      " |-- page_token: string (nullable = true)\n",
      " |-- referrer: string (nullable = true)\n",
      " |-- revision_id: long (nullable = true)\n",
      " |-- section_id: string (nullable = true)\n",
      " |-- skin: string (nullable = true)\n",
      " |-- citation_identifier_label: string (nullable = true)\n",
      " |-- useragent_is_bot: boolean (nullable = true)\n",
      " |-- recvfrom: string (nullable = true)\n",
      " |-- revision: long (nullable = true)\n",
      " |-- schema: string (nullable = true)\n",
      " |-- seqid: long (nullable = true)\n",
      " |-- uuid: string (nullable = true)\n",
      " |-- webhost: string (nullable = true)\n",
      " |-- wiki: string (nullable = true)\n",
      " |-- geocoded_data: map (nullable = true)\n",
      " |    |-- key: string\n",
      " |    |-- value: string (valueContainsNull = true)\n",
      " |-- year: long (nullable = true)\n",
      " |-- month: long (nullable = true)\n",
      " |-- day: long (nullable = true)\n",
      " |-- hour: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "anonymous_citationusage_nested.registerTempTable('anonymous_citationusage_nested')\n",
    "\n",
    "query = \"\"\"\n",
    "SELECT session_id, event_time, event.*, useragent.is_bot as useragent_is_bot,\n",
    "        recvfrom, revision, schema, seqid, \n",
    "        uuid, webhost, wiki, geocoded_data, \n",
    "        year, month, day, hour \n",
    "FROM anonymous_citationusage_nested\n",
    "\"\"\"\n",
    "\n",
    "anonymous_citationusage = spark.sql(query).drop('session_token')\n",
    "anonymous_citationusage.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Sanity check:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "117186558"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "citationusage_anonymized = anonymous_citationusage.count()\n",
    "citationusage_anonymized"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Sanity check, number of unique sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "75305945"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "citationusage_anonymized_sessions = anonymous_citationusage.select(\"session_id\").distinct().count()\n",
    "citationusage_anonymized_sessions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "anonymous_citationusage.write.parquet(\"anonymous_citationusage_april.parquet\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "------\n",
    "### Click events\n",
    "Number of removed events:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of clicks events removed:\n",
      "34\n",
      "Percentage of the dataset removed:\n",
      "2.9013558138118737e-07\n"
     ]
    }
   ],
   "source": [
    "print(\"Number of clicks events removed:\")\n",
    "print(citationusage_original - citationusage_anonymized)\n",
    "print(\"Percentage of the dataset removed:\")\n",
    "print((citationusage_original - citationusage_anonymized)/citationusage_original)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Number of removed sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of sessions removed:\n",
      "21\n",
      "Percentage of the dataset removed:\n",
      "2.7886236795634494e-07\n"
     ]
    }
   ],
   "source": [
    "print(\"Number of sessions removed:\")\n",
    "print(citationusage_original_sessions - citationusage_anonymized_sessions)\n",
    "print(\"Percentage of the dataset removed:\")\n",
    "print((citationusage_original_sessions - citationusage_anonymized_sessions)/citationusage_original_sessions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----\n",
    "### Pageloads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of pageload removed removed:\n",
      "1010169\n",
      "Percentage of the dataset removed:\n",
      "0.0005520847172025311\n"
     ]
    }
   ],
   "source": [
    "print(\"Number of pageload removed removed:\")\n",
    "print(pageloads_original - pageloads_anonymized)\n",
    "print(\"Percentage of the dataset removed:\")\n",
    "print((pageloads_original - pageloads_anonymized)/pageloads_original)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of sessions removed:\n",
      "118919\n",
      "Percentage of the dataset removed:\n",
      "0.00012436625073663466\n"
     ]
    }
   ],
   "source": [
    "print(\"Number of sessions removed:\")\n",
    "print(pageloads_original_sessions - pageloads_anonymized_sessions)\n",
    "print(\"Percentage of the dataset removed:\")\n",
    "print((pageloads_original_sessions - pageloads_anonymized_sessions)/pageloads_original_sessions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Sanity checks\n",
    "\n",
    "Release cache:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.catalog.clearCache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[session_id: bigint, event_time: timestamp, action: string, citation_in_text_refs: bigint, dom_interactive_time: bigint, event_offset_time: bigint, ext_position: bigint, footnote_number: bigint, freely_accessible: boolean, in_infobox: boolean, link_occurrence: bigint, link_text: string, link_url: string, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, section_id: string, skin: string, citation_identifier_label: string, useragent_is_bot: boolean, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]"
      ]
     },
     "execution_count": 30,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "anonymous_citationusage = spark.read.parquet(\"anonymous_citationusage_april.parquet\")\n",
    "anonymous_citationusage"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Check count:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "117186558"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "anonymous_citationusage.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[session_id: bigint, event_time: timestamp, action: string, dom_interactive_time: bigint, event_offset_time: bigint, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, skin: string, useragent_is_bot: boolean, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "anonymous_pageloads = spark.read.parquet(\"anonymous_pageloads_april.parquet\")\n",
    "anonymous_pageloads"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Check count:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1828723456"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "anonymous_pageloads.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----\n",
    "# Check on schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- session_id: long (nullable = true)\n",
      " |-- event_time: timestamp (nullable = true)\n",
      " |-- action: string (nullable = true)\n",
      " |-- citation_in_text_refs: long (nullable = true)\n",
      " |-- dom_interactive_time: long (nullable = true)\n",
      " |-- event_offset_time: long (nullable = true)\n",
      " |-- ext_position: long (nullable = true)\n",
      " |-- footnote_number: long (nullable = true)\n",
      " |-- freely_accessible: boolean (nullable = true)\n",
      " |-- in_infobox: boolean (nullable = true)\n",
      " |-- link_occurrence: long (nullable = true)\n",
      " |-- link_text: string (nullable = true)\n",
      " |-- link_url: string (nullable = true)\n",
      " |-- mode: string (nullable = true)\n",
      " |-- namespace_id: long (nullable = true)\n",
      " |-- page_id: long (nullable = true)\n",
      " |-- page_title: string (nullable = true)\n",
      " |-- page_token: string (nullable = true)\n",
      " |-- referrer: string (nullable = true)\n",
      " |-- revision_id: long (nullable = true)\n",
      " |-- section_id: string (nullable = true)\n",
      " |-- skin: string (nullable = true)\n",
      " |-- citation_identifier_label: string (nullable = true)\n",
      " |-- useragent_is_bot: boolean (nullable = true)\n",
      " |-- recvfrom: string (nullable = true)\n",
      " |-- revision: long (nullable = true)\n",
      " |-- schema: string (nullable = true)\n",
      " |-- seqid: long (nullable = true)\n",
      " |-- uuid: string (nullable = true)\n",
      " |-- webhost: string (nullable = true)\n",
      " |-- wiki: string (nullable = true)\n",
      " |-- geocoded_data: map (nullable = true)\n",
      " |    |-- key: string\n",
      " |    |-- value: string (valueContainsNull = true)\n",
      " |-- year: long (nullable = true)\n",
      " |-- month: long (nullable = true)\n",
      " |-- day: long (nullable = true)\n",
      " |-- hour: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "anonymous_citationusage.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- session_id: long (nullable = true)\n",
      " |-- event_time: timestamp (nullable = true)\n",
      " |-- action: string (nullable = true)\n",
      " |-- dom_interactive_time: long (nullable = true)\n",
      " |-- event_offset_time: long (nullable = true)\n",
      " |-- mode: string (nullable = true)\n",
      " |-- namespace_id: long (nullable = true)\n",
      " |-- page_id: long (nullable = true)\n",
      " |-- page_title: string (nullable = true)\n",
      " |-- page_token: string (nullable = true)\n",
      " |-- referrer: string (nullable = true)\n",
      " |-- revision_id: long (nullable = true)\n",
      " |-- skin: string (nullable = true)\n",
      " |-- useragent_is_bot: boolean (nullable = true)\n",
      " |-- recvfrom: string (nullable = true)\n",
      " |-- revision: long (nullable = true)\n",
      " |-- schema: string (nullable = true)\n",
      " |-- seqid: long (nullable = true)\n",
      " |-- uuid: string (nullable = true)\n",
      " |-- webhost: string (nullable = true)\n",
      " |-- wiki: string (nullable = true)\n",
      " |-- geocoded_data: map (nullable = true)\n",
      " |    |-- key: string\n",
      " |    |-- value: string (valueContainsNull = true)\n",
      " |-- year: long (nullable = true)\n",
      " |-- month: long (nullable = true)\n",
      " |-- day: long (nullable = true)\n",
      " |-- hour: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "anonymous_pageloads.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Check at session level"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]"
      ]
     },
     "execution_count": 36,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Expose the anonymized page loads to Spark SQL under a fixed view name.\n",
    "# createOrReplaceTempView replaces the deprecated registerTempTable and\n",
    "# makes this cell safe to re-run (the view is simply replaced).\n",
    "anonymous_pageloads.createOrReplaceTempView('anonymous_pageloads')\n",
    "\n",
    "# Keep only enwiki rows in namespace 0 and tag every row with a constant\n",
    "# 'pageLoad' action, selecting the four columns shown in the output below.\n",
    "pageloads_query = \"\"\"\n",
    "select event_time, session_id, page_id, 'pageLoad' as action\n",
    "from anonymous_pageloads\n",
    "where wiki = 'enwiki'\n",
    "AND namespace_id = 0\n",
    "\"\"\"\n",
    "\n",
    "pageloads = spark.sql(pageloads_query)\n",
    "pageloads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+----------+--------+--------+\n",
      "|         event_time|session_id| page_id|  action|\n",
      "+-------------------+----------+--------+--------+\n",
      "|2019-04-04 06:08:18|   4497935| 3157978|pageLoad|\n",
      "|2019-04-06 14:04:01|   2565696|49312398|pageLoad|\n",
      "|2019-04-06 10:35:38|   3423790|28658666|pageLoad|\n",
      "|2019-03-29 01:03:23|   4172151| 1418277|pageLoad|\n",
      "|2019-03-24 15:34:54|   2019617|52839809|pageLoad|\n",
      "+-------------------+----------+--------+--------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pageloads.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the unique session IDs to filter the \"citationusage\" table (the pageload table is a subsample)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "unique_sessions = pageloads.select(\"session_id\").distinct()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]"
      ]
     },
     "execution_count": 39,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Register both inputs as Spark SQL views. createOrReplaceTempView replaces\n",
    "# the deprecated registerTempTable and keeps the cell re-runnable.\n",
    "anonymous_citationusage.createOrReplaceTempView('anonymous_citationusage')\n",
    "unique_sessions.createOrReplaceTempView('unique_sessions')\n",
    "\n",
    "# The inner join keeps only enwiki citation-usage events whose session_id\n",
    "# also appears in unique_sessions.\n",
    "events_query = \"\"\"\n",
    "select event_time, cu.session_id, page_id, action\n",
    "from anonymous_citationusage cu\n",
    "join unique_sessions us\n",
    "on us.session_id=cu.session_id\n",
    "where wiki = 'enwiki'\n",
    "\"\"\"\n",
    "\n",
    "events = spark.sql(events_query)\n",
    "events"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Merge all the events:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Build one record per session holding all of its events (citation-usage\n",
    "# events and page loads together), ordered by event_time.\n",
    "sessions_rdd = (\n",
    "    events.rdd.union(pageloads.rdd)\n",
    "    # Key every event by session_id; the one-element list lets reduceByKey\n",
    "    # concatenate the events of a session.\n",
    "    .map(lambda rec: (rec.session_id, [(rec.event_time, rec.page_id, rec.action)]))\n",
    "    .reduceByKey(lambda left, right: left + right)\n",
    "    # Order each session's events by their timestamp (first tuple element).\n",
    "    .map(lambda kv: (kv[0], sorted(kv[1], key=lambda ev: ev[0])))\n",
    "    # Convert the plain tuples back into Rows.\n",
    "    .map(lambda kv: Row(\n",
    "        session_id=kv[0],\n",
    "        events=[Row(event_time=when, page_id=page, action=act) for when, page, act in kv[1]]))\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(events=[Row(action='pageLoad', event_time=datetime.datetime(2019, 3, 25, 1, 56, 14), page_id=45278900)], session_id=0)]"
      ]
     },
     "execution_count": 41,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sessions_rdd.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+----------+\n",
      "|              events|session_id|\n",
      "+--------------------+----------+\n",
      "|[[pageLoad, 2019-...| 982515712|\n",
      "|[[pageLoad, 2019-...|         0|\n",
      "|[[pageLoad, 2019-...| 161480706|\n",
      "|[[pageLoad, 2019-...| 322961412|\n",
      "|[[pageLoad, 2019-...| 484442118|\n",
      "+--------------------+----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sessions = sqlContext.createDataFrame(sessions_rdd)\n",
    "sessions.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [],
   "source": [
    "def count_missing_load(row):\n",
    "    \"\"\"Return 1 if the session has an event with no earlier pageLoad of its page, else 0.\n",
    "\n",
    "    Events are scanned in the order they appear in ``row.events``. An event\n",
    "    whose action is not 'pageLoad' counts as an error when no 'pageLoad'\n",
    "    event with the same ``page_id`` occurs before it in that list.\n",
    "\n",
    "    Args:\n",
    "        row: object with an ``events`` sequence whose items expose\n",
    "            ``action`` and ``page_id`` attributes.\n",
    "\n",
    "    Returns:\n",
    "        int: 1 if at least one such event exists, otherwise 0.\n",
    "    \"\"\"\n",
    "    loaded_pages = set()\n",
    "    for event in row.events:\n",
    "        # Compare by equality: identity ('is' / 'is not') against a string\n",
    "        # literal depends on interning and is not a reliable value test.\n",
    "        if event.action == 'pageLoad':\n",
    "            loaded_pages.add(event.page_id)\n",
    "        elif event.page_id not in loaded_pages:\n",
    "            return 1\n",
    "    return 0\n",
    "\n",
    "errors_count = sessions.rdd.map(count_missing_load)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "8.327657225917923e-05"
      ]
     },
     "execution_count": 44,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "errors_rate = errors_count.sum()/unique_sessions.count()\n",
    "errors_rate"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Number of sessions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "956079217"
      ]
     },
     "execution_count": 45,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sessions.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark - YARN (large)",
   "language": "python",
   "name": "spark_yarn_pyspark_large"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}