{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# DC2 object Run1.1p Apache Spark tutorial -- Part I: Apache Spark access\n", "\n", "Author: **Julien Peloton** [@JulienPeloton](https://github.com/JulienPeloton) \n", "Based on the work of: **Francois Lanusse [@EiffL](https://github.com/LSSTDESC/DC2-analysis/issues/new?body=@EiffL), Javier Sanchez [@fjaviersanchez](https://github.com/LSSTDESC/DC2-analysis/issues/new?body=@fjaviersanchez)** using GCR. \n", "Last Verifed to Run: **2018-10-22**\n", "\n", "This notebook will illustrate the basics of accessing the merged object catalogs through Apache Spark as well as how to select useful samples of stars/galaxies from the dpdd catalogs (from DM outputs). It follows the same steps (and sometimes same documentation) as in this [notebook](https://github.com/LSSTDESC/DC2-analysis/blob/master/tutorials/object_gcr_1_intro.ipynb) which uses the Generic Catalog Reader ([GCR](https://github.com/yymao/generic-catalog-reader)).\n", "\n", "__Learning objectives__:\n", "\n", "After going through this notebook, you should be able to:\n", " 1. Load and efficiently access a DC2 object catalog with Apache Spark\n", " 2. Understand and have references for the catalog schema\n", " 3. Apply cuts to the catalog using Spark SQL functionalities\n", " 4. Have an example of quality cuts and simple star/galaxy separation cut\n", " 5. Distribute the computation and the routine to plot to be faster!\n", "\n", "__Logistics__: This notebook is intended to be run through the JupyterHub NERSC [interface](https://jupyter-dev.nersc.gov) with the `desc-pyspark` kernel. The kernel is automatically installed in your environment when you use the kernel setup script:\n", "\n", "```\n", "source /global/common/software/lsst/common/miniconda/kernels/setup.sh\n", "```\n", "\n", "For more information see [LSSTDESC/jupyter-kernels](https://github.com/LSSTDESC/jupyter-kernels). Note that a general introduction and tutorials for Apache Spark can be found at [astrolabsoftware/spark-tutorials](https://github.com/astrolabsoftware/spark-tutorials) (under construction).\n", "\n", "__Note concerning resources__\n", "\n", "```\n", "The large-memory login node used by https://jupyter-dev.nersc.gov/\n", "is a shared resource, so please be careful not to use too many CPUs\n", "or too much memory.\n", "\n", "That means avoid using `--master local[*]` in your kernel, but limit\n", "the resources to a few core. Typically `--master local[4]` is enough for\n", "prototyping a program.\n", "\n", "Then to scale the analysis, the best is to switch to batch mode! \n", "There, no limit!\n", "```\n", "\n", "This is already taken care for you in the Spark+DESC kernel setup script (from `desc-pyspark`), but keep this in mind if you use a custom kernel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "import os\n", "\n", "import numpy as np\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Accessing the object catalog with Apache Spark\n", "\n", "In this section, we illustrate how to use Apache Spark to access the object catalogs from DC2 Run1.1p.\n", "Let's initialise Spark and load the data into a DataFrame. We will focus on data stored in the `parquet` data format." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Where the dpdd data is stored\n", "base_dir = '/global/projecta/projectdirs/lsst/global/in2p3/Run1.1/summary'\n", "\n", "# Load one patch, all tracts\n", "datafile = os.path.join(base_dir, 'dpdd_object.parquet')\n", "print(\"Data will be read from: \\n\", datafile)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "# Initialise our Spark session\n", "spark = SparkSession.builder.getOrCreate()\n", "\n", "# Read the data as DataFrame\n", "df = spark.read.format(\"parquet\").load(datafile)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### DC2 Object catalog Schema\n", "\n", "\n", "To see the quantities available in the catalog, you can use the following:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check what we have in the file\n", "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The meaning of these fields follows the standard nomenclature of the LSST Data Products Definition Document [DPDD](http://ls.st/dpdd).\n", "\n", "The DPDD is an effort made by the LSST project to standardize the format of the official Data Release Products (DRP). While the native outputs of the DM stack are succeptible to change, the DPDD will be more stable. An early adoption of these conventions by the DESC will save time and energy down the road." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that the catalog includes:\n", "\n", "* Positions\n", "* Fluxes and magnitudes (PSF and [CModel](https://www.sdss.org/dr12/algorithms/magnitudes/#cmodel))\n", "* Shapes (using [GalSim's HSM](http://galsim-developers.github.io/GalSim/namespacegalsim_1_1hsm.html))\n", "* Quality flags: e.g, does the source have any interpolated pixels? Has any of the measurement algorithms returned an error?\n", "* Other useful quantities: `blendedness`, measure of how flux is affected by neighbors: (1 - flux.child/flux.parent) (see 4.9.11 of [Bosch et al. 2018](http://adsabs.harvard.edu/abs/2018PASJ...70S...5B)); `extendedness`, classifies sources in extended and psf-like." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Accessing the data (taken from the original GCR notebook)\n", "\n", "While Run1.1p is still of manageable size, full DC2 will be much larger, accessing the whole data can be challenging. In order to access the data efficiently, it is important to understand how it is physically stored and how to access it, one piece at the time. \n", "\n", "\n", "The coadds produced by the DM stack are structured in terms of large `tracts` and smaller `patches`, illustrated here for DC2:\n", "\n", "Here the tracts have large blue numbers, and the patches are denoted with an `(x,y)` format. For DC2, each tract has 8x8 patches.\n", "\n", "You can learn more about how to make such a plot of the tract and patches [here](Plotting_the_Run1.1p_skymap.ipynb)\n", "\n", "Obviously Spark preserves the structure of the data so that any particular quantity can be accessed on a tract/patch bases. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Accessing the data (taken from the original GCR notebook)\n", "\n", "While Run1.1p is still of manageable size, the full DC2 dataset will be much larger, and accessing all of it at once can be challenging. In order to access the data efficiently, it is important to understand how it is physically stored and how to access it, one piece at a time.\n", "\n", "The coadds produced by the DM stack are structured in terms of large `tracts` and smaller `patches`. In the DC2 skymap plot, the tracts are labelled with large blue numbers, and the patches are denoted with an `(x,y)` format. For DC2, each tract has 8x8 patches.\n", "\n", "You can learn more about how to make such a plot of the tracts and patches [here](Plotting_the_Run1.1p_skymap.ipynb).\n", "\n", "Spark preserves the structure of the data, so that any particular quantity can be accessed on a tract/patch basis. The tracts available in the catalog can be listed using the following command:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Show all available tracts\n", "df.select('tract').distinct().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The DM stack includes functionality to get the tract and patch number corresponding to a certain position `(RA,DEC)`; however, this is beyond the scope of this tutorial." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Apache Spark provides `filter` mechanisms, which you can use to speed up data access if you only need certain chunks of the dataset.\n", "For the object catalog, the data is chunked by `tract` and `patch`, and hence those are the `filters` you can use:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Retrieve the ra,dec coordinates of all sources within tract number 4430\n", "data = df.select('ra', 'dec').where('tract == 4430').collect()\n", "\n", "# `collect` returns a list of Row(ra, dec), so for\n", "# plotting purposes we transpose the output:\n", "ra, dec = np.transpose(data)\n", "\n", "# Plot a 2d histogram of sources\n", "plt.figure(figsize=(10,7))\n", "plt.hist2d(ra, dec, 100)\n", "plt.gca().set_aspect('equal')\n", "plt.colorbar(label='Number of objects')\n", "plt.xlabel('RA [deg]')\n", "plt.ylabel('dec [deg]');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is interesting to note that there are several ways in Spark to use those filtering mechanisms:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Pure SQL**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Pure SQL\n", "cols = \"ra, dec\"\n", "\n", "# SQL - register first the DataFrame\n", "df.createOrReplaceTempView(\"full_tract\")\n", "\n", "# Keep only sources within tract 4430\n", "sql_command = \"\"\"\n", "    SELECT {}\n", "    FROM full_tract\n", "    WHERE\n", "        tract == 4430\n", "\"\"\".format(cols)\n", "\n", "# Execute the expression - return a DataFrame\n", "df_sub = spark.sql(sql_command)\n", "data = df_sub.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Spark DataFrame built-in methods**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Using select/where\n", "data = df.select('ra', 'dec').where('tract == 4430').collect()\n", "\n", "# Or using select/filter\n", "data = df.select('ra', 'dec').filter('tract == 4430').collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Data type**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The data returned by collecting a DataFrame (`collect`) in Spark is structured as a native Python list of `Row`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Data type is {}\".format(type(data[0])))\n", "print(\"Example: {}\".format(data[0]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But you can easily go back to a standard list or numpy array (numpy methods do that for you most of the time) if needed:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "arow = data[0]\n", "\n", "# Explicit conversion\n", "mylist = list(arow)\n", "print(\"input type: {} / output type {}\".format(type(arow), type(mylist)))\n", "\n", "# Implicit conversion\n", "cols = np.transpose(arow)\n", "print(\"input type: {} / output type {}\".format(type(arow), type(cols)))" ] },
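{ "cell_type": "markdown", "metadata": {}, "source": [ "`Row` objects also support access by field name, which is often more readable than positional unpacking. A minimal sketch, reusing the `arow` defined above:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Access fields by attribute or by key\n", "print(arow.ra, arow.dec)\n", "print(arow['ra'], arow['dec'])\n", "\n", "# Or convert the Row into a Python dictionary\n", "print(arow.asDict())" ] },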
output type {}\".format(type(arow), type(cols)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Spark to Pandas DataFrame**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A Spark DataFrame can also easily be converted into a [Pandas DataFrame](https://pandas.pydata.org):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pdata = df.select('ra', 'dec').where('tract == 4430').toPandas()\n", "pdata" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Access time**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a simple test, you can show the advantage of loading one tract at a time compared to the entire catalog:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_radec = df.select('ra', 'dec')\n", "%time data = df_radec.where('tract == 4430').collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time data = df_radec.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that we timed the `collect` action which is very specific (collecting data from the executors to the driver). In practice, we do not perform this action often (only at the very end of the pipeline, because it implies communication btw the machines). In Spark, we do most of the computation (including plot!) in the executors (distributed computation), and we collect the data once it is sufficiently reduced. Therefore, what matters more is the time to load the data and perform an action inside executors. The simplest one (but relevant though!) is to count the elements (O(n) complexity):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time data = df_radec.where('tract == 4430').count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time data = df_radec.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we are, super fast! Note that we loaded the data AND perform a simple action. So these benchmarks give you the IO overhead for this kind of catalogs. More about Apache Spark benchmarks can be found [here](https://github.com/LSSTDESC/DC2-production/pull/288)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Applying filters and cuts\n", "\n", "For more than one cut, this is all the same:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Simple cut to remove unreliable detections\n", "# More cuts can be added, as a logical AND, by appending GCRQuery objects to this list\n", "# good: \n", "# The source has no flagged pixels (interpolated, saturated, edge, clipped...) \n", "# and was not skipped by the deblender\n", "# tract == 4849:\n", "# Data only for tract 4849\n", "\n", "# Data after cut (DataFrame)\n", "df_cut = df_radec.where(\"tract == 4849 AND good\")\n", "\n", "# Data without cuts (DataFrame)\n", "df_full = df_radec.where(\"tract == 4849\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Number of sources before cut : {}\".format(df_full.count()))\n", "print(\"Number of sources after cut : {}\".format(df_cut.count()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Plotting the result - the standard way**\n", "\n", "The standard way means filtering data, collecting data, and plotting (e.g. you would do that in GCR). 
{ "cell_type": "markdown", "metadata": {}, "source": [ "**Plotting the result - the standard way**\n", "\n", "The standard way means filtering the data, collecting it, and plotting (this is, e.g., what you would do with GCR). This can be written as:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Plot a 2d histogram of sources\n", "plt.figure(figsize=(15, 7))\n", "for index, dataframe in enumerate([df_full, df_cut]):\n", "    ra, dec = np.transpose(dataframe.collect())\n", "    plt.subplot(121 + index)\n", "    (counts, xe, ye, Image) = plt.hist2d(ra, dec, 256);\n", "    plt.gca().set_aspect('equal');\n", "    plt.xlabel('RA [deg]');\n", "    plt.ylabel('dec [deg]');\n", "    if index == 0:\n", "        plt.title('Full sample');\n", "    else:\n", "        plt.title('Clean objects');\n", "    plt.colorbar(label='Number of objects');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All of that works because you only have a small fraction of the data. Now imagine you have TB of data, even after cuts. What would you do? GCR makes use of iterators. This is a workaround, but it is still not satisfactory, as things are done _serially_: for TB of data it will work, but it will take forever.\n", "\n", "**Spark point of view: distribute the computation (and the plot!)**\n", "\n", "The way to be faster is to distribute the plot (or rather the computation which leads to the data to be plotted).\n", "Histograms are particularly easy to distribute. Let's write a method to be applied on each Spark partition (each one containing only a fraction of the data):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def hist2d(partition, nbins=256, xyrange=None):\n", "    \"\"\" Produce 2D histograms from (x, y) data\n", "    \n", "    Parameters\n", "    ----------\n", "    partition : Iterator\n", "        Iterator containing partition data *[x, y].\n", "    nbins : int, optional\n", "        Number of bins per dimension (default: 256).\n", "    xyrange : list of lists, optional\n", "        Histogram boundaries [[xmin, xmax], [ymin, ymax]].\n", "    \n", "    Returns\n", "    ----------\n", "    Generator yielding counts for each partition.\n", "    Counts is an array of dimension nbins x nbins.\n", "    \"\"\"\n", "    # Unwrap the iterator\n", "    radec = [*partition]\n", "    ra, dec = np.transpose(radec)\n", "    \n", "    (counts, xedges, yedges, Image) = plt.hist2d(\n", "        ra, dec, nbins,\n", "        range=xyrange)\n", "    \n", "    yield counts\n", "\n", "# Min/Max values - just to make a nice plot\n", "xyrange = [[np.min(xe), np.max(xe)], [np.min(ye), np.max(ye)]]\n", "\n", "plt.figure(figsize=(15, 7))\n", "for index, dataframe in enumerate([df_full, df_cut]):\n", "    plt.subplot(121 + index)\n", "    # This is the crucial part - build the plot data in parallel!\n", "    im = dataframe\\\n", "        .rdd\\\n", "        .mapPartitions(lambda partition: hist2d(partition, 256, xyrange))\\\n", "        .reduce(lambda x, y: x + y)\n", "    \n", "    plt.imshow(im.T, origin='bottom', aspect='equal');\n", "    plt.xlabel('RA [deg]');\n", "    plt.ylabel('dec [deg]');\n", "    if index == 0:\n", "        plt.title('Full sample');\n", "    else:\n", "        plt.title('Clean objects');\n", "    plt.colorbar(label='Number of objects');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's time it and compare that to the previous method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time data = df_cut.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time data = df_cut.rdd.mapPartitions(lambda partition: hist2d(partition, 256, xyrange)).reduce(lambda x, y: x + y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "User time is a factor of ~30 faster (and the volume of data is not big!), for the same total wall time. Definitely the way to go!"
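, "\n", "Note that `hist2d` above calls `plt.hist2d` inside the executors, which builds matplotlib figures just to obtain the counts. A leaner variant of the same idea (a minimal sketch, assuming the same `df_cut` and `xyrange` as above) uses `np.histogram2d` directly:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def hist2d_np(partition, nbins=256, xyrange=None):\n", "    \"\"\" Same as hist2d, but using np.histogram2d (no matplotlib call in the executors) \"\"\"\n", "    # Unwrap the iterator and histogram the partition data\n", "    ra, dec = np.transpose([*partition])\n", "    counts, xedges, yedges = np.histogram2d(ra, dec, bins=nbins, range=xyrange)\n", "    yield counts\n", "\n", "%time data = df_cut.rdd.mapPartitions(lambda partition: hist2d_np(partition, 256, xyrange)).reduce(lambda x, y: x + y)"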
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example of filtering: Star/galaxy separation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For now, we have `extendedness == base_ClassificationExtendedness_value` as a tool for star/galaxy classification. An object is considered extended if the the difference between the `PSF` magnitude and the [`CModel` magnitude](https://www.sdss.org/dr12/algorithms/magnitudes/#cmodel) is beyond certain threshold (0.0164). To know more about this see [Bosch et al. 2018](http://adsabs.harvard.edu/abs/2018PASJ...70S...5B) section 4.9.10\n", "\n", "**Note** JP writing: I couldn't find the counterpart of `{band}_modelfit_CModel_flux` in dpdd files. So just for the sake of the exercise, I used `magerr_{band}_cModel`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Subset of columns of interest\n", "cols = \"mag_g_cModel, mag_r_cModel, mag_i_cModel\"\n", "\n", "# SQL - register first the DataFrame\n", "df.createOrReplaceTempView(\"full_tract\")\n", "\n", "# Used to be g_modelfit_CModel_flux\n", "sql_command = \"\"\"\n", " SELECT {}\n", " FROM full_tract \n", " WHERE \n", " tract == 4849 AND\n", " clean AND\n", " magerr_g_cModel>0 AND\n", " magerr_g_cModel<1e16 AND\n", " magerr_r_cModel>0 AND\n", " magerr_r_cModel<1e16 AND\n", " magerr_i_cModel>0 AND\n", " magerr_i_cModel<1e16\n", "\"\"\".format(cols)\n", "\n", "# Execute the expression - return a DataFrame\n", "df_sub = spark.sql(sql_command)\n", "\n", "print(df_sub.count())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So now, we have selected what we think are stars. Let's take a look at the colors of these objects" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mag_g_cModel, mag_r_cModel, mag_i_cModel = np.transpose(df_sub.collect())\n", "plt.hist2d(mag_g_cModel - mag_r_cModel,\n", " mag_r_cModel - mag_i_cModel, \n", " bins=100,range=[(-1,2),(-1,2)]);\n", "plt.xlabel('$g-r$')\n", "plt.ylabel('$r-i$')\n", "plt.colorbar(label='Number of objects');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "__More on Apache Spark to come!__" ] } ], "metadata": { "kernelspec": { "display_name": "desc-pyspark", "language": "python", "name": "desc-pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.2" } }, "nbformat": 4, "nbformat_minor": 2 }