{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Harvest indexes\n", "\n", "This notebook harvests data from all of [NSW State Archives online indexes](https://mhnsw.au/archive/subjects/?filter=indexes), saving the data as a collection of easily downloadable CSV files.\n", "\n", "Before you harvest the data, you need to [get the details of all the indexes](get-list-of-indexes.ipynb).\n", "\n", "If you just want the data, my latest harvest of the indexes is [available from this repository](https://github.com/wragge/srnsw-indexes).\n", "\n", "If you'd like to explore the harvested data, try the [Index Explorer](https://glam-workbench.net/nsw-state-archives/#nsw-state-archives-index-explorer)!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import what we need" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "tags": [] }, "outputs": [], "source": [ "import os\n", "import shutil\n", "import time\n", "from pathlib import Path\n", "\n", "import pandas as pd\n", "import requests_cache\n", "from requests.adapters import HTTPAdapter\n", "from requests.packages.urllib3.util.retry import Retry\n", "from tqdm.auto import tqdm\n", "\n", "\n", "def filter_errors(response):\n", " \"\"\"\n", " Errors are returned with a status code of 200, so we have to stop them from being cached.\n", " \"\"\"\n", " return \"errors\" not in response.json()\n", "\n", "\n", "s = requests_cache.CachedSession(\n", " allowable_methods=(\"GET\", \"POST\"), filter_fn=filter_errors\n", ")\n", "retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])\n", "s.mount(\"http://\", HTTPAdapter(max_retries=retries))\n", "s.mount(\"https://\", HTTPAdapter(max_retries=retries))" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%capture\n", "# Load environment variables if available\n", "%load_ext dotenv\n", "%dotenv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configure the API query\n", "\n", "The index tables are populated using a GraphQL API query. Here we create a basic GraphQL query to request a page of table data for a specific index. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Define the GraphQL query\n", "q_str = \"\"\"\n", "query SaraIndex($slug: String, $table: PrimoTableInput) {\n", " stateArchives {\n", " saraIndex(slug: $slug) {\n", " path\n", " title\n", " percentDigitised\n", " table(input: $table) {\n", " pagination {\n", " page\n", " perPage\n", " results\n", " total\n", " totalPages\n", " }\n", " headers {\n", " title\n", " }\n", " rows {\n", " columns {\n", " content\n", " link\n", " }\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\"\"\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The complete API query includes variables that supply a slug to identifies the index and pagination details to specify a slice of the dataset. We can change these values as we work through the indexes to retrieve the full datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Construct the basic data payload, the table variables will be updated by the harvest\n", "query = {\n", " \"query\": q_str,\n", " \"operationName\": \"SaraIndex\",\n", " \"variables\": {\n", " \"slug\": \"colonial-architect-index\",\n", " \"table\": {\"query\": \"\", \"pagination\": {\"page\": 1, \"perPage\": 100}},\n", " },\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define some functions" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "tags": [] }, "outputs": [], "source": [ "def get_total(query):\n", " \"\"\"\n", " Get the total number of pages in the result set returned by the supplied API query.\n", " Returns the number of pages in the results set as an integer.\n", " \"\"\"\n", " response = s.post(\"https://api.mhnsw.au/graphql\", json=query)\n", " data = response.json()\n", " # Sometimes the requests fail and return JSON with an \"errors\" key\n", " # These seem to be random and re-requesting after a short pause seems to work ok\n", " if \"errors\" not in data:\n", " pagination = data[\"data\"][\"stateArchives\"][\"saraIndex\"][\"table\"][\"pagination\"]\n", " return pagination[\"totalPages\"]\n", " else:\n", " time.sleep(60)\n", " get_total()\n", "\n", "\n", "def get_headers(table):\n", " \"\"\"\n", " Get a list of column names from the table.\n", " \"\"\"\n", " headers = []\n", " for h in table[\"headers\"]:\n", " if h[\"title\"]:\n", " headers.append(h[\"title\"])\n", " else:\n", " headers.append(\"Details\")\n", " return headers\n", "\n", "\n", "def get_rows(table):\n", " \"\"\"\n", " Get data from all the rows/columns on the current table page.\n", " Returns a list of lists representing rows/columns.\n", " \"\"\"\n", " rows = []\n", " for row in table[\"rows\"]:\n", " cols = []\n", " for col in row[\"columns\"]:\n", " if col[\"content\"] == \"Details\" and col[\"link\"]:\n", " cols.append(col[\"link\"])\n", " else:\n", " cols.append(col[\"content\"])\n", " rows.append(cols)\n", " return rows\n", "\n", "\n", "def harvest_index(query, output_dir=\"indexes\"):\n", " \"\"\"\n", " Harvest all the the data returned by the supplied query.\n", " The data is saved as a CSV file in the specified output directory.\n", " \"\"\"\n", " dfs = []\n", " current_page = 0\n", " # Get the total number of pages\n", " total_pages = get_total(query)\n", " with tqdm(total=total_pages, desc=query[\"variables\"][\"slug\"]) as pbar:\n", " # Continue until the current page equals the total number of pages\n", " while current_page < total_pages:\n", " current_page += 1\n", " # Set the page required in the table pagination\n", " query[\"variables\"][\"table\"][\"pagination\"][\"page\"] = current_page\n", " response = s.post(\"https://api.mhnsw.au/graphql\", json=query)\n", " data = response.json()\n", " # Sometimes the requests fail and return JSON with an \"errors\" key\n", " # These seem to be random and re-requesting after a short pause seems to work ok\n", " if \"errors\" not in data:\n", " # Get the table data from the response\n", " table = data[\"data\"][\"stateArchives\"][\"saraIndex\"][\"table\"]\n", " # Get column headers\n", " headers = get_headers(table)\n", " # Get rows\n", " rows = get_rows(table)\n", " # Create a dataframe from the rows and headers and append to a list\n", " dfs.append(pd.DataFrame(rows, columns=headers))\n", " pbar.update(1)\n", " if not response.from_cache:\n", " time.sleep(0.5)\n", " # If there's been an error, wait for 60 secs then try again\n", " else:\n", " current_page = current_page - 1\n", " time.sleep(60)\n", " # Combine all the dataframes\n", " df = pd.concat(dfs)\n", " # I don't think this is necessary, but just in case\n", " df.drop_duplicates(inplace=True)\n", " # Set up output directory and save df as a CSV file\n", " output_dir = Path(output_dir)\n", " output_dir.mkdir(exist_ok=True)\n", " df.to_csv(Path(output_dir, f\"{query['variables']['slug']}.csv\"), index=False)\n", " del df\n", "\n", "\n", "def harvest_indexes(query, indexes, output_dir=\"indexes\"):\n", " \"\"\"\n", " Harvest data from all of the indexes.\n", " \"\"\"\n", " for index in tqdm(indexes):\n", " slug = index[\"url\"].strip(\"/\").split(\"/\")[-1]\n", " query[\"variables\"][\"slug\"] = slug\n", " harvest_index(query, output_dir=output_dir)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Harvest all the indexes\n", "\n", "Note that this can take quite a while and sometimes there are errors that interrupt the harvest. I've noted in the code that sometimes the JSON response includes an 'errors' key. These problems seem temporary and re-requesting after a short pause seems to work ok. Other errors sometimes result in a reponse that's not JSON, generating a `JSONDecodeError`. I haven't tried to handle these in the code, as they don't seem as common and I'm not quite sure what the problem is. But because the requests/responses are all cached, you can simply re-run `harvest_indexes()` to pick up where you left off." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "nbval-skip" ] }, "outputs": [], "source": [ "# Load the pre-harvested list of indexes\n", "indexes = pd.read_csv(\"indexes.csv\").to_dict(\"records\")\n", "\n", "# Harvest all the indexes to the default directory (\"indexes\").\n", "harvest_indexes(query, indexes)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Harvest a single index\n", "\n", "To harvest a single index you just need to update the `slug` parameter in `query`. The index's `slug` is the last part of the url. For example: 'unemployed-in-sydney'." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "nbval-skip" ] }, "outputs": [], "source": [ "query[\"variables\"][\"slug\"] = \"unemployed-in-sydney\"\n", "harvest_index(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# IGNORE -- THIS CELL IS FOR AUTOMATED TESTING ONLY\n", "if os.getenv(\"GW_STATUS\") == \"dev\":\n", " indexes = pd.read_csv(\"indexes.csv\").to_dict(\"records\")[:1]\n", " harvest_indexes(query, indexes, output_dir=\"test\")\n", "\n", " index = indexes[0]\n", " slug = index[\"url\"].strip(\"/\").split(\"/\")[-1]\n", "\n", " assert Path(\"test\", f\"{slug}.csv\").exists()\n", " assert not pd.read_csv(Path(\"test\", f\"{slug}.csv\")).empty\n", "\n", " shutil.rmtree(\"test\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----\n", "\n", "Created by [Tim Sherratt](https://timsherratt.org/) for the [GLAM Workbench](https://glam-workbench.net/)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.2" } }, "nbformat": 4, "nbformat_minor": 4 }