{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating an mth5 file using Parallel HDF5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This tutorial will examine how to build an mth5 file and write MT time series data for each station in a survey in parallel by utilising [h5py's Parallel HDF5](https://docs.h5py.org/en/stable/mpi.html) functionality. Parallel HDF5 allows users to open files across multiple parallel processes by utilising the Messaging Passing Interface (MPI) standard in [mpi4py](https://mpi4py.readthedocs.io/en/stable/).\n", "\n", "Note that in order to use Parallel HDF5, both HDF5 and h5py must be compiled with MPI support turned on.\n", "\n", "For this example, we will be converting Earth Data Logger (ASCII) time series data from 93 stations of the AusLAMP Musgraves Province survey (see https://dx.doi.org/10.25914/5eaa30d63bd17). The ASCII time series were concatenated per run for each station and the total volume of time series data was 106 GB. 90 of the 93 stations were a single run - stations SA246, SA299 and SA324-2 had multiple runs. \n", "\n", "This example was tested on the [National Computational Infrastructure's Gadi HPC system](https://nci.org.au/our-systems/hpc-systems). Gadi is Australia’s most powerful supercomputer, a highly parallel cluster comprising more than 200,000 processor cores on ten different types of compute nodes. The example also makes use of the [NCI-geophysics 2022.06 module](https://opus.nci.org.au/x/1wG7CQ) which contains Parallel HDF5.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Building an mth5 skeleton" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To build our mth5 file requires a two step process:\n", " 1. create an mth5 skeleton\n", " 2. populate the skeleton with the time series data in parallel using Parallel HDF5.\n", "\n", "Let's start by building the mth5 skeleton script which requires the following libraries:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from mth5.mth5 import MTH5\n", "import numpy as np\n", "from os import path\n", "import os\n", "import glob\n", "import time\n", "\n", "from mt_metadata.utils.mttime import MTime\n", "\n", "startTime = time.time()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we can define our working directories and file paths which will need to be changed for your use case:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### directory on Gadi file system that contains merged ASCII time series per station run\n", "work_dir = '/g/data/.../.../merged_data_all'\n", "\n", "### full path to the concatenated Earth Data Logger ASCII time series files\n", "full_path_to_files = sorted(glob.glob(work_dir+\"/*\"))\n", "\n", "### directory on Gadi to write final mth5 file to\n", "mth5_test_dir = '/g/data/.../.../mth5_outdir'\n", "\n", "### name of the mth5 file\n", "hdf5_filename = 'example_mth5_file.h5'\n", "\n", "### full path to the mth5 file\n", "h5_fn = mth5_test_dir+'/'+hdf5_filename" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can define:\n", "1. the Earth Data Logger channels \n", "2. the stations in our survey \n", "3. the survey name \n", "4. 
the run number for stations with a single run" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### define raw time series data channels from Earth Data Logger\n", "\n", "raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']\n", "\n", "\n", "### define stations to go into mth5 file\n", "\n", "stations_all = ['SA225-2','SA227', 'SA242', 'SA243', 'SA245',\n", " 'SA247', 'SA248', 'SA249', 'SA250', 'SA251', \n", " 'SA252', 'SA26W-2', 'SA270', 'SA271', 'SA272', \n", " 'SA273', 'SA274-2', 'SA274', 'SA275', 'SA276',\n", " 'SA277', 'SA293-2', 'SA294', 'SA295', 'SA296', \n", " 'SA297', 'SA298', 'SA300', 'SA301', 'SA319', \n", " 'SA320', 'SA320-2', 'SA321', 'SA322', 'SA323', \n", " 'SA324', 'SA325-2', 'SA325', 'SA326N', 'SA326S',\n", " 'SA344', 'SA344-2', 'SA345', 'SA346', 'SA347', \n", " 'SA348', 'SA349', 'SA350', 'SA351', ### 49 single run SA stations\n", " 'WA10', 'WA13', 'WA14', 'WA15', 'WA26',\n", " 'WA27', 'WA29', 'WA30', 'WA31', 'WA42',\n", " 'WA43', 'WA44', 'WA45', 'WA46', 'WA47',\n", " 'WA54', 'WA55', 'WA56', 'WA57', 'WA58',\n", " 'WA60', 'WA61', 'WA62', 'WA63', 'WA64',\n", " 'WA65', 'WA66', 'WA67', 'WA68', 'WA69',\n", " 'WA70', 'WA71', 'WA72', 'WA73', 'WA74',\n", " 'WA75', 'WANT19', 'WANT38', 'WANT45', 'WASA302', \n", " 'WASA327', ### 41 single run WA stations \n", " 'SA246', 'SA299', 'SA324-2'] ### 3 stations with multiple runs\n", " \n", " \n", "### define survey name and run number (for stations with a single run)\n", "\n", "survey_name = \"AusLAMP_Musgraves\"\n", "run_number = \"001\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will define some functions that will be used to create our MTH5 skeleton:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def make_mth5_dir(mth5_output_directory): \n", "### creates mth5 output directory if it doesn't already exist\n", " try:\n", " os.makedirs(mth5_output_directory)\n", " except FileExistsError:\n", " # directory already exists\n", " print('directory already exists!')\n", " pass\n", "\n", "\n", "def remove_existing_mth5_file(mth5_file):\n", "### removes existing mth5 file if it exists\n", " if path.exists(mth5_file):\n", " os.unlink(mth5_file)\n", " print(f\"INFO: Removed existing file {mth5_file}\")\n", " else:\n", " print(\"File does not exist\")\n", "\n", "\n", "def channel_line_count(channels):\n", "### counts lines in electromagnetic ASCII files\n", " EX = [file for file in channels if file.endswith('EX')]\n", " EY = [file for file in channels if file.endswith('EY')]\n", " BX = [file for file in channels if file.endswith('BX')]\n", " BY = [file for file in channels if file.endswith('BY')]\n", " BZ = [file for file in channels if file.endswith('BZ')]\n", "\n", " count_ex = sum(1 for line in open(EX[0]))\n", " count_ey = sum(1 for line in open(EY[0]))\n", " count_bx = sum(1 for line in open(BX[0]))\n", " count_by = sum(1 for line in open(BY[0]))\n", " count_bz = sum(1 for line in open(BZ[0]))\n", "\n", " return count_ex, count_ey, count_bx, count_by, count_bz\n", "\n", "\n", "def create_mth5_group_station_run_channel(station):\n", "### creates the mth5 groups, stations, runs and channels for the mth5 skeleton\n", " add_station = m.add_station(station, survey=survey_name)\n", " channels = []\n", " for file in full_path_to_files:\n", " if station in file:\n", " channels.append(file)\n", " else:\n", " continue \n", "### for stations with a single run:\n", " if len(channels) == len(raw_data_channels): \n", " 
add_run = m.add_run(station, run_number, survey=survey_name)\n", "\n", " count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(channels)\n", " \n", " ex_zeros = np.zeros(count_ex,)\n", " ey_zeros = np.zeros(count_ey,)\n", " bx_zeros = np.zeros(count_bx,)\n", " by_zeros = np.zeros(count_by,)\n", " bz_zeros = np.zeros(count_bz,)\n", " \n", " ex = m.add_channel(station, run_number, \"ex\", \"electric\", ex_zeros, survey=survey_name)\n", " ey = m.add_channel(station, run_number, \"ey\", \"electric\", ey_zeros, survey=survey_name)\n", " bx = m.add_channel(station, run_number, \"bx\", \"magnetic\", bx_zeros, survey=survey_name)\n", " by = m.add_channel(station, run_number, \"by\", \"magnetic\", by_zeros, survey=survey_name)\n", " bz = m.add_channel(station, run_number, \"bz\", \"magnetic\", bz_zeros, survey=survey_name)\n", " \n", " #######################################################################################\n", " # Note: At the time of writing this example, the resizing of datasets caused h5py #\n", " # parallel to fail when running using the mpio driver. A workaround was to create #\n", " # 'zeros' arrays of size count_ (see above). # #\n", " # #\n", " # ex = m.add_channel(station, run_number, \"ex\", \"electric\", None, survey=survey_name) #\n", " # ey = m.add_channel(station, run_number, \"ey\", \"electric\", None, survey=survey_name) #\n", " # bx = m.add_channel(station, run_number, \"bx\", \"magnetic\", None, survey=survey_name) #\n", " # by = m.add_channel(station, run_number, \"by\", \"magnetic\", None, survey=survey_name) #\n", " # bz = m.add_channel(station, run_number, \"bz\", \"magnetic\", None, survey=survey_name) #\n", " # #\n", " # ex.hdf5_dataset.resize((count_ex,)) #\n", " # ey.hdf5_dataset.resize((count_ey,)) #\n", " # bx.hdf5_dataset.resize((count_bx,)) #\n", " # by.hdf5_dataset.resize((count_by,)) # \n", " # bz.hdf5_dataset.resize((count_bz,)) #\n", " #######################################################################################\n", "\n", "### for stations with multiple runs: \n", " elif len(channels) > len(raw_data_channels):\n", " sort_files = sorted(channels)\n", " number_of_channels = len(raw_data_channels)\n", " split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]\n", " for i,group in enumerate(split_lists):\n", " mrun_number = i+1\n", " run = \"00%i\" % mrun_number\n", " add_run = m.add_run(station, run, survey=survey_name)\n", " count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(group)\n", " ex_zeros = np.zeros(count_ex,)\n", " ey_zeros = np.zeros(count_ey,)\n", " bx_zeros = np.zeros(count_bx,)\n", " by_zeros = np.zeros(count_by,)\n", " bz_zeros = np.zeros(count_bz,)\n", "\n", " ex = m.add_channel(station, run, \"ex\", \"electric\", ex_zeros, survey=survey_name)\n", " ey = m.add_channel(station, run, \"ey\", \"electric\", ey_zeros, survey=survey_name)\n", " bx = m.add_channel(station, run, \"bx\", \"magnetic\", bx_zeros, survey=survey_name)\n", " by = m.add_channel(station, run, \"by\", \"magnetic\", by_zeros, survey=survey_name)\n", " bz = m.add_channel(station, run, \"bz\", \"magnetic\", bz_zeros, survey=survey_name)\n", "\n", " elif len(channels) < len(raw_data_channels):\n", " print('you are likely missing some channels')\n", " print(station)\n", "\n", " else:\n", " print('something has gone wrong')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The final step is to create our mth5 skeleton. 
Note that the Parallel HDF5 version used in this example does not support compression, so compression was turned off when generating the MTH5 skeleton:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### create mth5 directory (if it doesn't already exist)\n", "make_mth5_dir(mth5_test_dir)\n", "\n", "### remove any existing mth5 file in our mth5 directory\n", "remove_existing_mth5_file(h5_fn)\n", "\n", "start = MTime()\n", "start.now()\n", "\n", "### ensure compression is turned off\n", "m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression=None,compression_opts=None)\n", "\n", "### open mth5 file in write mode\n", "m.open_mth5(h5_fn, \"w\")\n", "\n", "### add survey group\n", "survey_group = m.add_survey(survey_name)\n", "\n", "### create station, run and channel groups for all stations in our survey\n", "for station in sorted(stations_all):\n", " create_mth5_group_station_run_channel(station)\n", "m.close_mth5()\n", "\n", "### print total time to run our mth5 skeleton script\n", "print('The script took {0} seconds !'.format(time.time()-startTime))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Putting this all together into a Python script (`mth5_skeleton.py`):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from mth5.mth5 import MTH5\n", "import numpy as np\n", "from os import path\n", "import os\n", "import glob\n", "import time\n", "\n", "from mt_metadata.utils.mttime import MTime\n", "\n", "\n", "startTime = time.time()\n", "\n", "\n", "### define working directories and file paths\n", "\n", "work_dir = '/g/data/.../.../merged_data_all'\n", "mth5_test_dir = '/g/data/.../.../mth5_outdir'\n", "hdf5_filename = 'example_mth5_file.h5'\n", "h5_fn = mth5_test_dir+'/'+hdf5_filename\n", "full_path_to_files = sorted(glob.glob(work_dir+\"/*\"))\n", "\n", "\n", "### define raw time series data channels\n", "\n", "raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']\n", "\n", "\n", "### define stations to go into mth5 file\n", "\n", "stations_all = ['SA225-2','SA227', 'SA242', 'SA243', 'SA245',\n", " 'SA247', 'SA248', 'SA249', 'SA250', 'SA251', \n", " 'SA252', 'SA26W-2', 'SA270', 'SA271', 'SA272', \n", " 'SA273', 'SA274-2', 'SA274', 'SA275', 'SA276',\n", " 'SA277', 'SA293-2', 'SA294', 'SA295', 'SA296', \n", " 'SA297', 'SA298', 'SA300', 'SA301', 'SA319', \n", " 'SA320', 'SA320-2', 'SA321', 'SA322', 'SA323', \n", " 'SA324', 'SA325-2', 'SA325', 'SA326N', 'SA326S',\n", " 'SA344', 'SA344-2', 'SA345', 'SA346', 'SA347', \n", " 'SA348', 'SA349', 'SA350', 'SA351', ### 49 single run SA stations\n", " 'WA10', 'WA13', 'WA14', 'WA15', 'WA26',\n", " 'WA27', 'WA29', 'WA30', 'WA31', 'WA42',\n", " 'WA43', 'WA44', 'WA45', 'WA46', 'WA47',\n", " 'WA54', 'WA55', 'WA56', 'WA57', 'WA58',\n", " 'WA60', 'WA61', 'WA62', 'WA63', 'WA64',\n", " 'WA65', 'WA66', 'WA67', 'WA68', 'WA69',\n", " 'WA70', 'WA71', 'WA72', 'WA73', 'WA74',\n", " 'WA75', 'WANT19', 'WANT38', 'WANT45', 'WASA302', \n", " 'WASA327', ### 41 single run WA stations \n", " 'SA246', 'SA299', 'SA324-2'] ### 3 stations with multiple runs\n", " \n", " \n", "### define survey name and run number (for stations with a single run)\n", "\n", "survey_name = \"AusLAMP_Musgraves\"\n", "run_number = \"001\"\n", "\n", "### define functions\n", "\n", "def make_mth5_dir(mth5_output_directory): \n", "### creates mth5 output directory if it doesn't already exist\n", " try:\n", " os.makedirs(mth5_output_directory)\n", " except 
FileExistsError:\n", " # directory already exists\n", " print('directory already exists!')\n", " pass\n", "\n", "\n", "def remove_existing_mth5_file(mth5_file):\n", "### removes existing mth5 file if it exists\n", " if path.exists(mth5_file):\n", " os.unlink(mth5_file)\n", " print(f\"INFO: Removed existing file {mth5_file}\")\n", " else:\n", " print(\"File does not exist\")\n", "\n", "\n", "def channel_line_count(channels):\n", "### counts lines in electromagnetic ASCII files\n", " EX = [file for file in channels if file.endswith('EX')]\n", " EY = [file for file in channels if file.endswith('EY')]\n", " BX = [file for file in channels if file.endswith('BX')]\n", " BY = [file for file in channels if file.endswith('BY')]\n", " BZ = [file for file in channels if file.endswith('BZ')]\n", "\n", " count_ex = sum(1 for line in open(EX[0]))\n", " count_ey = sum(1 for line in open(EY[0]))\n", " count_bx = sum(1 for line in open(BX[0]))\n", " count_by = sum(1 for line in open(BY[0]))\n", " count_bz = sum(1 for line in open(BZ[0]))\n", "\n", " return count_ex, count_ey, count_bx, count_by, count_bz\n", "\n", "\n", "def create_mth5_group_station_run_channel(station):\n", "### creates the mth5 groups, stations, runs and channels for the mth5 skeleton\n", " add_station = m.add_station(station, survey=survey_name)\n", " channels = []\n", " for file in full_path_to_files:\n", " if station in file:\n", " channels.append(file)\n", " else:\n", " continue \n", "### for stations with a single run:\n", " if len(channels) == len(raw_data_channels): \n", " add_run = m.add_run(station, run_number, survey=survey_name)\n", "\n", " count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(channels)\n", " \n", " ex_zeros = np.zeros(count_ex,)\n", " ey_zeros = np.zeros(count_ey,)\n", " bx_zeros = np.zeros(count_bx,)\n", " by_zeros = np.zeros(count_by,)\n", " bz_zeros = np.zeros(count_bz,)\n", " \n", " ex = m.add_channel(station, run_number, \"ex\", \"electric\", ex_zeros, survey=survey_name)\n", " ey = m.add_channel(station, run_number, \"ey\", \"electric\", ey_zeros, survey=survey_name)\n", " bx = m.add_channel(station, run_number, \"bx\", \"magnetic\", bx_zeros, survey=survey_name)\n", " by = m.add_channel(station, run_number, \"by\", \"magnetic\", by_zeros, survey=survey_name)\n", " bz = m.add_channel(station, run_number, \"bz\", \"magnetic\", bz_zeros, survey=survey_name)\n", " \n", " #######################################################################################\n", " # Note: At the time of writing this example, the resizing of datasets caused h5py #\n", " # parallel to fail when running using the mpio driver. A workaround was to create #\n", " # 'zeros' arrays of size count_ (see above). 
# #\n", " # #\n", " # ex = m.add_channel(station, run_number, \"ex\", \"electric\", None, survey=survey_name) #\n", " # ey = m.add_channel(station, run_number, \"ey\", \"electric\", None, survey=survey_name) #\n", " # bx = m.add_channel(station, run_number, \"bx\", \"magnetic\", None, survey=survey_name) #\n", " # by = m.add_channel(station, run_number, \"by\", \"magnetic\", None, survey=survey_name) #\n", " # bz = m.add_channel(station, run_number, \"bz\", \"magnetic\", None, survey=survey_name) #\n", " # #\n", " # ex.hdf5_dataset.resize((count_ex,)) #\n", " # ey.hdf5_dataset.resize((count_ey,)) #\n", " # bx.hdf5_dataset.resize((count_bx,)) #\n", " # by.hdf5_dataset.resize((count_by,)) # \n", " # bz.hdf5_dataset.resize((count_bz,)) #\n", " #######################################################################################\n", "\n", "### for stations with multiple runs: \n", " elif len(channels) > len(raw_data_channels):\n", " sort_files = sorted(channels)\n", " number_of_channels = len(raw_data_channels)\n", " split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]\n", " for i,group in enumerate(split_lists):\n", " mrun_number = i+1\n", " run = \"00%i\" % mrun_number\n", " add_run = m.add_run(station, run, survey=survey_name)\n", " count_ex,count_ey,count_bx,count_by,count_bz = channel_line_count(group)\n", " ex_zeros = np.zeros(count_ex,)\n", " ey_zeros = np.zeros(count_ey,)\n", " bx_zeros = np.zeros(count_bx,)\n", " by_zeros = np.zeros(count_by,)\n", " bz_zeros = np.zeros(count_bz,)\n", "\n", " ex = m.add_channel(station, run, \"ex\", \"electric\", ex_zeros, survey=survey_name)\n", " ey = m.add_channel(station, run, \"ey\", \"electric\", ey_zeros, survey=survey_name)\n", " bx = m.add_channel(station, run, \"bx\", \"magnetic\", bx_zeros, survey=survey_name)\n", " by = m.add_channel(station, run, \"by\", \"magnetic\", by_zeros, survey=survey_name)\n", " bz = m.add_channel(station, run, \"bz\", \"magnetic\", bz_zeros, survey=survey_name)\n", "\n", " elif len(channels) < len(raw_data_channels):\n", " print('you are likely missing some channels')\n", " print(station)\n", "\n", " else:\n", " print('something has gone wrong')\n", "\n", "\n", "### create mth5 file skeleton \n", "\n", "make_mth5_dir(mth5_test_dir)\n", "remove_existing_mth5_file(h5_fn)\n", "start = MTime()\n", "start.now()\n", "\n", "m = MTH5(file_version='0.2.0',shuffle=None,fletcher32=None,compression=None,compression_opts=None)\n", "\n", "m.open_mth5(h5_fn, \"w\")\n", "survey_group = m.add_survey(survey_name)\n", "for station in sorted(stations_all):\n", " create_mth5_group_station_run_channel(station)\n", "m.close_mth5()\n", "\n", "### print total time to run script\n", "\n", "print('The script took {0} seconds !'.format(time.time()-startTime))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For the mth5 skeleton, we have populated a single mth5 file with all station groups in our survey. No actual time series data has been added yet (this will happen in parallel in the next steps). It is important to note that this process is a `collective process` that modifies the structure of an HDF5 file and all processes must participate. It is for this reason that mth5_skeleton.py can not be run in parallel when generating a single mth5 file for all stations (see `Collective versus independent operations` in https://docs.h5py.org/en/stable/mpi.html). 
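\n", "\n", "As a minimal illustration of this collective requirement (a sketch only, assuming HDF5 and h5py have been built with MPI support; the file and dataset names below are hypothetical), every rank takes part in opening the file and creating a dataset, after which each rank can independently write to its own slice:\n", "\n", "```\n", "from mpi4py import MPI\n", "import h5py\n", "\n", "comm = MPI.COMM_WORLD\n", "\n", "# collective: all ranks open the file and create the dataset together\n", "with h5py.File('parallel_example.h5', 'w', driver='mpio', comm=comm) as f:\n", "    dset = f.create_dataset('demo', (comm.size,), dtype='i')\n", "    # independent: each rank writes only its own element\n", "    dset[comm.rank] = comm.rank\n", "```\n", "\n", "A script like this would be launched with something like `mpirun -np 4 python3 demo.py`.\n", "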
\n", "\n", "The following is a summary of how long mth5_skeleton.py took to run for the AusLAMP Musgraves Province survey.\n", "\n", "```\n", "run on: Gadi\n", "number of stations: 93\n", "number of runs: 101\n", "NCPUs used: 1\n", "memory used: 30GB\n", "walltime used: 1264 seconds (21m04s)\n", "CPU time used: 1162 seconds (19m22s)\n", "service units: 5.27\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Populating our mth5 skeleton with time series data in parallel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have created our mth5 skeleton, it's time to populate the time time series data from each station in our survey using Parallel HDF5. \n", "\n", "First we will import the required Python libraries. The important libraries are [mpi4py](https://mpi4py.readthedocs.io/en/stable/) and [h5py](https://www.h5py.org/)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from mpi4py import MPI\n", "import h5py\n", "import os, psutil\n", "import glob\n", "import numpy as np\n", "import shutil\n", "import sys\n", "from os import path\n", "import time\n", "\n", "startTime = time.time()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we will define the MPI communicator, rank and size:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "comm = MPI.COMM_WORLD\n", "rank = MPI.COMM_WORLD.rank\n", "size = MPI.COMM_WORLD.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once again, we will define our: \n", "1. working directories and file paths\n", "2. Earth Data Logger channels\n", "3. survey stations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### directory on Gadi file system that contains merged ASCII time series per station per run\n", "work_dir = '/g/data/.../.../merged_data_all'\n", "\n", "### directory to write final mth5 file\n", "mth5_test_dir = '/g/data/.../.../mth5_outdir'\n", "\n", "### name of mth5 file\n", "hdf5_filename = 'example_mth5_file.h5'\n", "\n", "### full path to mth5 file\n", "h5_fn = mth5_test_dir+'/'+hdf5_filename\n", "\n", "### full path to concatenated Earth Data Logger ASCII time series files\n", "full_path_files = sorted(glob.glob(work_dir+\"/*\"))\n", "\n", "### raw data channels\n", "raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']\n", "\n", "### MT stations to go into mth5 file\n", "stations_all = ['SA225-2','SA227', 'SA242', 'SA243', 'SA245',\n", " 'SA247', 'SA248', 'SA249', 'SA250', 'SA251',\n", " 'SA252', 'SA26W-2', 'SA270', 'SA271', 'SA272',\n", " 'SA273', 'SA274-2', 'SA274', 'SA275', 'SA276',\n", " 'SA277', 'SA293-2', 'SA294', 'SA295', 'SA296',\n", " 'SA297', 'SA298', 'SA300', 'SA301', 'SA319',\n", " 'SA320', 'SA320-2', 'SA321', 'SA322', 'SA323',\n", " 'SA324', 'SA325-2', 'SA325', 'SA326N', 'SA326S',\n", " 'SA344', 'SA344-2', 'SA345', 'SA346', 'SA347',\n", " 'SA348', 'SA349', 'SA350', 'SA351', ### 49 single run SA stations\n", " 'WA10', 'WA13', 'WA14', 'WA15', 'WA26',\n", " 'WA27', 'WA29', 'WA30', 'WA31', 'WA42',\n", " 'WA43', 'WA44', 'WA45', 'WA46', 'WA47',\n", " 'WA54', 'WA55', 'WA56', 'WA57', 'WA58',\n", " 'WA60', 'WA61', 'WA62', 'WA63', 'WA64',\n", " 'WA65', 'WA66', 'WA67', 'WA68', 'WA69',\n", " 'WA70', 'WA71', 'WA72', 'WA73', 'WA74',\n", " 'WA75', 'WANT19', 'WANT38', 'WANT45', 'WASA302'\n", " 'WASA327', ### 41 single run WA stations \n", " 'SA246', 'SA299', 'SA324-2'] ### 3 stations with multiple 
runs\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's define the functions that will be used to populate our MTH5 skeleton with our Earth Data Logger time series data:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def channel_data_extraction(channels):\n", "### extracts electromagnetic time series data from concatenated (per run) Earth Data Logger ASCII time series files\n", " EX = [file for file in channels if file.endswith('EX')]\n", " EY = [file for file in channels if file.endswith('EY')]\n", " BX = [file for file in channels if file.endswith('BX')]\n", " BY = [file for file in channels if file.endswith('BY')]\n", " BZ = [file for file in channels if file.endswith('BZ')]\n", " with open(EX[0], 'r') as file:\n", " EX1 = file.read().splitlines()\n", " ex = np.array(EX1).astype(np.int32)\n", " with open(EY[0], 'r') as file:\n", " EY1 = file.read().splitlines()\n", " ey = np.array(EY1).astype(np.int32)\n", " with open(BX[0], 'r') as file:\n", " BX1 = file.read().splitlines()\n", " bx = np.array(BX1).astype(np.int32)\n", " with open(BY[0], 'r') as file:\n", " BY1 = file.read().splitlines()\n", " by = np.array(BY1).astype(np.int32)\n", " with open(BZ[0], 'r') as file:\n", " BZ1 = file.read().splitlines()\n", " bz = np.array(BZ1).astype(np.int32)\n", "\n", " return ex, ey, bx, by, bz\n", "\n", "\n", "def write_channels(list_of_stations,full_path_to_files):\n", "### writes time series data to the mth5 skeleton file in an embarrisingly parallel way - that is,\n", "### each rank is dedicated to writing data from a single station. \n", " for i,station in enumerate(sorted(list_of_stations)):\n", " if i%size!=rank:\n", " continue\n", " channels = []\n", " for file in full_path_to_files:\n", " if station in file:\n", " channels.append(file)\n", " else:\n", " continue\n", "\n", "### for stations with a single run: \n", " if len(channels) == len(raw_data_channels): \n", " run = '001'\n", " station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'\n", " site_run_path = station_group+station+'/'+run\n", " channel_ex_path = site_run_path+'/'+'ex'\n", " channel_ey_path = site_run_path+'/'+'ey'\n", " channel_bx_path = site_run_path+'/'+'bx'\n", " channel_by_path = site_run_path+'/'+'by'\n", " channel_bz_path = site_run_path+'/'+'bz'\n", "\n", " channel_ex = f[channel_ex_path]\n", " channel_ey = f[channel_ey_path]\n", " channel_bx = f[channel_bx_path]\n", " channel_by = f[channel_by_path]\n", " channel_bz = f[channel_bz_path]\n", " \n", " ex,ey,bx,by,bz = channel_data_extraction(channels)\n", " \n", " channel_ex[:] = ex\n", " channel_ey[:] = ey\n", " channel_bx[:] = bx\n", " channel_by[:] = by\n", " channel_bz[:] = bz\n", "\n", " process = psutil.Process(os.getpid())\n", " print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))\n", " print(\"Station number %d (%s) being done by processor %d of %d\" % (i, station, rank, size))\n", "\n", "### for stations with multiple runs: \n", " elif len(channels) > len(raw_data_channels):\n", " sort_files = sorted(channels)\n", " number_of_channels = len(raw_data_channels)\n", " split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]\n", " for i,group in enumerate(split_lists):\n", " mrun_number = i+1\n", " run = \"00%i\" % mrun_number\n", " station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'\n", " site_run_path = station_group+station+'/'+run\n", " channel_ex_path = site_run_path+'/'+'ex'\n", " 
channel_ey_path = site_run_path+'/'+'ey'\n", " channel_bx_path = site_run_path+'/'+'bx'\n", " channel_by_path = site_run_path+'/'+'by'\n", " channel_bz_path = site_run_path+'/'+'bz'\n", "\n", " channel_ex = f[channel_ex_path]\n", " channel_ey = f[channel_ey_path]\n", " channel_bx = f[channel_bx_path]\n", " channel_by = f[channel_by_path]\n", " channel_bz = f[channel_bz_path]\n", "\n", " ex,ey,bx,by,bz = channel_data_extraction(group)\n", "\n", " channel_ex[:] = ex\n", " channel_ey[:] = ey\n", " channel_bx[:] = bx\n", " channel_by[:] = by\n", " channel_bz[:] = bz\n", "\n", " process = psutil.Process(os.getpid())\n", " print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))\n", " print(\"Station number %d (%s) being done by processor %d of %d\" % (i, station, rank, size))\n", "\n", " elif len(channels) < len(raw_data_channels):\n", " print('you are likely missing some channels')\n", "\n", " else:\n", " print('something has gone wrong')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we need to open our MTH5 skeleton in `append` mode utilising the `mpio` driver. We can then write our time series channels for all our stations in parallel and close the file:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### write to mth5 file in parallel using the mpio driver\n", "with h5py.File(h5_fn,'a',driver='mpio',comm=MPI.COMM_WORLD) as f:\n", " write_channels(stations_all,full_path_files)\n", "f.close()\n", "\n", "### print total time for script to run\n", "if rank==0:\n", " print('The script took {0} seconds !'.format(time.time()-startTime))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Putting this all together into a Python script (`mth5_muscle.py`):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from mpi4py import MPI\n", "import h5py\n", "import os, psutil\n", "import glob\n", "import numpy as np\n", "import shutil\n", "import sys\n", "from os import path\n", "import time\n", "\n", "startTime = time.time()\n", "\n", "\n", "### define MPI comm, rank and size\n", "\n", "comm = MPI.COMM_WORLD\n", "rank = MPI.COMM_WORLD.rank\n", "size = MPI.COMM_WORLD.size\n", "\n", "\n", "### define working directories, hdf5 file name and the full path to the ASCII MT time series files\n", "\n", "work_dir = '/g/data/.../.../merged_data_all'\n", "mth5_test_dir = '/g/data/.../.../mth5_outdir'\n", "hdf5_filename = 'example_mth5_file.h5'\n", "h5_fn = mth5_test_dir+'/'+hdf5_filename\n", "full_path_files = sorted(glob.glob(work_dir+\"/*\"))\n", "\n", "\n", "### define raw data channels\n", "\n", "raw_data_channels = ['EX','EY','BX','BY','BZ','TP','ambientTemperature']\n", "\n", "\n", "### define MT stations to go into mth5 file\n", "\n", "stations_all = ['SA225-2','SA227', 'SA242', 'SA243', 'SA245',\n", " 'SA247', 'SA248', 'SA249', 'SA250', 'SA251',\n", " 'SA252', 'SA26W-2', 'SA270', 'SA271', 'SA272',\n", " 'SA273', 'SA274-2', 'SA274', 'SA275', 'SA276',\n", " 'SA277', 'SA293-2', 'SA294', 'SA295', 'SA296',\n", " 'SA297', 'SA298', 'SA300', 'SA301', 'SA319',\n", " 'SA320', 'SA320-2', 'SA321', 'SA322', 'SA323',\n", " 'SA324', 'SA325-2', 'SA325', 'SA326N', 'SA326S',\n", " 'SA344', 'SA344-2', 'SA345', 'SA346', 'SA347',\n", " 'SA348', 'SA349', 'SA350', 'SA351', ### 49 single run SA stations\n", " 'WA10', 'WA13', 'WA14', 'WA15', 'WA26',\n", " 'WA27', 'WA29', 'WA30', 'WA31', 'WA42',\n", " 'WA43', 'WA44', 'WA45', 'WA46', 'WA47',\n", " 'WA54', 'WA55', 'WA56', 'WA57', 
'WA58',\n", " 'WA60', 'WA61', 'WA62', 'WA63', 'WA64',\n", " 'WA65', 'WA66', 'WA67', 'WA68', 'WA69',\n", " 'WA70', 'WA71', 'WA72', 'WA73', 'WA74',\n", " 'WA75', 'WANT19', 'WANT38', 'WANT45', 'WASA302',\n", " 'WASA327', ### 41 single run WA stations \n", " 'SA246', 'SA299', 'SA324-2'] ### 3 stations with multiple runs\n", "\n", "\n", "### define functions\n", "\n", "def channel_data_extraction(channels):\n", " EX = [file for file in channels if file.endswith('EX')]\n", " EY = [file for file in channels if file.endswith('EY')]\n", " BX = [file for file in channels if file.endswith('BX')]\n", " BY = [file for file in channels if file.endswith('BY')]\n", " BZ = [file for file in channels if file.endswith('BZ')]\n", " with open(EX[0], 'r') as file:\n", " EX1 = file.read().splitlines()\n", " ex = np.array(EX1).astype(np.int32)\n", " with open(EY[0], 'r') as file:\n", " EY1 = file.read().splitlines()\n", " ey = np.array(EY1).astype(np.int32)\n", " with open(BX[0], 'r') as file:\n", " BX1 = file.read().splitlines()\n", " bx = np.array(BX1).astype(np.int32)\n", " with open(BY[0], 'r') as file:\n", " BY1 = file.read().splitlines()\n", " by = np.array(BY1).astype(np.int32)\n", " with open(BZ[0], 'r') as file:\n", " BZ1 = file.read().splitlines()\n", " bz = np.array(BZ1).astype(np.int32)\n", "\n", " return ex, ey, bx, by, bz\n", "\n", "def write_channels(list_of_stations,full_path_to_files):\n", " for i,station in enumerate(sorted(list_of_stations)):\n", " if i%size!=rank:\n", " continue\n", " channels = []\n", " for file in full_path_to_files:\n", " if station in file:\n", " channels.append(file)\n", " else:\n", " continue\n", " if len(channels) == len(raw_data_channels): \n", " run = '001'\n", " station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'\n", " site_run_path = station_group+station+'/'+run\n", " channel_ex_path = site_run_path+'/'+'ex'\n", " channel_ey_path = site_run_path+'/'+'ey'\n", " channel_bx_path = site_run_path+'/'+'bx'\n", " channel_by_path = site_run_path+'/'+'by'\n", " channel_bz_path = site_run_path+'/'+'bz'\n", "\n", " channel_ex = f[channel_ex_path]\n", " channel_ey = f[channel_ey_path]\n", " channel_bx = f[channel_bx_path]\n", " channel_by = f[channel_by_path]\n", " channel_bz = f[channel_bz_path]\n", " \n", " ex,ey,bx,by,bz = channel_data_extraction(channels)\n", " \n", " channel_ex[:] = ex\n", " channel_ey[:] = ey\n", " channel_bx[:] = bx\n", " channel_by[:] = by\n", " channel_bz[:] = bz\n", "\n", " process = psutil.Process(os.getpid())\n", " print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))\n", " print(\"Station number %d (%s) being done by processor %d of %d\" % (i, station, rank, size))\n", "\n", "\n", " elif len(channels) > len(raw_data_channels):\n", " sort_files = sorted(channels)\n", " number_of_channels = len(raw_data_channels)\n", " split_lists = [sort_files[x:x+number_of_channels] for x in range(0, len(sort_files), number_of_channels)]\n", " for i,group in enumerate(split_lists):\n", " mrun_number = i+1\n", " run = \"00%i\" % mrun_number\n", " station_group = 'Experiment/Surveys/AusLAMP_Musgraves/Stations/'\n", " site_run_path = station_group+station+'/'+run\n", " channel_ex_path = site_run_path+'/'+'ex'\n", " channel_ey_path = site_run_path+'/'+'ey'\n", " channel_bx_path = site_run_path+'/'+'bx'\n", " channel_by_path = site_run_path+'/'+'by'\n", " channel_bz_path = site_run_path+'/'+'bz'\n", "\n", " channel_ex = f[channel_ex_path]\n", " channel_ey = f[channel_ey_path]\n", " channel_bx = f[channel_bx_path]\n", " channel_by 
= f[channel_by_path]\n", " channel_bz = f[channel_bz_path]\n", "\n", " ex,ey,bx,by,bz = channel_data_extraction(group)\n", "\n", " channel_ex[:] = ex\n", " channel_ey[:] = ey\n", " channel_bx[:] = bx\n", " channel_by[:] = by\n", " channel_bz[:] = bz\n", "\n", " process = psutil.Process(os.getpid())\n", " print('this run took %d MB of memory' % (process.memory_info().rss / 1024 ** 2))\n", " print(\"Station number %d (%s) being done by processor %d of %d\" % (i, station, rank, size))\n", "\n", " elif len(channels) < len(raw_data_channels):\n", " print('you are likely missing some channels')\n", "\n", " else:\n", " print('something has gone wrong')\n", "\n", "\n", "### write to mth5 file in parallel using the mpio driver\n", "\n", "with h5py.File(h5_fn,'a',driver='mpio',comm=MPI.COMM_WORLD) as f:\n", " write_channels(stations_all,full_path_files)\n", "f.close()\n", "\n", "### print total time for script to run\n", "\n", "if rank==0:\n", " print('The script took {0} seconds !'.format(time.time()-startTime))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have created our `mth5_muscle.py` script, we next need to create a job submission script to submit to the Gadi PBSPro scheduler. The job submission script specifies the queue to use and the duration/resources needed for the job." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```\n", "#!/bin/bash\n", "\n", "#PBS -N mth5_mpi4py\n", "#PBS -q hugemem\n", "#PBS -P fp0\n", "#PBS -l walltime=0:30:00\n", "#PBS -l ncpus=96\n", "#PBS -l mem=1000GB\n", "#PBS -l jobfs=10GB\n", "#PBS -l storage=gdata/fp0+gdata/my80+gdata/lm70+gdata/up99\n", "\n", "module use /g/data/up99/modulefiles\n", "module load NCI-geophys/22.06 \n", "\n", "cd ${PBS_O_WORKDIR}\n", "\n", "mpirun -np $PBS_NCPUS python3 mth5_muscle.py > ./pbs_job_logs/$PBS_JOBID.log\n", "\n", "\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For our jobscript above (`mth5_muscle.sh`), we have requested:\n", "1. to use the [hugemem queue](https://opus.nci.org.au/x/1wBiBQ)\n", "2. 96 CPUs (2 nodes)\n", "3. 1 TB of memory\n", "4. half an hour of walltime\n", "\n", "We also need to list, via the jobscript's `storage` directive, the NCI project codes used in `mth5_muscle.py`. The project code `up99` is required to make use of the NCI-geophys/22.06 module that contains Parallel HDF5.\n", "\n", "To submit this jobscript (`mth5_muscle.sh`) on Gadi:\n", "\n", "```\n", "$ qsub mth5_muscle.sh\n", "\n", "```\n", "\n", "This job will process the 93 stations in our survey using 96 MPI ranks (one station per rank)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following is a summary of how long `mth5_muscle.py` took to run for the AusLAMP Musgraves Province survey.\n", "\n", "```\n", "run on: Gadi\n", "number of stations: 93\n", "number of runs: 101\n", "NCPUs used: 96 (2 nodes)\n", "memory used: 736 GB\n", "walltime used: 836 seconds (13m56s)\n", "CPU time used: 48570 seconds (13h29m30s)\n", "service units: 66.88\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Concluding remarks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Generating one mth5 file for many stations can take a significant amount of time if no parallelism is introduced. For the Musgraves example above, if using the mth5 library alone it would have taken approximately 14 hours to generate our final mth5 file. 
By utilising Parallel HDF5 we have managed to reduce this time to approximately 35 minutes.\n", "\n", "For the _\"all stations in a single mth5 file\"_ model, the bottleneck lies in generating the mth5 skeleton as this can't be done in parallel. The authors have created the tutorial [mth5_in_parallel_one_file_per_station](https://github.com/kujaku11/mth5/blob/master/examples/notebooks/mth5_in_parallel_one_file_per_station.ipynb) that shows how to generate a single mth5 file per station in parallel, which yields much quicker results than the _\"all stations in a single mth5 file\"_ model.\n", "\n", "This example only dealt with the writing of MT time series data and did not consider [mt_metadata](https://mt-metadata.readthedocs.io/en/latest/). The [mth5_in_parallel_one_file_per_station](https://github.com/kujaku11/mth5/blob/master/examples/notebooks/mth5_in_parallel_one_file_per_station.ipynb) tutorial demonstrates how one could add mt_metadata into their mth5 automations." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" } }, "nbformat": 4, "nbformat_minor": 4 }