{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [
 "# Processing HDF5 files with Spark"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
 "This notebook demonstrates accessing and processing HDF5 files with Spark.\n",
 "***Author: Prasanth Kothuri*** \n",
 "***Contact: Prasanth Kothuri*** \n",
 " \n",
 "To run this notebook we used the following configuration:\n",
 "* *Software stack*: LCG_96 Python3\n",
 "* *Platform*: centos7-gcc8\n",
 "* *Spark cluster*: Cloud Containers"
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2.9.0\n" ] } ], "source": [
 "# Ensure the h5py package is available and print its version\n",
 "import io\n",
 "import h5py\n",
 "\n",
 "print(h5py.__version__)\n",
 "\n",
 "# EOS location of the input files and the list of files to process\n",
 "prefix = \"root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/\"\n",
 "HDF5Files = [\n",
 "    \"1541897545335000000_162_1.h5\",\n",
 "    \"1541902534935000000_163_1.h5\",\n",
 "    \"1541902534935000000_163_1.h5\"\n",
 "]"
] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [
 "# map function: extract the AwakeEventInfo summary fields from one (path, content) pair\n",
 "def extractHDF5(hdf5file):\n",
 "    path = hdf5file[0]\n",
 "    content = hdf5file[1]\n",
 "    f = h5py.File(io.BytesIO(content), 'r')\n",
 "    info = f['AwakeEventInfo']\n",
 "    return (path,\n",
 "            int(info['configurationVersionNumber'][()]),\n",
 "            int(info['eventNumber'][()]),\n",
 "            int(info['runNumber'][()]),\n",
 "            int(info['timestamp'][()]))"
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/1541897545335000000_162_1.h5', 'root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/1541902534935000000_163_1.h5', 'root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/1541902534935000000_163_1.h5']\n" ] } ], "source": [
 "# build the list of fully qualified file paths\n",
 "files = []\n",
 "for HDF5File in HDF5Files:\n",
 "    files.append(prefix + HDF5File)\n",
 "print(files)\n",
 "\n",
 "# RDD representing tuples of file path and corresponding file content\n",
 "inputData = sc.binaryFiles(','.join(files))\n",
 "#inputData = sc.binaryFiles(\"root://eospublic.cern.ch//eos/experiment/awake/event_data/2018/11/11\")\n",
 "# apply the map function\n",
 "hdf5_reduced_collection = inputData.map(extractHDF5)\n",
 "# convert the RDD to a DataFrame\n",
 "df = spark.createDataFrame(hdf5_reduced_collection) \\\n",
 "          .toDF(\"filename\", \"configurationVersionNumber\", \"eventNumber\", \"runNumber\", \"timestamp\")"
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [
 "# save the result to EOS as CSV\n",
 "try:\n",
 "    df.coalesce(1) \\\n",
 "      .write \\\n",
 "      .option(\"header\", \"true\") \\\n",
 "      .option(\"sep\", \",\") \\\n",
 "      .option(\"encoding\", \"UTF-8\") \\\n",
 "      .option(\"compression\", \"none\") \\\n",
 "      .mode(\"overwrite\") \\\n",
 "      .csv(\"root://eosuser.cern.ch//eos/user/p/pkothuri/result/\")\n",
 "\n",
 "except Exception as e:\n",
 "    # There is a known bug in the xrootd-connector delete call and we can ignore it\n",
 "    if \"ch.cern.eos.XRootDFileSystem.delete\" in str(e):\n",
 "        pass\n",
 "    else:\n",
 "        raise"
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "4\n" ] } ], "source": [
 "# read the result back from EOS and count the rows\n",
 "csvOutput = spark.read.csv(\"root://eosuser.cern.ch//eos/user/p/pkothuri/result/\")\n",
 "print(csvOutput.count())"
] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": {
"name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.16" }, "sparkconnect": { "bundled_options": [], "list_of_options": [] } }, "nbformat": 4, "nbformat_minor": 2 }