{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline for the topology classifier with Apache Spark\n", "\n", "**2. Event Filtering and Feature Engineering** In this stage we prepare the input files for the three classifier models. Starting from the output of the previous stage (data ingestion) and producing the test and training datasets in Apache Parquet format.\n", "\n", "To run this notebook we used the following configuration:\n", "* *Software stack*: Spark 3.3.2\n", "* *Platform*: CentOS 7, Python 3.9\n", "* *Spark cluster*: Analytix" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "23/03/03 20:48:05 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.\n", "23/03/03 20:48:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!\n" ] } ], "source": [ "# No need to run this when using CERN SWAN service\n", "# Just add the configuration parameters for Spark on the \"star\" button integration\n", "\n", "# pip install pyspark or use your favorite way to set Spark Home, here we use findspark\n", "import findspark\n", "findspark.init('/home/luca/Spark/spark-3.3.2-bin-hadoop3') #set path to SPARK_HOME\n", "\n", "# Create Spark session and configure according to your environment\n", "from pyspark.sql import SparkSession\n", "\n", "spark = ( SparkSession.builder\n", " .appName(\"2-Feature Preparation\")\n", " .master(\"yarn\")\n", " .config(\"spark.driver.memory\",\"2g\")\n", " .config(\"spark.executor.memory\",\"64g\")\n", " .config(\"spark.executor.cores\",\"8\")\n", " .config(\"spark.dynamicAllocation.enabled\",\"true\")\n", " .config(\"spark.ui.showConsoleProgress\", \"false\")\n", " .getOrCreate()\n", " )\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.2
\n", "
Master
\n", "
yarn
\n", "
AppName
\n", "
2-Feature Preparation
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Check if Spark Session has been created correctly\n", "spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This loads info a Spark dataframe the parquet files produced in the previous data ingestion step." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "There are 25468470 events\n" ] } ], "source": [ "# This is the input dataset. \n", "# It is the output of Step 1 - Data Ingestion\n", "# It is made available for CERN users on the Hadoop Analytix cluster\n", "dataset_path = \"hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV_20190522\"\n", "\n", "data = ( spark.read \n", " .format(\"parquet\") \n", " .load(dataset_path)\n", " )\n", "\n", "events = data.count()\n", "print(\"There are {} events\".format(events))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also have a look at the distribution between classes after the filtering" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "There are:\n", "\t* 14265397 tt events (frac = 0.560)\n", "\t* 9776447 W+jets events (frac = 0.384)\n", "\t* 1426626 QCD events (frac = 0.056)\n" ] } ], "source": [ "labels = ['QCD', 'tt', 'W+jets']\n", "counts = data.groupBy('label').count().collect()\n", "\n", "qcd_events = 0\n", "tt_events = 0 \n", "wjets_events = 0\n", "\n", "print('There are:')\n", "for i in range(3):\n", " print('\\t* {} {} events (frac = {:.3f})'\n", " .format(\n", " counts[i][1],\n", " labels[counts[i].label],\n", " counts[i][1]*1.0/events\n", " ))\n", " if counts[i].label==0:\n", " qcd_events = counts[i][1]\n", " elif counts[i].label==1:\n", " tt_events = counts[i][1] \n", " elif counts[i].label==2:\n", " wjets_events = counts[i][1]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dataset is imbalanced, we may need to undersample it.\n", "
\n", "## Feature preparation\n", "\n", "In the parquet produced in the previous step we have three columns:\n", "1. `hfeatures` containing the 14 High Level Features\n", "2. `lfeature` containing the Low Level Features (list of 801 particles each of them with 19 features)\n", "3. `label` identifying the sample" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- hfeatures: vector (nullable = true)\n", " |-- lfeatures: array (nullable = true)\n", " | |-- element: array (containsNull = true)\n", " | | |-- element: double (containsNull = true)\n", " |-- label: integer (nullable = true)\n", "\n" ] } ], "source": [ "data.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can begin by preparing the input for the HLF classifier which simply requires to scale features and encode the label. To use Spark `MinMaxScaler` we need to convert the input into `dense vectors`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.linalg import Vectors, VectorUDT\n", "from pyspark.sql.functions import udf\n", "\n", "vector_dense_udf = udf(lambda r : Vectors.dense(r),VectorUDT())\n", "data = data.withColumn('hfeatures_dense',vector_dense_udf('hfeatures'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now build the pipeline to scale HLF and encode labels" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 333 ms, sys: 264 ms, total: 597 ms\n", "Wall time: 31.3 s\n" ] } ], "source": [ "from pyspark.ml import Pipeline\n", "from pyspark.ml.feature import OneHotEncoder\n", "from pyspark.ml.feature import MinMaxScaler\n", "\n", "## One-Hot-Encode\n", "encoder = OneHotEncoder(inputCols=[\"label\"],\n", " outputCols=[\"encoded_label\"],\n", " dropLast=False)\n", "\n", "## Scale feature vector\n", "scaler = MinMaxScaler(inputCol=\"hfeatures_dense\",\n", " outputCol=\"HLF_input\")\n", "\n", "pipeline = Pipeline(stages=[encoder, scaler])\n", "\n", "%time fitted_pipeline = pipeline.fit(data)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# Apply the pipeline to data\n", "data = fitted_pipeline.transform(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "New columns has been created, if we want to drop some of them we can use\n", "```Python \n", "data = data.drop(\"col-name\") \n", "```" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- hfeatures: vector (nullable = true)\n", " |-- lfeatures: array (nullable = true)\n", " | |-- element: array (containsNull = true)\n", " | | |-- element: double (containsNull = true)\n", " |-- label: integer (nullable = true)\n", " |-- hfeatures_dense: vector (nullable = true)\n", " |-- encoded_label: vector (nullable = true)\n", " |-- HLF_input: vector (nullable = true)\n", "\n" ] } ], "source": [ "data.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Moving on the particle-sequence classifier we need to sort the particles in each event by decreasing $\\Delta R$ distance from the isolated lepton, where $$\\Delta R = \\sqrt{\\Delta \\eta^2 + \\Delta \\phi^2}$$\n", "\n", "From the production of the low level features we know that the isolated lepton is the first particle of the list and the 19 features are \n", "```Python \n", "features = [\n", " 'Energy', 'Px', 'Py', 'Pz', 'Pt', 'Eta', 'Phi',\n", " 'vtxX', 'vtxY', 'vtxZ', 'ChPFIso', 'GammaPFIso', 'NeuPFIso', \n", " 'isChHad', 'isNeuHad', 'isGamma', 'isEle', 'isMu', 'Charge'\n", "]\n", "```\n", "therefore we need features 5 ($\\eta$) and 6 ($\\phi$) to compute $\\Delta R$. " ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "import math\n", "\n", "class lepAngularCoordinates():\n", " \"\"\"\n", " This class is used to store the lepton and compute DeltaR \n", " from the other particles\n", " \"\"\"\n", " def __init__(self, eta, phi):\n", " self.Eta = eta\n", " self.Phi = phi\n", " \n", " def DeltaR(self, eta, phi):\n", " deta = self.Eta - eta\n", " \n", " dphi = self.Phi - phi \n", " pi = math.pi\n", " while dphi > pi: dphi -= 2*pi\n", " while dphi < -pi: dphi += 2*pi\n", " \n", " return math.sqrt(deta*deta + dphi*dphi)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import ArrayType, DoubleType\n", "from sklearn.preprocessing import StandardScaler\n", "\n", "@udf(returnType=ArrayType(ArrayType(DoubleType())))\n", "def transform(particles):\n", " ## The isolated lepton is the first partiche in the list\n", " ISOlep = lepAngularCoordinates(particles[0][5], particles[0][6])\n", " \n", " ## Sort the particles based on the distance from the isolated lepton\n", " particles.sort(key = lambda part: ISOlep.DeltaR(part[5], part[6]),\n", " reverse=True)\n", " \n", " ## Standardize\n", " particles = StandardScaler().fit_transform(particles).tolist()\n", " \n", " return particles" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "data = data.withColumn('GRU_input', transform('lfeatures'))" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- hfeatures: vector (nullable = true)\n", " |-- lfeatures: array (nullable = true)\n", " | |-- element: array (containsNull = true)\n", " | | |-- element: double (containsNull = true)\n", " |-- label: integer (nullable = true)\n", " |-- hfeatures_dense: vector (nullable = true)\n", " |-- encoded_label: vector (nullable = true)\n", " |-- HLF_input: vector (nullable = true)\n", " |-- GRU_input: array (nullable = true)\n", " | |-- element: array (containsNull = true)\n", " | | |-- element: double (containsNull = true)\n", "\n" ] } ], "source": [ "data.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Undersample the dataset" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "qcd = data.filter('label=0')\n", "tt = data.filter('label=1')\n", "wjets = data.filter('label=2')" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "# Create the undersampled dataframes\n", "# False means to sample without repetition\n", "tt = tt.sample(False, qcd_events*1.0/tt_events) \n", "wjets = wjets.sample(False, qcd_events*1.0/wjets_events)\n", "\n", "dataUndersampled = qcd.union(tt).union(wjets)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-------+\n", "|label| count|\n", "+-----+-------+\n", "| 0|1426626|\n", "| 1|1427006|\n", "| 2|1426639|\n", "+-----+-------+\n", "\n" ] } ], "source": [ "dataUndersampled.groupBy('label').count().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Shuffle the dataset\n", "\n", "Because of how the dataset has been created it is made by \"three blocks\" obtained with the union of three samples. Therefore we need to shuffle the dataset. We splid this dataset into `train`/`test` and shuffle the train dataset." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import rand \n", "trainUndersampled, testUndersampled = dataUndersampled.randomSplit([0.8, 0.2], seed=42)\n", "trainUndersampled = trainUndersampled.orderBy(rand(seed=42))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the whole pipeline will be trigger by the action of saving to the parquet files." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save the datasets as Apache Parquet files" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 408 ms, sys: 341 ms, total: 749 ms\n", "Wall time: 1h 24s\n" ] } ], "source": [ "PATH = \"hdfs://analytix/Training/Spark/TopologyClassifier/\"\n", "\n", "numTestPartitions = 800\n", "\n", "%time testUndersampled.coalesce(numTestPartitions).write.parquet(PATH + 'testUndersampled.parquet')" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.93 s, sys: 1.59 s, total: 3.52 s\n", "Wall time: 2h 9min 18s\n" ] } ], "source": [ "numTrainPartitions = 800\n", "\n", "%time trainUndersampled.coalesce(numTrainPartitions).write.parquet(PATH + 'trainUndersampled.parquet')" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "spark.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" }, "sparkconnect": { "bundled_options": [], "list_of_options": [ { "name": "spark.dynamicAllocation.enabled", "value": "false" }, { "name": "spark.executor.memory", "value": "14G" }, { "name": "spark.executor.cores", "value": "4" }, { "name": "spark.executor.instances", "value": "10" }, { "name": "spark.driver.memory", "value": "10G" }, { "name": "spark.driver.maxResultSize", "value": "9G" } ] } }, "nbformat": 4, "nbformat_minor": 2 }