# Harvest indexes

This notebook harvests data from all of [NSW State Archives online indexes](https://mhnsw.au/archive/subjects/?filter=indexes), saving the data as a collection of easily downloadable CSV files.

Before you harvest the data, you need to [get the details of all the indexes](get-list-of-indexes.ipynb).

If you just want the data, my latest harvest of the indexes is [available from this repository](https://github.com/wragge/srnsw-indexes).

If you'd like to explore the harvested data, try the [Index Explorer](https://glam-workbench.net/nsw-state-archives/#nsw-state-archives-index-explorer)!

## Import what we need

In [1]:
import os
import shutil
import time
from pathlib import Path

import pandas as pd
import requests_cache
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm.auto import tqdm


def filter_errors(response):
 """
 Errors are returned with a status code of 200, so we have to stop them from being cached.
 """
 return "errors" not in response.json()


s = requests_cache.CachedSession(
 allowable_methods=("GET", "POST"), filter_fn=filter_errors
)
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount("http://", HTTPAdapter(max_retries=retries))
s.mount("https://", HTTPAdapter(max_retries=retries))

In [2]:
%%capture
# Load environment variables if available
%load_ext dotenv
%dotenv

## Configure the API query

The index tables are populated using a GraphQL API query. Here we create a basic GraphQL query to request a page of table data for a specific index. 

In [None]:
# Define the GraphQL query
q_str = """
query SaraIndex($slug: String, $table: PrimoTableInput) {
 stateArchives {
 saraIndex(slug: $slug) {
 path
 title
 percentDigitised
 table(input: $table) {
 pagination {
 page
 perPage
 results
 total
 totalPages
 }
 headers {
 title
 }
 rows {
 columns {
 content
 link
 }
 }
 }
 }
 }
}
"""

The complete API query includes variables that supply a slug to identifies the index and pagination details to specify a slice of the dataset. We can change these values as we work through the indexes to retrieve the full datasets.

In [None]:
# Construct the basic data payload, the table variables will be updated by the harvest
query = {
 "query": q_str,
 "operationName": "SaraIndex",
 "variables": {
 "slug": "colonial-architect-index",
 "table": {"query": "", "pagination": {"page": 1, "perPage": 100}},
 },
}

## Define some functions

In [15]:
def get_total(query):
 """
 Get the total number of pages in the result set returned by the supplied API query.
 Returns the number of pages in the results set as an integer.
 """
 response = s.post("https://api.mhnsw.au/graphql", json=query)
 data = response.json()
 # Sometimes the requests fail and return JSON with an "errors" key
 # These seem to be random and re-requesting after a short pause seems to work ok
 if "errors" not in data:
 pagination = data["data"]["stateArchives"]["saraIndex"]["table"]["pagination"]
 return pagination["totalPages"]
 else:
 time.sleep(60)
 get_total()


def get_headers(table):
 """
 Get a list of column names from the table.
 """
 headers = []
 for h in table["headers"]:
 if h["title"]:
 headers.append(h["title"])
 else:
 headers.append("Details")
 return headers


def get_rows(table):
 """
 Get data from all the rows/columns on the current table page.
 Returns a list of lists representing rows/columns.
 """
 rows = []
 for row in table["rows"]:
 cols = []
 for col in row["columns"]:
 if col["content"] == "Details" and col["link"]:
 cols.append(col["link"])
 else:
 cols.append(col["content"])
 rows.append(cols)
 return rows


def harvest_index(query, output_dir="indexes"):
 """
 Harvest all the the data returned by the supplied query.
 The data is saved as a CSV file in the specified output directory.
 """
 dfs = []
 current_page = 0
 # Get the total number of pages
 total_pages = get_total(query)
 with tqdm(total=total_pages, desc=query["variables"]["slug"]) as pbar:
 # Continue until the current page equals the total number of pages
 while current_page < total_pages:
 current_page += 1
 # Set the page required in the table pagination
 query["variables"]["table"]["pagination"]["page"] = current_page
 response = s.post("https://api.mhnsw.au/graphql", json=query)
 data = response.json()
 # Sometimes the requests fail and return JSON with an "errors" key
 # These seem to be random and re-requesting after a short pause seems to work ok
 if "errors" not in data:
 # Get the table data from the response
 table = data["data"]["stateArchives"]["saraIndex"]["table"]
 # Get column headers
 headers = get_headers(table)
 # Get rows
 rows = get_rows(table)
 # Create a dataframe from the rows and headers and append to a list
 dfs.append(pd.DataFrame(rows, columns=headers))
 pbar.update(1)
 if not response.from_cache:
 time.sleep(0.5)
 # If there's been an error, wait for 60 secs then try again
 else:
 current_page = current_page - 1
 time.sleep(60)
 # Combine all the dataframes
 df = pd.concat(dfs)
 # I don't think this is necessary, but just in case
 df.drop_duplicates(inplace=True)
 # Set up output directory and save df as a CSV file
 output_dir = Path(output_dir)
 output_dir.mkdir(exist_ok=True)
 df.to_csv(Path(output_dir, f"{query['variables']['slug']}.csv"), index=False)
 del df


def harvest_indexes(query, indexes, output_dir="indexes"):
 """
 Harvest data from all of the indexes.
 """
 for index in tqdm(indexes):
 slug = index["url"].strip("/").split("/")[-1]
 query["variables"]["slug"] = slug
 harvest_index(query, output_dir=output_dir)

## Harvest all the indexes

Note that this can take quite a while and sometimes there are errors that interrupt the harvest. I've noted in the code that sometimes the JSON response includes an 'errors' key. These problems seem temporary and re-requesting after a short pause seems to work ok. Other errors sometimes result in a reponse that's not JSON, generating a `JSONDecodeError`. I haven't tried to handle these in the code, as they don't seem as common and I'm not quite sure what the problem is. But because the requests/responses are all cached, you can simply re-run `harvest_indexes()` to pick up where you left off.

In [None]:
# Load the pre-harvested list of indexes
indexes = pd.read_csv("indexes.csv").to_dict("records")

# Harvest all the indexes to the default directory ("indexes").
harvest_indexes(query, indexes)

## Harvest a single index

To harvest a single index you just need to update the `slug` parameter in `query`. The index's `slug` is the last part of the url. For example: 'unemployed-in-sydney'.

In [None]:
query["variables"]["slug"] = "unemployed-in-sydney"
harvest_index(query)

In [None]:
# IGNORE -- THIS CELL IS FOR AUTOMATED TESTING ONLY
if os.getenv("GW_STATUS") == "dev":
 indexes = pd.read_csv("indexes.csv").to_dict("records")[:1]
 harvest_indexes(query, indexes, output_dir="test")

 index = indexes[0]
 slug = index["url"].strip("/").split("/")[-1]

 assert Path("test", f"{slug}.csv").exists()
 assert not pd.read_csv(Path("test", f"{slug}.csv")).empty

 shutil.rmtree("test")

----

Created by [Tim Sherratt](https://timsherratt.org/) for the [GLAM Workbench](https://glam-workbench.net/).