""" TODO: - Modularize code - Improve documentation - Create test cases """ from datetime import datetime import glob from multiprocessing import Pool import json import logging import os import sqlite3 import zipfile from zoneinfo import ZoneInfo import pandas as pd """ TODO: Look into whether polars CSV parsing has improved since 2025-06-19. I encountered a bunch of issues with properly parsing StatCan CSVs """ import polars as pl import requests from tqdm import tqdm data_folder = "/data/tables" input_folder = f"{data_folder}/input" scratch_folder = f"{data_folder}/scratch" output_folder = f"{data_folder}/output" """ TODO: Do logging properly """ import logging logging.basicConfig( filename="processing.log", encoding="utf-8", filemode="a", format="{asctime} - {levelname} - {message}", style="{", datefmt="%Y-%m-%d %H:%M", ) """ TODO: Do this properly. These are the requirements: - Lightweight way for users to keep track what StatCan tables they have downloaded, and for them to update the table when new data is available """ if not os.path.exists(f"{data_folder}/processing.db"): con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0) cur = con.cursor() cur.executescript(""" CREATE TABLE IF NOT EXISTS downloaded ( product_id TEXT PRIMARY KEY, last_updated TEXT, last_processed TEXT ); CREATE TABLE IF NOT EXISTS cubes ( product_id TEXT PRIMARY KEY, last_updated TEXT ); """) con.commit() else: con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0) cur = con.cursor() def setup(): """ Makes data folders """ folders_to_create = [data_folder, input_folder, scratch_folder, output_folder, f"{input_folder}/en", f"{output_folder}/en", f"{input_folder}/fr", f"{output_folder}/fr", f"{input_folder}/metadata"] for folder in folders_to_create: if not os.path.exists(folder): print(f"Making folder {folder}") os.mkdir(folder) def update_last_downloaded(product_id): """ Updates SQLite database with the last time the table was updated The datetime is in Eastern timezone, so have to convert to UTC to be consistent with https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite """ filepath = f"{input_folder}/metadata/{product_id}.json" print(f"Reading metadata {filepath}") with open(filepath, 'r') as fp: metadata = json.load(fp) product_id = metadata.get("object").get("productId") last_updated = metadata.get("object").get("releaseTime") # Convert last_updated to UTC since /getAllcubesListLite uses UTC last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M") last_updated = last_updated.replace(tzinfo=ZoneInfo("America/Toronto")) last_updated = last_updated.astimezone(ZoneInfo("UTC")).isoformat() data = (product_id, last_updated) cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,)) result = cur.fetchone() if not result: cur.execute("INSERT INTO downloaded (product_id, last_updated) VALUES (?, ?)", data) else: cur.execute("UPDATE downloaded SET last_updated = ? WHERE product_id = ?", (last_updated, product_id)) con.commit() def update_last_processed(product_id): time_finished_processing = datetime.now().isoformat() cur.execute("UPDATE downloaded SET last_processed = ? WHERE product_id = ?", (time_finished_processing, product_id)) con.commit() def update_tables(): """ This currently does not work as expected because Statistics Canada has discrepancies. 
The "releaseTime" listed in https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite for every pdocutId is not the same as "releaseTime" listed when making a POST https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata , for example: [{"productId":10100007}] """ cur.execute(""" DELETE FROM cubes; """) con.commit() response = requests.get("https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite").json() cubes_metadata = pl.from_dicts(response)[['productId', 'releaseTime']] cubes_metadata = cubes_metadata.rename({"productId": "product_id", "releaseTime": "last_updated"}) cubes_metadata = cubes_metadata.rows() cubes_metadata_new = [] for cube in cubes_metadata: product_id, last_updated = cube # Update the date field so it is formatted the same as date field in downloaded table last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ").astimezone(ZoneInfo("UTC")) last_updated = last_updated.isoformat() cubes_metadata_new.append((product_id, last_updated)) cur.executemany("INSERT INTO cubes VALUES(?, ?)", cubes_metadata_new) con.commit() cur.execute(""" SELECT a.product_id FROM downloaded AS a, cubes AS b WHERE a.product_id = b.product_id AND b.last_updated > a.last_updated """) results = cur.fetchall() for result in results: product_id = result[0] print(f"Updating product_id: {product_id}") download_cube(product_id) process_cube(product_id) def compute_ref_date_bounds(df): """ TODO: - Completely rewrite this, AI helpmed me figure out this logic as I had no experience with dates """ """ - There are cases where the REF_DATE is a range, ex. 2023/2024. For productId 17100022 the period is from 2023-07-01 to 2024-06-30 (seen in the metadata), so we cannot just use 2023-01-01 and 2024-12-31 """ series = df["REF_DATE"] # Initialize the two new columns with NaT df["REF_START_DATE"] = pd.NaT df["REF_END_DATE"] = pd.NaT # Skip rows that contain slashes valid_mask = ~series.str.contains("/", na=False) # Case 1: YYYY-MM-DD full_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}-\d{2}") parsed_full = pd.to_datetime(series[full_mask], format="%Y-%m-%d", errors="coerce") df.loc[full_mask, "REF_START_DATE"] = parsed_full df.loc[full_mask, "REF_END_DATE"] = parsed_full # Case 2: YYYY-MM month_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}") parsed_month = pd.to_datetime(series[month_mask], format="%Y-%m", errors="coerce") df.loc[month_mask, "REF_START_DATE"] = parsed_month df.loc[month_mask, "REF_END_DATE"] = parsed_month + pd.to_timedelta( parsed_month.dt.days_in_month - 1, unit='D' ) # Case 3: YYYY year_mask = valid_mask & series.str.fullmatch(r"\d{4}") parsed_year = pd.to_datetime(series[year_mask], format="%Y", errors="coerce") df.loc[year_mask, "REF_START_DATE"] = parsed_year df.loc[year_mask, "REF_END_DATE"] = parsed_year + pd.offsets.YearEnd(0) # Move columns after REF_DATE ref_idx = df.columns.get_loc("REF_DATE") cols = list(df.columns) cols.remove("REF_START_DATE") cols.remove("REF_END_DATE") cols[ref_idx + 1:ref_idx + 1] = ["REF_START_DATE", "REF_END_DATE"] return df[cols] def convert_to_lowest_type(df): """ Convert columns to the best possible dtypes For example, if the column is numerical and has a maximum value of 32,000 we can assign it a type of int16 """ dtypes = pd.DataFrame(df.dtypes) # Downcast to the smallest numerical dtype for row in dtypes.itertuples(): column = row[0] the_type = str(row[1]) if the_type == 'Int64': df[column] = pd.to_numeric(df[column], downcast='integer') return df def extract_zipfile(product_id, language): """ It is faster to extract the 


def extract_zipfile(product_id, language):
    """
    It is faster to extract the zip file and then read the CSV than to open it
    via zipfile and then Pandas.
    """
    zip_file = f"{input_folder}/{language}/{product_id}.zip"
    with zipfile.ZipFile(zip_file) as myzip:
        print(f"Extracting {zip_file} to {scratch_folder}")
        myzip.extractall(path=scratch_folder)


def get_cube_metadata(product_id):
    url = "https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata"
    cubes_payload = [{"productId": product_id}]
    result = requests.post(url, json=cubes_payload)
    result = result.json()[0]
    return result


def download_cube(product_id, language="en"):
    """
    Downloads the zipped CSV for a specific table. Only the English (/en)
    download URL is requested at the moment; `language` only selects the
    input subfolder.
    """
    download_url = f"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/en"
    response = requests.get(download_url).json()
    zip_url = response['object']
    zip_file_name = f"{input_folder}/{language}/{product_id}.zip"
    print(f"Downloading {zip_url} to {zip_file_name}")

    response = requests.get(zip_url, stream=True, headers={"user-agent": None})
    progress_bar = tqdm(
        desc=zip_file_name,
        total=int(response.headers.get("content-length", 0)),
        unit="B",
        unit_scale=True
    )
    with open(zip_file_name, "wb") as handle:
        for chunk in response.iter_content(chunk_size=512):
            if chunk:  # filter out keep-alive new chunks
                handle.write(chunk)
                progress_bar.update(len(chunk))
    progress_bar.close()


def cleanup_product(product_id):
    """
    Remove the scratch files for a given productId
    """
    print(f"Removing scratch files for productId {product_id}")
    os.remove(f"{scratch_folder}/{product_id}.csv")
    os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")
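

# Illustrative sketch (assumption: not part of the pipeline, nothing calls it):
# fetch a table's metadata and pull out the two fields update_last_downloaded()
# relies on, using the productId from the getCubeMetadata example in the
# update_tables() docstring.
def _example_cube_metadata(product_id=10100007):
    metadata = get_cube_metadata(product_id)
    cube = metadata.get("object", {})
    return cube.get("productId"), cube.get("releaseTime")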


def process_cube(product_id, language="en"):
    """
    Examples:
    - productId 43100011 has all values with DECIMALS = 1 (float64)
    - productId 17100009 has DECIMALS = 0 (int64)
    - productId 35100076 has multiple DECIMALS precisions [0, 1, 2]
      (int64, float64, float64)
    - The duplicate column issue only happens with two pairs of column names
      across all data products ("Value"/"VALUE" and "Status"/"STATUS")
    - productId 10100164 has two columns named the same, "Value" and "VALUE".
      DuckDB treats column names in a case insensitive manner, so "Value" and
      "VALUE" are the same. So we need to rename "Value" to "Value.1"
    - productId 13100902 has two columns named the same, "Status" and "STATUS".
      We need to rename "Status" to "Status.1"
    - productId 13100442 has 18 fields, but 19 fields were seen in line 162
    - There are cases where the "DECIMALS" column does not exist in the CSV;
      productId 98100001 is one example. In this case we let the .read_csv
      method guess the data types
    """
    cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
    result = cur.fetchone()
    if result:
        print(f"Already processed {product_id}")
        return

    extract_zipfile(product_id, language)

    # The pandas CSV reader handles these files better than the Polars one.
    # Here is an example where polars was not reading it right:
    # https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip

    # Get metadata
    #metadata_file = f"{input_folder}/metadata/{product_id}.json"
    #metadata = get_cube_metadata(product_id)
    #print(f"Writing metadata file {metadata_file}")
    #with open(metadata_file, "w") as outfile:
    #    json.dump(metadata, outfile)

    # Read CSV using Pandas
    product_csv = f"{scratch_folder}/{product_id}.csv"
    print(f"Reading {product_csv}")
    parameters = {
        "filepath_or_buffer": product_csv,
        "engine": "c",
        #"nrows": 100000,
        "dtype": {}
    }

    columns = pd.read_csv(product_csv, nrows=0).columns
    columns_to_rename = ['Value', 'Status']
    for column in columns_to_rename:
        if column in columns:
            print(f"Renaming '{column}' to '{column}.1'")
            columns = [f'{column}.1' if x == column else x for x in columns]

    # Tell pandas to use our (possibly renamed) column names and skip the header row
    parameters["header"] = 0
    parameters["names"] = columns

    columns_always_int_8 = ["DECIMALS", "SCALAR_ID"]
    for column in columns_always_int_8:
        if column in columns:
            parameters["dtype"][column] = 'int8'

    columns_always_int_16 = ["UOM_ID"]
    for column in columns_always_int_16:
        if column in columns:
            parameters["dtype"][column] = 'int16'

    # REF_DATE, GEO, DGUID should always be string
    columns_always_string = ["REF_DATE", "GEO", "DGUID"]
    for column in columns_always_string:
        if column in columns:
            parameters["dtype"][column] = 'string'

    # The remaining columns should be string, with the exception of VALUE.
    # The "DECIMALS" check is needed because there can be numeric columns that
    # are not the VALUE column.
    if "DECIMALS" in columns:
        for column in columns:
            if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE":
                parameters["dtype"][column] = 'string'

    if not parameters["dtype"]:
        del parameters["dtype"]

    print(f"Reading {product_csv} as a Pandas dataframe")
    try:
        df = pd.read_csv(**parameters)
    except Exception:
        logging.error(f"Failed to process productId: {product_id}")
        cleanup_product(product_id)
        return

    if "DECIMALS" in columns:
        unique_decimal_values = df["DECIMALS"].unique()
        #print(unique_decimal_values)
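        # How DECIMALS maps to the VALUE dtype, using the examples from the
        # docstring above: DECIMALS such as [1] or [0, 1, 2] contain at least
        # one non-zero precision, so VALUE becomes float64; DECIMALS of [0]
        # means every value is a whole number, so VALUE becomes nullable Int64.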
        if any(unique_decimal_values):
            # A table can have both float and integer values in the VALUE
            # field; productId 11100025 is an example. So if the unique values
            # for DECIMALS are [0, 1], we convert to float64.
            convert_dict = {"VALUE": "float64"}
            #print(convert_dict)
            df = df.astype(convert_dict)
        elif 0 in (unique_decimal_values):
            if df["VALUE"].dtype != "Int64":
                # If DECIMALS = [0]
                convert_dict = {"VALUE": "Int64"}
                #print(convert_dict)
                df = df.astype(convert_dict)
    else:
        parameters = {
            "convert_string": True,
            "convert_boolean": False
        }
        #print("DECIMALS not in columns, using .convert_dtypes")
        df = df.convert_dtypes(**parameters)

    df = convert_to_lowest_type(df)
    df = compute_ref_date_bounds(df)

    output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
    print(f"Exporting dataframe as parquet to {output_parquet}")
    parameters = {
        "path": output_parquet,
        "engine": "pyarrow",
        "compression": "zstd",
        "index": False,
        "compression_level": 22
    }
    df.to_parquet(**parameters)

    # Remove scratch files
    cleanup_product(product_id)
    update_last_downloaded(product_id)
    update_last_processed(product_id)


if __name__ == '__main__':
    setup()
    files_to_process = glob.glob(f"{input_folder}/en/*.zip")
    # Get the product_id from each file name
    files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process]
    to_process = len(files_to_process)
    print(f"Processing {to_process} tables")
    with Pool(processes=2) as p:
        p.map(process_cube, files_to_process, chunksize=1)
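

# Illustrative usage (assumption: run from an interactive session rather than
# the batch entry point above) for fetching and processing a single table:
#
#   setup()
#   download_cube("10100007")
#   process_cube("10100007")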