{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Access ANMN CTD Mooring NRT data in Parquet\n", "\n", "A jupyter notebook to show how to access and plot ANMN Mooring NRT data available as a [Parquet](https://parquet.apache.org) dataset on S3" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "dataset_name = \"mooring_ctd_delayed_qc\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Install/Update packages and Load common functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# only run once, then restart session if needed\n", "!pip install uv\n", "\n", "import os\n", "import sys\n", "\n", "def is_colab():\n", " try:\n", " import google.colab\n", " return True\n", " except ImportError:\n", " return False\n", "\n", "# Get the current directory of the notebook\n", "current_dir = os.getcwd()\n", "\n", "# Check if requirements.txt exists in the current directory\n", "local_requirements = os.path.join(current_dir, 'requirements.txt')\n", "if os.path.exists(local_requirements):\n", " requirements_path = local_requirements\n", "else:\n", " # Fall back to the online requirements.txt file\n", " requirements_path = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/notebooks/requirements.txt'\n", "\n", "# Install packages using uv and the determined requirements file\n", "if is_colab():\n", " os.system(f'uv pip install --system -r {requirements_path}')\n", "else:\n", " os.system('uv venv')\n", " os.system(f'uv pip install -r {requirements_path}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import requests\n", "import os\n", "if not os.path.exists('parquet_queries.py'):\n", " print('Downloading parquet_queries.py')\n", " url = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/aodn_cloud_optimised/lib/ParquetDataQuery.py'\n", " response = requests.get(url)\n", " with open('parquet_queries.py', 'w') as f:\n", " f.write(response.text)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from parquet_queries import create_time_filter, create_bbox_filter, query_unique_value, plot_spatial_extent, get_spatial_extent, get_temporal_extent, get_schema_metadata\n", "import pyarrow.parquet as pq\n", "import pyarrow.dataset as pds\n", "import pyarrow as pa\n", "import os\n", "import pandas as pd\n", "import pyarrow.compute as pc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Location of the parquet dataset" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "BUCKET_OPTIMISED_DEFAULT=\"imos-data-lab-optimised\"\n", "dname = f\"s3://{BUCKET_OPTIMISED_DEFAULT}/cloud_optimised/cluster_testing/{dataset_name}.parquet/\"\n", "parquet_ds = pq.ParquetDataset(dname,partitioning='hive')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Understanding the Dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get partition keys\n", "Partitioning in Parquet involves organising data files based on the values of one or more columns, known as partition keys. When data is written to Parquet files with partitioning enabled, the files are physically stored in a directory structure that reflects the partition keys. This directory structure makes it easier to retrieve and process specific subsets of data based on the partition keys." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "dataset = pds.dataset(dname, format=\"parquet\", partitioning=\"hive\")\n", "\n", "partition_keys = dataset.partitioning.schema\n", "print(partition_keys)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## List unique partition values" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "%%time\n", "unique_partition_value = query_unique_value(parquet_ds, 'site_code')\n", "print(list(unique_partition_value)[0:2]) # showing a subset only" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualise Spatial Extent of the dataset\n", "In this section, we're plotting the polygons where data exists. This helps then with creating a bounding box where there is data" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "plot_spatial_extent(parquet_ds)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get Temporal Extent of the dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similary to the spatial extent, we're retrieving the minimum and maximum timestamp partition values of the dataset. This is not necessarely accurately representative of the TIME values, as the timestamp partition can be yearly/monthly... but is here to give an idea" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "get_temporal_extent(parquet_ds)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read Metadata\n", "\n", "For all parquet dataset, we create a sidecar file in the root of the dataset named **_common_matadata**. This contains the variable attributes." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# parquet_meta = pa.parquet.read_schema(os.path.join(dname + '_common_metadata')) # parquet metadata\n", "metadata = get_schema_metadata(dname) # schema metadata\n", "metadata" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Query and Plot" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a TIME and BoundingBox filter" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "filter_time = create_time_filter(parquet_ds, date_start='2022-12-05', date_end='2022-12-15')\n", "filter_geo = create_bbox_filter(parquet_ds, lat_min=-34, lat_max=-28, lon_min=151, lon_max=160)\n", "\n", "\n", "filter = filter_geo & filter_time" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# using pandas instead of pyarrow so that filters can directly be applied to the data, and not just the partition\n", "df = pd.read_parquet(dname, engine='pyarrow',filters=filter)\n", "df.info()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a TIME and scalar/number filter" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "filter_time = create_time_filter(parquet_ds, date_start='2022-12-05', date_end='2022-12-15')\n", "filter_geo = create_bbox_filter(parquet_ds, lat_min=-34, lat_max=-28, lon_min=151, lon_max=160)\n", "\n", "expr_1 = pc.field('site_code') == pa.scalar(\"CH100\")\n", "filter = expr_1 & filter_time & filter_geo" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# using pandas instead of pyarrow so that filters can directly be applied to the data, and 
{ "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "# Which sites have an instrument at 9 m nominal depth?\n", "df[df['NOMINAL_DEPTH'] == 9]['site_code'].unique()" ] },
{ "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "df[(df['NOMINAL_DEPTH'] == 9) & (df['site_code'] == 'CH100')].plot.scatter(\n", "    x='TEMP', y='PSAL', c='DENS', marker='+', cmap='RdYlBu_r',\n", "    title='T-S at CH100, 9 m nominal depth',\n", "    xlabel=metadata['TEMP']['standard_name'],\n", "    ylabel=metadata['PSAL']['standard_name'])" ] },
{ "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "df[(df['NOMINAL_DEPTH'] == 9) & (df['site_code'] == 'CH100')].sort_values('TIME').plot(\n", "    x='TIME', y='TEMP', ylabel=metadata['TEMP']['standard_name'])" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Plot T-S diagrams - compare with quality-control values" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Example A" ] },
{ "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "df[df['NOMINAL_DEPTH'] <= 20].plot.scatter(\n", "    x='TEMP', y='PSAL', c='site_code', marker='+', cmap='RdYlBu_r',\n", "    title='T-S by site, all QC flags',\n", "    xlabel=metadata['TEMP']['standard_name'],\n", "    ylabel=metadata['PSAL']['standard_name'])" ] },
{ "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "df[(df['NOMINAL_DEPTH'] <= 20) & (df['PSAL_quality_control'] == 1)].plot.scatter(\n", "    x='TEMP', y='PSAL', c='site_code', marker='+', cmap='RdYlBu_r', alpha=0.8,\n", "    title='T-S by site, good data only',\n", "    xlabel=metadata['TEMP']['standard_name'],\n", "    ylabel=metadata['PSAL']['standard_name'])" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Example B" ] },
{ "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "df[df['site_code'] == 'CH100'].plot.scatter(\n", "    x='TEMP', y='PSAL', c='NOMINAL_DEPTH', marker='+', cmap='RdYlBu_r',\n", "    title='T-S at CH100 at various nominal depths',\n", "    xlabel=metadata['TEMP']['standard_name'],\n", "    ylabel=metadata['PSAL']['standard_name'])" ] },
{ "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "df[(df['site_code'] == 'CH100') & (df['PSAL_quality_control'] == 1)].plot.scatter(\n", "    x='TEMP', y='PSAL', c='NOMINAL_DEPTH', marker='+', cmap='RdYlBu_r',\n", "    title='T-S at CH100 at various nominal depths, good data only',\n", "    xlabel=metadata['TEMP']['standard_name'],\n", "    ylabel=metadata['PSAL']['standard_name'])" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.6" } }, "nbformat": 4, "nbformat_minor": 4 }