{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "#### 2019-05-31\n", "Copy of Tiziano Piccardi's [DataSetAnonymization](https://github.com/epfl-dlab/WikipediaCitationUsage/blob/master/DatasetAnonymization.ipynb), altered to preserve `useragent.is_bot` in both citationusage and citationusagepageload tables." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "import re\n", "import pyspark.sql\n", "from pyspark.sql import *\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import hashlib\n", "import os.path\n", "from pyspark.sql.functions import *\n", "from datetime import timedelta, date\n", "\n", "\n", "%matplotlib inline\n", "spark_hive = pyspark.sql.HiveContext(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create alternative session ID to replace session_token" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------------------+\n", "|session_id| session_token|\n", "+----------+--------------------+\n", "| 0|e7e800a1b82909b4c037|\n", "| 1|77f3df30d8f9c579d6b9|\n", "| 2|3de8636a9aaff633c5f2|\n", "| 3|f1829efe15aa1e26d59e|\n", "| 4|9b7ebe75853b1d8b75f4|\n", "| 5|6362ed76a063f821ed33|\n", "| 6|3e4e49cc8cf0d800587b|\n", "| 7|529ba0f5bd46fb0f51eb|\n", "| 8|926758e0d73fbcaf2bbb|\n", "| 9|ed34a5bcac1b29495f70|\n", "| 10|489e7f3c58cc2a6227bc|\n", "| 11|bcf4b3768079ed4ce754|\n", "| 12|2863173fc6f780634769|\n", "| 13|5759f8e9386d19ff5782|\n", "| 14|8466e9dfc2a38e7adde6|\n", "| 15|9a77459c516a1792a4a8|\n", "| 16|e77c252e139768583f51|\n", "| 17|264ee2575f6f4e4255e2|\n", "| 18|578fccd38d53a295f59f|\n", "| 19|cb014c9b031a633a776e|\n", "+----------+--------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "pageloads = spark.sql(\"select event.session_token from event.citationusagepageload\").distinct()\n", "citationusage = 
spark.sql(\"select event.session_token from event.citationusage\").distinct()\n", "\n", "all_tokens = pageloads.union(citationusage).distinct().rdd.zipWithIndex()\n", "session_ids = sqlContext.createDataFrame(all_tokens.map(lambda r: \n", " Row(session_token=r[0].session_token, session_id=r[1])))\n", "\n", "session_ids.cache().write.parquet(\"session_ids.parquet\")\n", "session_ids.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get anonymous edits" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[page_id: bigint, edit_year: int, edit_month: int, edit_day: int, edit_hour: int, ip: string]" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "query = \"\"\"\n", "SELECT page_id, year(event_timestamp) edit_year, month(event_timestamp) edit_month, \n", " dayofmonth(event_timestamp) edit_day, hour(event_timestamp) edit_hour,\n", " event_user_text_historical ip\n", "FROM wmf.mediawiki_history \n", "WHERE wiki_db = 'enwiki'\n", "AND event_user_is_anonymous = TRUE\n", "AND to_timestamp(event_timestamp) > '2019-03-01'\n", "AND page_namespace = 0\n", "AND page_is_redirect = FALSE\n", "\"\"\"\n", "\n", "anonymous_edits = spark.sql(query).distinct()\n", "anonymous_edits" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get page loads and clicks events" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: 
struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pageloads = spark.sql(\"select * from event.citationusagepageload\")\n", "pageloads" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Total pageload events:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "1829735489" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pageloads_original = pageloads.count()\n", "pageloads_original" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Total unique sessions:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "956199928" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pageloads_original_sessions = pageloads.select(\"event.session_token\").distinct().count()\n", "pageloads_original_sessions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[dt: string, event: struct<action:string,citation_in_text_refs:bigint,dom_interactive_time:bigint,event_offset_time:bigint,ext_position:bigint,footnote_number:bigint,freely_accessible:boolean,in_infobox:boolean,link_occurrence:bigint,link_text:string,link_url:string,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,section_id:string,session_token:string,skin:string,citation_identifier_label:string>, ip: string, recvfrom: 
string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "citationusage = spark.sql(\"select * from event.citationusage\")\n", "citationusage" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Total click events:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "117186592" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "citationusage_original = citationusage.count()\n", "citationusage_original" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unique sessions:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "75305966" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "citationusage_original_sessions = citationusage.select(\"event.session_token\").distinct().count()\n", "citationusage_original_sessions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----\n", "## Get the session tokens with an edit" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[session_token: string]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sessions_with_edits = pageloads.join(anonymous_edits, pageloads.ip == anonymous_edits.ip)\\\n", " .where(pageloads.year == anonymous_edits.edit_year)\\\n", " .where(pageloads.month == anonymous_edits.edit_month)\\\n", " .where(pageloads.day == 
anonymous_edits.edit_day)\\\n", " .where(pageloads.hour == anonymous_edits.edit_hour)\\\n", " .where(pageloads.event.page_id == anonymous_edits.page_id)\\\n", " .select(\"event.session_token\").distinct()\n", "\n", "sessions_with_edits" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Count the sessions to exclude:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "118919" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sessions_with_edits.cache().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Left join to keep only the pageloads of the sessions without edits:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sessions_with_edits.registerTempTable(\"sessions_with_edits\")\n", "\n", "query = \"\"\"\n", "SELECT p.*\n", "FROM event.citationusagepageload p\n", "LEFT JOIN sessions_with_edits s\n", "ON p.event.session_token = s.session_token\n", "WHERE s.session_token IS NULL\n", "\"\"\"\n", "\n", "pageloads_clean = spark.sql(query)\n", "pageloads_clean" ] }, { "cell_type": "markdown", "metadata":
{}, "source": [ "Get the number of pageload events after the cleaning:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "pageloads_anonymized = pageloads_clean.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the number of individual sessions:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "pageloads_anonymized_sessions = pageloads_clean.select(\"event.session_token\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add the session id and drop the critical fields:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- dt: string (nullable = true)\n", " |-- event: struct (nullable = true)\n", " | |-- action: string (nullable = true)\n", " | |-- dom_interactive_time: long (nullable = true)\n", " | |-- event_offset_time: long (nullable = true)\n", " | |-- mode: string (nullable = true)\n", " | |-- namespace_id: long (nullable = true)\n", " | |-- page_id: long (nullable = true)\n", " | |-- page_title: string (nullable = true)\n", " | |-- page_token: string (nullable = true)\n", " | |-- referrer: string (nullable = true)\n", " | |-- revision_id: long (nullable = true)\n", " | |-- session_token: string (nullable = true)\n", " | |-- skin: string (nullable = true)\n", " |-- ip: string (nullable = true)\n", " |-- recvfrom: string (nullable = true)\n", " |-- revision: long (nullable = true)\n", " |-- schema: string (nullable = true)\n", " |-- seqid: long (nullable = true)\n", " |-- useragent: struct (nullable = true)\n", " | |-- browser_family: string (nullable = true)\n", " | |-- browser_major: string (nullable = true)\n", " | |-- browser_minor: string (nullable = true)\n", " | |-- device_family: string (nullable = true)\n", " | |-- is_bot: boolean (nullable = true)\n", " | |-- is_mediawiki: boolean (nullable = true)\n", " | |-- 
os_family: string (nullable = true)\n", " | |-- os_major: string (nullable = true)\n", " | |-- os_minor: string (nullable = true)\n", " | |-- wmf_app_version: string (nullable = true)\n", " |-- uuid: string (nullable = true)\n", " |-- webhost: string (nullable = true)\n", " |-- wiki: string (nullable = true)\n", " |-- geocoded_data: map (nullable = true)\n", " | |-- key: string\n", " | |-- value: string (valueContainsNull = true)\n", " |-- topic: string (nullable = true)\n", " |-- year: long (nullable = true)\n", " |-- month: long (nullable = true)\n", " |-- day: long (nullable = true)\n", " |-- hour: long (nullable = true)\n", " |-- session_id: long (nullable = true)\n", " |-- event_time: timestamp (nullable = true)\n", "\n" ] } ], "source": [ "anonymous_pageloads_nested = pageloads_clean\\\n", " .alias(\"pl\")\\\n", " .join(session_ids.alias(\"ids\"), pageloads_clean.event.session_token == session_ids.session_token)\\\n", " .select([\"pl.*\", \"ids.session_id\", to_timestamp(\"pl.dt\").alias(\"event_time\")])\n", "\n", "anonymous_pageloads_nested.printSchema()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- session_id: long (nullable = true)\n", " |-- event_time: timestamp (nullable = true)\n", " |-- action: string (nullable = true)\n", " |-- dom_interactive_time: long (nullable = true)\n", " |-- event_offset_time: long (nullable = true)\n", " |-- mode: string (nullable = true)\n", " |-- namespace_id: long (nullable = true)\n", " |-- page_id: long (nullable = true)\n", " |-- page_title: string (nullable = true)\n", " |-- page_token: string (nullable = true)\n", " |-- referrer: string (nullable = true)\n", " |-- revision_id: long (nullable = true)\n", " |-- skin: string (nullable = true)\n", " |-- useragent_is_bot: boolean (nullable = true)\n", " |-- recvfrom: string (nullable = true)\n", " |-- revision: long (nullable = true)\n", " |-- schema: string (nullable 
= true)\n", " |-- seqid: long (nullable = true)\n", " |-- uuid: string (nullable = true)\n", " |-- webhost: string (nullable = true)\n", " |-- wiki: string (nullable = true)\n", " |-- geocoded_data: map (nullable = true)\n", " | |-- key: string\n", " | |-- value: string (valueContainsNull = true)\n", " |-- year: long (nullable = true)\n", " |-- month: long (nullable = true)\n", " |-- day: long (nullable = true)\n", " |-- hour: long (nullable = true)\n", "\n" ] } ], "source": [ "anonymous_pageloads_nested.registerTempTable('anonymous_pageloads_nested')\n", "\n", "query = \"\"\"\n", "SELECT session_id, event_time, event.*, useragent.is_bot as useragent_is_bot,\n", " recvfrom, revision, schema, seqid, \n", " uuid, webhost, wiki, geocoded_data, \n", " year, month, day, hour \n", "FROM anonymous_pageloads_nested\n", "\"\"\"\n", "\n", "anonymous_pageloads = spark.sql(query).drop('session_token')\n", "anonymous_pageloads.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sanity check:" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "1828725320" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_pageloads_count = anonymous_pageloads.count()\n", "anonymous_pageloads_count" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sanity check, number of unique sessions:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "956081009" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_pageloads_sessions_count = anonymous_pageloads.select(\"session_id\").distinct().count()\n", "anonymous_pageloads_sessions_count" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "anonymous_pageloads.write.parquet(\"anonymous_pageloads_april.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Anonymize the citation usage table" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- dt: string (nullable = true)\n", " |-- event: struct (nullable = true)\n", " | |-- action: string (nullable = true)\n", " | |-- citation_in_text_refs: long (nullable = true)\n", " | |-- dom_interactive_time: long (nullable = true)\n", " | |-- event_offset_time: long (nullable = true)\n", " | |-- ext_position: long (nullable = true)\n", " | |-- footnote_number: long (nullable = true)\n", " | |-- freely_accessible: boolean (nullable = true)\n", " | |-- in_infobox: boolean (nullable = true)\n", " | |-- link_occurrence: long (nullable = true)\n", " | |-- link_text: string (nullable = true)\n", " | |-- link_url: string (nullable = true)\n", " | |-- mode: string (nullable = true)\n", " | |-- namespace_id: long (nullable = true)\n", " | |-- page_id: long (nullable = true)\n", " | |-- page_title: string (nullable = true)\n", " | |-- page_token: string (nullable = true)\n", " | |-- referrer: string (nullable = true)\n", " | |-- revision_id: long (nullable = true)\n", " | |-- section_id: string (nullable = true)\n", " | |-- session_token: string (nullable = true)\n", " | |-- skin: string (nullable = true)\n", " | |-- citation_identifier_label: string (nullable = true)\n", " |-- ip: string (nullable = true)\n", " |-- recvfrom: string (nullable = true)\n", " |-- revision: long (nullable = true)\n", " |-- schema: string (nullable = true)\n", " |-- seqid: long (nullable = true)\n", " |-- useragent: struct (nullable = true)\n", " | |-- browser_family: string (nullable = true)\n", " | |-- browser_major: string (nullable = true)\n", " | |-- browser_minor: string (nullable = true)\n", " | |-- device_family: string (nullable = true)\n", " | |-- is_bot: boolean (nullable = true)\n", " | |-- is_mediawiki: boolean 
(nullable = true)\n", " | |-- os_family: string (nullable = true)\n", " | |-- os_major: string (nullable = true)\n", " | |-- os_minor: string (nullable = true)\n", " | |-- wmf_app_version: string (nullable = true)\n", " |-- uuid: string (nullable = true)\n", " |-- webhost: string (nullable = true)\n", " |-- wiki: string (nullable = true)\n", " |-- geocoded_data: map (nullable = true)\n", " | |-- key: string\n", " | |-- value: string (valueContainsNull = true)\n", " |-- topic: string (nullable = true)\n", " |-- year: long (nullable = true)\n", " |-- month: long (nullable = true)\n", " |-- day: long (nullable = true)\n", " |-- hour: long (nullable = true)\n", " |-- session_id: long (nullable = true)\n", " |-- event_time: timestamp (nullable = true)\n", "\n" ] } ], "source": [ "anonymous_citationusage_nested = citationusage\\\n", " .alias(\"cit\")\\\n", " .join(session_ids.alias(\"ids\"), citationusage.event.session_token == session_ids.session_token)\\\n", " .select([\"cit.*\", \"ids.session_id\", to_timestamp(\"cit.dt\").alias(\"event_time\")])\n", "\n", "anonymous_citationusage_nested.printSchema()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- session_id: long (nullable = true)\n", " |-- event_time: timestamp (nullable = true)\n", " |-- action: string (nullable = true)\n", " |-- citation_in_text_refs: long (nullable = true)\n", " |-- dom_interactive_time: long (nullable = true)\n", " |-- event_offset_time: long (nullable = true)\n", " |-- ext_position: long (nullable = true)\n", " |-- footnote_number: long (nullable = true)\n", " |-- freely_accessible: boolean (nullable = true)\n", " |-- in_infobox: boolean (nullable = true)\n", " |-- link_occurrence: long (nullable = true)\n", " |-- link_text: string (nullable = true)\n", " |-- link_url: string (nullable = true)\n", " |-- mode: string (nullable = true)\n", " |-- namespace_id: long (nullable = true)\n", " |-- 
page_id: long (nullable = true)\n", " |-- page_title: string (nullable = true)\n", " |-- page_token: string (nullable = true)\n", " |-- referrer: string (nullable = true)\n", " |-- revision_id: long (nullable = true)\n", " |-- section_id: string (nullable = true)\n", " |-- skin: string (nullable = true)\n", " |-- citation_identifier_label: string (nullable = true)\n", " |-- useragent_is_bot: boolean (nullable = true)\n", " |-- recvfrom: string (nullable = true)\n", " |-- revision: long (nullable = true)\n", " |-- schema: string (nullable = true)\n", " |-- seqid: long (nullable = true)\n", " |-- uuid: string (nullable = true)\n", " |-- webhost: string (nullable = true)\n", " |-- wiki: string (nullable = true)\n", " |-- geocoded_data: map (nullable = true)\n", " | |-- key: string\n", " | |-- value: string (valueContainsNull = true)\n", " |-- year: long (nullable = true)\n", " |-- month: long (nullable = true)\n", " |-- day: long (nullable = true)\n", " |-- hour: long (nullable = true)\n", "\n" ] } ], "source": [ "anonymous_citationusage_nested.registerTempTable('anonymous_citationusage_nested')\n", "\n", "query = \"\"\"\n", "SELECT session_id, event_time, event.*, useragent.is_bot as useragent_is_bot,\n", " recvfrom, revision, schema, seqid, \n", " uuid, webhost, wiki, geocoded_data, \n", " year, month, day, hour \n", "FROM anonymous_citationusage_nested\n", "\"\"\"\n", "\n", "anonymous_citationusage = spark.sql(query).drop('session_token')\n", "anonymous_citationusage.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sanity check:" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "117186558" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "citationusage_anonymized = anonymous_citationusage.count()\n", "citationusage_anonymized" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sanity check, number of unique sessions:" ] }, { 
"cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "75305945" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "citationusage_anonymized_sessions = anonymous_citationusage.select(\"session_id\").distinct().count()\n", "citationusage_anonymized_sessions" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "anonymous_citationusage.write.parquet(\"anonymous_citationusage_april.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "------\n", "### Click events\n", "Number of removed events:" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of clicks events removed:\n", "34\n", "Percentage of the dataset removed:\n", "2.9013558138118737e-07\n" ] } ], "source": [ "print(\"Number of clicks events removed:\")\n", "print(citationusage_original - citationusage_anonymized)\n", "print(\"Percentage of the dataset removed:\")\n", "print((citationusage_original - citationusage_anonymized)/citationusage_original)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Number of removed sessions:" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of sessions removed:\n", "21\n", "Percentage of the dataset removed:\n", "2.7886236795634494e-07\n" ] } ], "source": [ "print(\"Number of sessions removed:\")\n", "print(citationusage_original_sessions - citationusage_anonymized_sessions)\n", "print(\"Percentage of the dataset removed:\")\n", "print((citationusage_original_sessions - citationusage_anonymized_sessions)/citationusage_original_sessions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----\n", "### Pageloads" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": 
"stream", "text": [ "Number of pageload removed removed:\n", "1010169\n", "Percentage of the dataset removed:\n", "0.0005520847172025311\n" ] } ], "source": [ "print(\"Number of pageload removed removed:\")\n", "print(pageloads_original - pageloads_anonymized)\n", "print(\"Percentage of the dataset removed:\")\n", "print((pageloads_original - pageloads_anonymized)/pageloads_original)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of sessions removed:\n", "118919\n", "Percentage of the dataset removed:\n", "0.00012436625073663466\n" ] } ], "source": [ "print(\"Number of sessions removed:\")\n", "print(pageloads_original_sessions - pageloads_anonymized_sessions)\n", "print(\"Percentage of the dataset removed:\")\n", "print((pageloads_original_sessions - pageloads_anonymized_sessions)/pageloads_original_sessions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Sanity checks\n", "\n", "Release cache:" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "spark.catalog.clearCache()" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[session_id: bigint, event_time: timestamp, action: string, citation_in_text_refs: bigint, dom_interactive_time: bigint, event_offset_time: bigint, ext_position: bigint, footnote_number: bigint, freely_accessible: boolean, in_infobox: boolean, link_occurrence: bigint, link_text: string, link_url: string, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, section_id: string, skin: string, citation_identifier_label: string, useragent_is_bot: boolean, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: 
map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_citationusage = spark.read.parquet(\"anonymous_citationusage_april.parquet\")\n", "anonymous_citationusage" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check count:" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "117186558" ] }, "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_citationusage.count()" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[session_id: bigint, event_time: timestamp, action: string, dom_interactive_time: bigint, event_offset_time: bigint, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, skin: string, useragent_is_bot: boolean, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_pageloads = spark.read.parquet(\"anonymous_pageloads_april.parquet\")\n", "anonymous_pageloads" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check count:" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "1828723456" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_pageloads.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----\n", "# Check on schema" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- session_id: long 
(nullable = true)\n", " |-- event_time: timestamp (nullable = true)\n", " |-- action: string (nullable = true)\n", " |-- citation_in_text_refs: long (nullable = true)\n", " |-- dom_interactive_time: long (nullable = true)\n", " |-- event_offset_time: long (nullable = true)\n", " |-- ext_position: long (nullable = true)\n", " |-- footnote_number: long (nullable = true)\n", " |-- freely_accessible: boolean (nullable = true)\n", " |-- in_infobox: boolean (nullable = true)\n", " |-- link_occurrence: long (nullable = true)\n", " |-- link_text: string (nullable = true)\n", " |-- link_url: string (nullable = true)\n", " |-- mode: string (nullable = true)\n", " |-- namespace_id: long (nullable = true)\n", " |-- page_id: long (nullable = true)\n", " |-- page_title: string (nullable = true)\n", " |-- page_token: string (nullable = true)\n", " |-- referrer: string (nullable = true)\n", " |-- revision_id: long (nullable = true)\n", " |-- section_id: string (nullable = true)\n", " |-- skin: string (nullable = true)\n", " |-- citation_identifier_label: string (nullable = true)\n", " |-- useragent_is_bot: boolean (nullable = true)\n", " |-- recvfrom: string (nullable = true)\n", " |-- revision: long (nullable = true)\n", " |-- schema: string (nullable = true)\n", " |-- seqid: long (nullable = true)\n", " |-- uuid: string (nullable = true)\n", " |-- webhost: string (nullable = true)\n", " |-- wiki: string (nullable = true)\n", " |-- geocoded_data: map (nullable = true)\n", " | |-- key: string\n", " | |-- value: string (valueContainsNull = true)\n", " |-- year: long (nullable = true)\n", " |-- month: long (nullable = true)\n", " |-- day: long (nullable = true)\n", " |-- hour: long (nullable = true)\n", "\n" ] } ], "source": [ "anonymous_citationusage.printSchema()" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- session_id: long (nullable = true)\n", " |-- event_time: timestamp 
(nullable = true)\n", " |-- action: string (nullable = true)\n", " |-- dom_interactive_time: long (nullable = true)\n", " |-- event_offset_time: long (nullable = true)\n", " |-- mode: string (nullable = true)\n", " |-- namespace_id: long (nullable = true)\n", " |-- page_id: long (nullable = true)\n", " |-- page_title: string (nullable = true)\n", " |-- page_token: string (nullable = true)\n", " |-- referrer: string (nullable = true)\n", " |-- revision_id: long (nullable = true)\n", " |-- skin: string (nullable = true)\n", " |-- useragent_is_bot: boolean (nullable = true)\n", " |-- recvfrom: string (nullable = true)\n", " |-- revision: long (nullable = true)\n", " |-- schema: string (nullable = true)\n", " |-- seqid: long (nullable = true)\n", " |-- uuid: string (nullable = true)\n", " |-- webhost: string (nullable = true)\n", " |-- wiki: string (nullable = true)\n", " |-- geocoded_data: map (nullable = true)\n", " | |-- key: string\n", " | |-- value: string (valueContainsNull = true)\n", " |-- year: long (nullable = true)\n", " |-- month: long (nullable = true)\n", " |-- day: long (nullable = true)\n", " |-- hour: long (nullable = true)\n", "\n" ] } ], "source": [ "anonymous_pageloads.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Check at session level" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_pageloads.registerTempTable('anonymous_pageloads')\n", "\n", "pageloads_query = \"\"\"\n", "select event_time, session_id, page_id, 'pageLoad' as action\n", "from anonymous_pageloads\n", "where wiki = 'enwiki'\n", "AND namespace_id = 0\n", "\"\"\"\n", "\n", "pageloads = spark.sql(pageloads_query)\n", "pageloads" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ 
{ "name": "stdout", "output_type": "stream", "text": [ "+-------------------+----------+--------+--------+\n", "| event_time|session_id| page_id| action|\n", "+-------------------+----------+--------+--------+\n", "|2019-04-04 06:08:18| 4497935| 3157978|pageLoad|\n", "|2019-04-06 14:04:01| 2565696|49312398|pageLoad|\n", "|2019-04-06 10:35:38| 3423790|28658666|pageLoad|\n", "|2019-03-29 01:03:23| 4172151| 1418277|pageLoad|\n", "|2019-03-24 15:34:54| 2019617|52839809|pageLoad|\n", "+-------------------+----------+--------+--------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "pageloads.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the unique session IDs to filter the \"citationusage\" table (the pageload is a subsampling). " ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "unique_sessions = pageloads.select(\"session_id\").distinct()" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ "anonymous_citationusage.registerTempTable('anonymous_citationusage')\n", "unique_sessions.registerTempTable('unique_sessions')\n", "\n", "events_query = \"\"\"\n", "select event_time, cu.session_id, page_id, action\n", "from anonymous_citationusage cu\n", "join unique_sessions us\n", "on us.session_id=cu.session_id\n", "where wiki = 'enwiki'\n", "\"\"\"\n", "\n", "events = spark.sql(events_query)\n", "events" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Merge all the events:" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [], "source": [ "sessions_rdd = events.rdd.union(pageloads.rdd)\\\n", " .map(lambda r: (r.session_id, [(r.event_time, r.page_id, r.action)]))\\\n", " .reduceByKey(lambda a,b: a+b)\\\n", " .map(lambda r: 
(r[0], sorted(r[1], key=lambda x: x[0])))\\\n", " .map(lambda r: Row(session_id=r[0], events=[\n", " Row(event_time=e[0], page_id=e[1], action=e[2]) for e in r[1]]))" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(events=[Row(action='pageLoad', event_time=datetime.datetime(2019, 3, 25, 1, 56, 14), page_id=45278900)], session_id=0)]" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sessions_rdd.take(1)" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+----------+\n", "| events|session_id|\n", "+--------------------+----------+\n", "|[[pageLoad, 2019-...| 982515712|\n", "|[[pageLoad, 2019-...| 0|\n", "|[[pageLoad, 2019-...| 161480706|\n", "|[[pageLoad, 2019-...| 322961412|\n", "|[[pageLoad, 2019-...| 484442118|\n", "+--------------------+----------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "sessions = sqlContext.createDataFrame(sessions_rdd)\n", "sessions.show(5)" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [], "source": [ "def count_missing_load(row):\n", "    \"\"\"Return 1 if a session has an event with no earlier pageLoad of the same page, else 0.\n", "\n", "    `row.events` is scanned in list order. The first position of each page's pageLoad\n", "    is recorded; any other event on a page that was never loaded, or whose first\n", "    pageLoad sits at a later position, flags the session.\n", "    \"\"\"\n", "    first_load = {}\n", "    for i, event in enumerate(row.events):\n", "        if event.action == 'pageLoad':\n", "            # setdefault keeps only the earliest pageLoad position per page.\n", "            first_load.setdefault(event.page_id, i)\n", "    for i, event in enumerate(row.events):\n", "        # Compare strings by value: `is not` tests object identity, not equality.\n", "        if event.action != 'pageLoad' and (event.page_id not in first_load or first_load[event.page_id] > i):\n", "            return 1\n", "    return 0\n", "\n", "errors_count = sessions.rdd.map(count_missing_load)" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "8.327657225917923e-05" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "errors_rate = errors_count.sum()/unique_sessions.count()\n", "errors_rate" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "Number of sessions:" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "956079217" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sessions.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark - YARN (large)", "language": "python", "name": "spark_yarn_pyspark_large" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.3" } }, "nbformat": 4, "nbformat_minor": 2 }