{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Extract Section Heirarchy from WPM pages\n",
    "Data extracted from public HTML pages and written to `population_wpm_sections` table. Extraction details:\n",
    "- fetch revision IDs for all pages found in `enwiki-20190420-pages-articles-multistream.xml.bz2` dump file\n",
    "- fetch HTML using revision_id for each WP:M page\n",
    "- write page and section H2 data to `population_wpm_sections` for use in other notebooks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "%run -i 'data-defaults.py'\n",
    "\n",
    "import urllib.request\n",
    "from bs4 import BeautifulSoup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# method to fetch page HTML and map section_ids to section H2s\n",
    "def extract_h2s(row):\n",
    "    page_id = str(row['page_id'])\n",
    "    rev_id = str(row['rev_id'])\n",
    "    url = 'https://en.wikipedia.org/?oldid=' + rev_id\n",
    "    sections = list()\n",
    "    try:\n",
    "        wpm_page = urllib.request.urlopen(url)\n",
    "    except urllib.error.HTTPError:\n",
    "        print('error fetching data for: ' + str(row))\n",
    "        return\n",
    "    soup = BeautifulSoup(wpm_page.read(), features=\"lxml\")\n",
    "\n",
    "    h2 = \"\"\n",
    "    for span in soup.find_all(class_=\"mw-headline\"):\n",
    "        if('h2' == span.parent.name):\n",
    "            h2 = span.get('id')\n",
    "        sections.append(Row(page_id=page_id, section_h2=h2, section_id=span.get('id')))\n",
    "    return sections"
   ]
  },
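  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of the grouping logic above, run on an inline HTML fragment instead of a live fetch. The fragment and its heading names are hypothetical, chosen to mimic Wikipedia's `mw-headline` markup: each headline is tagged with the `id` of the most recent H2, so H3 subsections inherit their parent section."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# sketch only: hypothetical HTML mimicking Wikipedia's section markup\n",
    "sample_html = \"\"\"\n",
    "<h2><span class=\"mw-headline\" id=\"Signs_and_symptoms\">Signs and symptoms</span></h2>\n",
    "<h3><span class=\"mw-headline\" id=\"Complications\">Complications</span></h3>\n",
    "<h2><span class=\"mw-headline\" id=\"References\">References</span></h2>\n",
    "\"\"\"\n",
    "sample_soup = BeautifulSoup(sample_html, features=\"lxml\")\n",
    "\n",
    "h2 = \"\"\n",
    "for span in sample_soup.find_all(class_=\"mw-headline\"):\n",
    "    if span.parent.name == 'h2':\n",
    "        h2 = span.get('id')\n",
    "    # prints:\n",
    "    # Signs_and_symptoms -> Signs_and_symptoms\n",
    "    # Signs_and_symptoms -> Complications\n",
    "    # References -> References\n",
    "    print(h2 + ' -> ' + span.get('id'))"
   ]
  },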
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# only need revision ids for 20190420\n",
    "WIKIPEDIA_XML_DUMPS = ['enwiki-20190420-pages-articles-multistream.xml.bz2']\n",
    "\n",
    "def page_rev(entity, date):\n",
    "    return Row(page_id=entity.id, rev_id=entity.revision.id, dt=date)\n",
    "\n",
    "page_revs_rdd = sc.emptyRDD()\n",
    "for file in WIKIPEDIA_XML_DUMPS:\n",
    "    wikipedia = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='page').load(file)\n",
    "    dump_date = re.search(r'.*(\\d{8}).*',file).group(1)\n",
    "    articles = wikipedia\\\n",
    "        .filter(\"ns = '0'\")\\\n",
    "        .filter(\"redirect._title is null\") \\\n",
    "        .filter(\"revision.text._VALUE is not null\") \\\n",
    "        .filter(\"length(revision.text._VALUE) > 0\")\n",
    "    daily_page_revs = sqlContext.createDataFrame(articles.rdd.map(lambda entity: page_rev(entity, dump_date)))\n",
    "    page_revs_rdd = page_revs_rdd.union(daily_page_revs.rdd)\n",
    "\n",
    "page_revs_merged = sqlContext.createDataFrame(page_revs_rdd)\n",
    "page_revs_merged.registerTempTable(\"page_revs_date\")"
   ]
  },
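  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `dt` value attached to each row comes from the 8-digit date stamp in the dump filename. The same regex as above, shown in isolation:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the capture group picks out the YYYYMMDD stamp embedded in the filename\n",
    "re.search(r'.*(\\d{8}).*', 'enwiki-20190420-pages-articles-multistream.xml.bz2').group(1)"
   ]
  },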
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>page_id</th>\n",
       "      <th>rev_id</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>count</th>\n",
       "      <td>3.259700e+04</td>\n",
       "      <td>3.259700e+04</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>mean</th>\n",
       "      <td>2.118422e+07</td>\n",
       "      <td>8.676358e+08</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>std</th>\n",
       "      <td>1.888627e+07</td>\n",
       "      <td>3.811546e+07</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>min</th>\n",
       "      <td>2.500000e+01</td>\n",
       "      <td>4.303365e+08</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>25%</th>\n",
       "      <td>3.237962e+06</td>\n",
       "      <td>8.614966e+08</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>50%</th>\n",
       "      <td>1.728788e+07</td>\n",
       "      <td>8.830005e+08</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>75%</th>\n",
       "      <td>3.660101e+07</td>\n",
       "      <td>8.891040e+08</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>max</th>\n",
       "      <td>6.053808e+07</td>\n",
       "      <td>8.937050e+08</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "            page_id        rev_id\n",
       "count  3.259700e+04  3.259700e+04\n",
       "mean   2.118422e+07  8.676358e+08\n",
       "std    1.888627e+07  3.811546e+07\n",
       "min    2.500000e+01  4.303365e+08\n",
       "25%    3.237962e+06  8.614966e+08\n",
       "50%    1.728788e+07  8.830005e+08\n",
       "75%    3.660101e+07  8.891040e+08\n",
       "max    6.053808e+07  8.937050e+08"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# extract WP:M page revisions for 20190420\n",
    "query = \"\"\"\n",
    "SELECT DISTINCT page_id, rev_id\n",
    "FROM \n",
    "    page_revs_date\n",
    "WHERE page_id IN \n",
    "    (SELECT page_id FROM ryanmax.population_wpm_pages_with_extlinks)\n",
    "    AND dt = '20190420'\n",
    "\"\"\"\n",
    "\n",
    "wpm_revs_pandas = spark.sql(query).toPandas()\n",
    "wpm_revs_pandas.describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1000 pages fetched\n",
      "2000 pages fetched\n",
      "3000 pages fetched\n",
      "4000 pages fetched\n",
      "5000 pages fetched\n",
      "6000 pages fetched\n",
      "7000 pages fetched\n",
      "8000 pages fetched\n",
      "9000 pages fetched\n",
      "10000 pages fetched\n",
      "11000 pages fetched\n",
      "12000 pages fetched\n",
      "13000 pages fetched\n",
      "14000 pages fetched\n",
      "15000 pages fetched\n",
      "16000 pages fetched\n",
      "17000 pages fetched\n",
      "18000 pages fetched\n",
      "19000 pages fetched\n",
      "20000 pages fetched\n",
      "21000 pages fetched\n",
      "22000 pages fetched\n",
      "23000 pages fetched\n",
      "24000 pages fetched\n",
      "25000 pages fetched\n",
      "26000 pages fetched\n",
      "27000 pages fetched\n",
      "28000 pages fetched\n",
      "29000 pages fetched\n",
      "30000 pages fetched\n",
      "31000 pages fetched\n",
      "32000 pages fetched\n"
     ]
    }
   ],
   "source": [
    "all_sections = list()\n",
    "i = 0\n",
    "for index, row in wpm_revs_pandas.iterrows():\n",
    "    for section in extract_h2s(row):\n",
    "        all_sections.append(section)\n",
    "    i += 1\n",
    "    if (i%1000 == 0):\n",
    "        print(str(i) + \" pages fetched\")\n",
    "\n",
    "sections_df = spark.createDataFrame(all_sections)"
   ]
  },
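  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A quick sanity check before persisting: the inferred schema should have the three string columns built in `extract_h2s` (`page_id`, `section_h2`, `section_id`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# schema is inferred from the Row objects returned by extract_h2s\n",
    "sections_df.printSchema()"
   ]
  },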
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>page_id</th>\n",
       "      <th>section_h2</th>\n",
       "      <th>section_id</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>count</th>\n",
       "      <td>248540</td>\n",
       "      <td>248540</td>\n",
       "      <td>248540</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>unique</th>\n",
       "      <td>32496</td>\n",
       "      <td>29280</td>\n",
       "      <td>74213</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>top</th>\n",
       "      <td>602009</td>\n",
       "      <td>References</td>\n",
       "      <td>References</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>freq</th>\n",
       "      <td>517</td>\n",
       "      <td>31411</td>\n",
       "      <td>30879</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "       page_id  section_h2  section_id\n",
       "count   248540      248540      248540\n",
       "unique   32496       29280       74213\n",
       "top     602009  References  References\n",
       "freq       517       31411       30879"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sections_df.toPandas().describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# write section data to table for later use\n",
    "sections_df.registerTempTable(\"temp_population_wpm_sections\")\n",
    "sqlContext.sql(\"DROP TABLE IF EXISTS ryanmax.population_wpm_sections\")\n",
    "sqlContext.sql(\"CREATE TABLE ryanmax.population_wpm_sections AS SELECT * FROM temp_population_wpm_sections\")"
   ]
  },
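  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Assuming the `CREATE TABLE` above succeeded, a row count against the new table should match the count reported by `describe()` earlier:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.sql(\"SELECT COUNT(*) AS sections FROM ryanmax.population_wpm_sections\").show()"
   ]
  },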
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark large: spark-xml jar and local venv path ",
   "language": "python",
   "name": "spark-ryanmax"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}