{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook shows the MEP quickstart sample, which also exists as a non-notebook version at:\n", "https://bitbucket.org/vitotap/python-spark-quickstart\n", "\n", "It shows how to use Spark (http://spark.apache.org/) for distributed processing on the PROBA-V Mission Exploitation Platform. (https://proba-v-mep.esa.int/) The sample intentionally implements a very simple computation: for each PROBA-V tile in a given bounding box and time range, a histogram is computed. The results are then summed and printed. Computation of the histograms runs in parallel.\n", "## First step: get file paths\n", "A catalog API is available to easily retrieve paths to PROBA-V files:\n", "https://readthedocs.org/projects/mep-catalogclient/" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['BioPar_ALB_BHV_V1_Tiles',\n", " 'BioPar_ALB_DHV_V1_Tiles',\n", " 'BioPar_ALBH_V1_Global',\n", " 'BioPar_BA_V1_Tiles',\n", " 'BioPar_DMP_Tiles',\n", " 'BioPar_DMP300_V1_Global',\n", " 'BioPar_FAPAR_V1_Tiles',\n", " 'BioPar_FAPAR_V1_Global',\n", " 'BioPar_FAPAR_V2_Global',\n", " 'BioPar_FCOVER_V1_Tiles',\n", " 'BioPar_FCOVER_V1_Global',\n", " 'BioPar_FCOVER_V2_Global',\n", " 'BioPar_LAI_V1_Tiles',\n", " 'BioPar_LAI_V1_Global',\n", " 'BioPar_LAI_V2_Global',\n", " 'BioPar_NDVI300_V1_Global',\n", " 'BioPar_NDVI300_V1_Global_GTIFF',\n", " 'BioPar_BA300_V1_Global',\n", " 'BioPar_FCOVER300_V1_Global',\n", " 'BioPar_FAPAR300_V1_Global',\n", " 'BioPar_LAI300_V1_Global',\n", " 'BioPar_NDVI_V1_Tiles',\n", " 'BioPar_NDVI_V2_Tiles',\n", " 'BioPar_NDVI_V2_Global',\n", " 'BioPar_SWI',\n", " 'BioPar_SWI10_V3_Global',\n", " 'BioPar_TOCR_Tiles',\n", " 'BioPar_VCI_Tiles',\n", " 'BioPar_VPI_Tiles',\n", " 'BioPar_WB_V1_Tiles',\n", " 'BioPar_WB_V2_Tiles',\n", " 'BioPar_WB_V2_Global',\n", " 'BioPar_WB300_V1_Global',\n", " 'PROBAV_L3_S1_TOC_1KM',\n", " 'PROBAV_L3_S1_TOC_333M',\n", " 'PROBAV_L3_S10_TOC_333M',\n", " 'PROBAV_L3_S5_TOC_100M',\n", " 'PROBAV_L3_S1_TOC_100M',\n", " 'PROBAV_L3_S10_TOC_1KM',\n", " 'PROBAV_L3_S1_TOA_1KM',\n", " 'PROBAV_L3_S1_TOA_333M',\n", " 'PROBAV_L3_S5_TOA_100M',\n", " 'PROBAV_L3_S1_TOA_100M',\n", " 'PROBAV_L1C',\n", " 'PROBAV_L2A_1KM',\n", " 'PROBAV_L2A_333M',\n", " 'PROBAV_L2A_100M',\n", " 'PROBAV_L3_S10_TOC_NDVI_1KM',\n", " 'PROBAV_L3_S10_TOC_NDVI_333M',\n", " 'PROBAV_L3_S1_TOC_NDVI_100M',\n", " 'PROBAV_L3_S5_TOC_NDVI_100M',\n", " 'PROBAV_L2A_1KM_ANTAR',\n", " 'PROBAV_L2A_333M_ANTAR',\n", " 'PROBAV_L2A_100M_ANTAR',\n", " 'CGS_S2_FAPAR',\n", " 'CGS_S2_FAPAR_10M',\n", " 'CGS_S2_FAPAR_20M',\n", " 'CGS_S2_NDVI',\n", " 'CGS_S2_NDVI_10M',\n", " 'CGS_S2_LAI',\n", " 'CGS_S2_LAI_10M',\n", " 'CGS_S2_LAI_20M',\n", " 'CGS_S2_FCOVER',\n", " 'CGS_S2_FCOVER_10M',\n", " 'CGS_S2_FCOVER_20M',\n", " 'CGS_S2_CCC_10M',\n", " 'CGS_S2_CCC_20M',\n", " 'CGS_S2_CWC_10M',\n", " 'CGS_S2_CWC_20M',\n", " 'CGS_S2_RADIOMETRY',\n", " 'CGS_S2_RADIOMETRY_10M',\n", " 'CGS_S2_RADIOMETRY_20M',\n", " 'CGS_S2_RADIOMETRY_60M',\n", " 'CGS_S1_GRD_SIGMA0_L1',\n", " 'CGS_S1_SLC_L1',\n", " 'CGS_S1_GRD_L1',\n", " 'NEXTGEOSS_SENTINEL2_FAPAR',\n", " 'NEXTGEOSS_SENTINEL2_NDVI',\n", " 'NEXTGEOSS_SENTINEL2_LAI',\n", " 'NEXTGEOSS_SENTINEL2_FCOVER',\n", " 'NEXTGEOSS_SENTINEL2_RADIOMETRY',\n", " 'FSTEP_SENTINEL2_FAPAR',\n", " 'FSTEP_SENTINEL2_NDVI',\n", " 'FSTEP_SENTINEL2_LAI',\n", " 'FSTEP_SENTINEL2_FCOVER',\n", " 'FSTEP_SENTINEL2_RADIOMETRY',\n", " 'SPOTVEGETATION_L3_S1',\n", " 'SPOTVEGETATION_L3_S10']" ] }, "execution_count": 1, "metadata": {}, 
"output_type": "execute_result" } ], "source": [ "from catalogclient import catalog\n", "cat=catalog.Catalog()\n", "cat.get_producttypes()" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 2 files.\n", "/data/MTDA/TIFFDERIVED/PROBAV_L3_S1_TOC_333M/2016/20160101/PROBAV_S1_TOC_20160101_333M_V101/PROBAV_S1_TOC_X18Y02_20160101_333M_V101_NDVI.tif\n", "/data/MTDA/TIFFDERIVED/PROBAV_L3_S1_TOC_333M/2016/20160101/PROBAV_S1_TOC_20160101_333M_V101/PROBAV_S1_TOC_X18Y02_20160101_333M_V101_NDVI.tif: TIFF image data, little-endian\n" ] } ], "source": [ "date = \"2016-01-01\"\n", "products = cat.get_products('PROBAV_L3_S1_TOC_333M', \n", " fileformat='GEOTIFF', \n", " startdate=date, \n", " enddate=date, \n", " min_lon=0, max_lon=10, min_lat=36, max_lat=53)\n", "#extract NDVI geotiff files from product metadata\n", "files = [p.file('NDVI')[5:] for p in products]\n", "print('Found '+str(len(files)) + ' files.')\n", "print(files[0])\n", "#check if file exists\n", "!file {files[0]}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Second step: define function to apply\n", "Define the histogram function, this can also be done inline, which allows for a faster feedback loop when writing the code, but here we want to clearly separate the processing 'algorithm' from the parallelization code." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Calculates the histogram for a given (single band) image file.\n", "def histogram(image_file):\n", " \n", " import numpy as np\n", " import gdal\n", " \n", " \n", " # Open image file\n", " img = gdal.Open(image_file)\n", " \n", " if img is None:\n", " print( '-ERROR- Unable to open image file \"%s\"' % image_file )\n", " \n", " # Open raster band (first band)\n", " raster = img.GetRasterBand(1) \n", " xSize = img.RasterXSize\n", " ySize = img.RasterYSize\n", " \n", " # Read raster data\n", " data = raster.ReadAsArray(0, 0, xSize, ySize)\n", " \n", " # Calculate histogram\n", " hist, _ = np.histogram(data, bins=256)\n", " return hist\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Third step: setup Spark\n", "To work on the processing cluster, we need to specify the resources we want:\n", "\n", "* spark.executor.cores: Number of cores per executor. Usually our tasks are single threaded, so 1 is a good default.\n", "* spark.executor.memory: memory to assign per executor. For the Java/Spark processing, not the Python part.\n", "* spark.yarn.executor.memoryOverhead: memory available for Python in each executor.\n", "\n", "We set up the SparkConf with these parameters, and create a SparkContext sc, which will be our access point to the cluster." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 225 ms, sys: 15.4 ms, total: 240 ms\n", "Wall time: 15.4 s\n" ] } ], "source": [ "%%time\n", "# ================================================================\n", "# === Calculate the histogram for a given number of files. The ===\n", "# === processing is performed by spreading them over a cluster ===\n", "# === of Spark nodes. 
===\n", "# ================================================================\n", "\n", "from datetime import datetime\n", "from operator import add\n", "import pyspark\n", "import os\n", "# Setup the Spark cluster\n", "conf = pyspark.SparkConf()\n", "conf.set('spark.yarn.executor.memoryOverhead', 512)\n", "conf.set('spark.executor.memory', '512m')\n", "\n", "sc = pyspark.SparkContext(conf=conf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fourth step: compute histograms\n", "We use a couple of Spark functions to run our job on the cluster. Comments are provided in the code." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%%time\n", "# Distribute the local file list over the cluster.\n", "filesRDD = sc.parallelize(files,len(files))\n", "\n", "# Apply the 'histogram' function to each filename using 'map', keep the result in memory using 'cache'.\n", "hists = filesRDD.map(histogram).cache()\n", "\n", "count = hists.count()\n", "\n", "# Combine distributed histograms into a single result\n", "total = list(hists.reduce(lambda h, i: map(add, h, i)))\n", "hists.unpersist()\n", "\n", "print( \"Sum of %i histograms: %s\" % (count, total))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#stop spark session if we no longer need it\n", "sc.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fifth step: plot our result\n", "Plot the array of values as a simple line chart using matplotlib. This is the most basic Python library. More advanced options such as bokeh, mpld3 and seaborn are also available." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "plt.plot(total)\n", "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "python3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 4 }