{ "cells": [ { "cell_type": "code", "execution_count": 1, "id": "hollow-genre", "metadata": {}, "outputs": [], "source": [ "import uproot\n", "import awkward as ak\n", "from coffea.nanoevents import NanoEventsFactory, NanoAODSchema, schemas\n", "from coffea import processor, hist" ] }, { "cell_type": "markdown", "id": "manual-disposition", "metadata": {}, "source": [ "## Define a Processor instance\n", "\n", "The processor implementation given below has been taken from [here](https://github.com/CoffeaTeam/coffea/blob/master/binder/nanoevents.ipynb)." ] }, { "cell_type": "code", "execution_count": 2, "id": "domestic-interim", "metadata": {}, "outputs": [], "source": [ "class MyZPeak(processor.ProcessorABC):\n", "    def __init__(self):\n", "        self._histo = hist.Hist(\n", "            \"Events\",\n", "            hist.Cat(\"dataset\", \"Dataset\"),\n", "            hist.Bin(\"mass\", \"Z mass\", 60, 60, 120),\n", "        )\n", "\n", "    @property\n", "    def accumulator(self):\n", "        return self._histo\n", "\n", "    # we will receive a NanoEvents instead of a coffea DataFrame\n", "    def process(self, events):\n", "        out = self.accumulator.identity()\n", "        mmevents = events[\n", "            (ak.num(events.Muon) == 2)\n", "            & (ak.sum(events.Muon.charge, axis=1) == 0)\n", "        ]\n", "        zmm = mmevents.Muon[:, 0] + mmevents.Muon[:, 1]\n", "        out.fill(\n", "            dataset=events.metadata[\"dataset\"],\n", "            mass=zmm.mass,\n", "        )\n", "        return out\n", "\n", "    def postprocess(self, accumulator):\n", "        return accumulator" ] }, { "cell_type": "markdown", "id": "together-assist", "metadata": {}, "source": [ "## Write some NanoEvents Parquet files to CephFS\n", "\n", "Here we convert the input ROOT files to Parquet (if that has not been done already) and copy them into the CephFS-mounted directory. In this version, each file must be smaller than 4 MB, the default object size in Ceph, so that every file maps to exactly one object. The current multiple-file design requires this one-to-one mapping." 
] }, { "cell_type": "code", "execution_count": 3, "id": "muslim-retrieval", "metadata": {}, "outputs": [], "source": [ "import os, shutil\n", "\n", "if not os.path.isfile(\"nano_dy.parquet\"):\n", "    ak.to_parquet(\n", "        uproot.lazy(\"nano_dy.root:Events\"),\n", "        \"nano_dy.parquet\",\n", "        list_to32=True,\n", "        use_dictionary=False,\n", "        compression=\"GZIP\",\n", "        compression_level=1,\n", "    )\n", "\n", "if not os.path.isfile(\"nano_dimuon.parquet\"):\n", "    ak.to_parquet(\n", "        uproot.lazy(\"nano_dimuon.root:Events\"),\n", "        \"nano_dimuon.parquet\",\n", "        list_to32=True,\n", "        use_dictionary=False,\n", "        compression=\"GZIP\",\n", "        compression_level=1,\n", "    )\n", "\n", "if not os.path.isdir(\"/mnt/cephfs/nanoevents/ZJets\"):\n", "    os.makedirs(\"/mnt/cephfs/nanoevents/ZJets\")\n", "    for i in range(2):\n", "        shutil.copyfile('nano_dy.parquet', '/mnt/cephfs/nanoevents/ZJets/nano_dy.{}.parquet'.format(i))\n", "\n", "\n", "if not os.path.isdir(\"/mnt/cephfs/nanoevents/Data\"):\n", "    os.makedirs(\"/mnt/cephfs/nanoevents/Data\")\n", "    for i in range(2):\n", "        shutil.copyfile('nano_dimuon.parquet', '/mnt/cephfs/nanoevents/Data/nano_dimuon.{}.parquet'.format(i))" ] }, { "cell_type": "markdown", "id": "manufactured-america", "metadata": { "tags": [] }, "source": [ "## Running a job in parallel using Dask\n", "\n", "Dask's `LocalCluster` runs a pool of worker processes on the machine that hosts the notebook. It can be replaced by other Dask cluster managers such as `KubeCluster` or `YarnCluster`. Here, we get a client handle by connecting to an already-running scheduler at its TCP address." ] }, { "cell_type": "code", "execution_count": 4, "id": "e73bc893-3745-4048-b0cc-ea3f1c45093d", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Client (Client-2c026108-c195-11ec-8676-2284b808c7e2), connection method: Direct. Scheduler: tcp://127.0.0.1:34139, 1 worker, 4 total threads, 15.70 GiB total memory.
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(\"tcp://127.0.0.1:34139\")\n", "client" ] }, { "cell_type": "markdown", "id": "toxic-empty", "metadata": {}, "source": [ "We have added a new function called `run_parquet_job` to the executor API in coffea to run jobs on Parquet files using the Arrow Dataset API under the hood.\n", "This API takes an optional `ceph_config_path` parameter, the path to the Ceph cluster's configuration file, which instructs the function to read from RADOS using the `SkyhookFileFormat` (which allows pushdown) instead of the out-of-the-box `ParquetFormat` API. It also accepts a single directory path, in which case the Dataset API performs dataset discovery by itself. The calls to the Dataset API are launched in parallel, with one Dataset API call per file. In the cell below, Skyhook is switched on through the `use_skyhook=True` and `format=\"parquet\"` arguments of `processor.Runner`." ] }, { "cell_type": "code", "execution_count": 5, "id": "altered-transition", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 645 ms, sys: 114 ms, total: 759 ms\n", "Wall time: 8 s\n" ] } ], "source": [ "%%time\n", "\n", "run = processor.Runner(\n", "    executor=processor.DaskExecutor(client=client),\n", "    use_skyhook=True,\n", "    format=\"parquet\",\n", "    schema=schemas.NanoAODSchema,\n", ")\n", "\n", "hists = run(\n", "    {\n", "        \"ZJets\": \"/mnt/cephfs/nanoevents/ZJets/\",\n", "        \"Data\": \"/mnt/cephfs/nanoevents/Data\",\n", "    },\n", "    \"Events\",\n", "    processor_instance=MyZPeak(),\n", ")" ] }, { "cell_type": "markdown", "id": "statutory-worst", "metadata": {}, "source": [ "## Running iteratively using the `FuturesExecutor`\n", "\n", "Run the same job again, but now iteratively. The calls to the Dataset API will now be sequential." 
] }, { "cell_type": "code", "execution_count": 6, "id": "aboriginal-immune", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "8067afe2a53542e89e376d22a18735bf", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Output()" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py:193: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n", " warnings.warn(\n", "/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py:193: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n", " warnings.warn(\n" ] }, { "data": { "text/html": [ "
\n"
      ],
      "text/plain": []
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "
\n",
       "
\n" ], "text/plain": [ "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 496 ms, sys: 53.8 ms, total: 550 ms\n", "Wall time: 8.55 s\n" ] } ], "source": [ "%%time\n", "\n", "run = processor.Runner(\n", "    executor=processor.FuturesExecutor(compression=None),\n", "    use_skyhook=True,\n", "    format=\"parquet\",\n", "    schema=schemas.NanoAODSchema,\n", ")\n", "\n", "hists = run(\n", "    {\n", "        \"ZJets\": \"/mnt/cephfs/nanoevents/ZJets\",\n", "        \"Data\": \"/mnt/cephfs/nanoevents/Data\",\n", "    },\n", "    \"Events\",\n", "    processor_instance=MyZPeak(),\n", ")" ] }, { "cell_type": "markdown", "id": "6ef6a420-8718-4be3-ab2a-b1a3685b8c30", "metadata": { "tags": [] }, "source": [ "## Running iteratively without Skyhook using the `IterativeExecutor`\n", "\n", "Run the same job again, but now iteratively and without Skyhook. The calls to the Dataset API will now be sequential." ] }, { "cell_type": "code", "execution_count": 7, "id": "e4551f3e-1679-4bc8-baa2-49cb629be7f7", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "05c8e9e40cb946399b3e0708129a09bf", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Output()" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py:193: \n",
       "RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n",
       "  warnings.warn(\n",
       "
\n" ], "text/plain": [ "/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py:193: \n", "RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n", " warnings.warn(\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py:193: \n",
       "RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n",
       "  warnings.warn(\n",
       "
\n" ], "text/plain": [ "/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py:193: \n", "RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n", " warnings.warn(\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n"
      ],
      "text/plain": []
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "
\n",
       "
\n" ], "text/plain": [ "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 6.43 s, sys: 174 ms, total: 6.61 s\n", "Wall time: 6.59 s\n" ] } ], "source": [ "%%time\n", "\n", "run = processor.Runner(\n", "    executor=processor.IterativeExecutor(compression=None),\n", "    use_skyhook=False,\n", "    format=\"parquet\",\n", "    schema=schemas.NanoAODSchema,\n", ")\n", "\n", "hists = run(\n", "    {\n", "        \"ZJets\": \"/mnt/cephfs/nanoevents/ZJets\",\n", "        \"Data\": \"/mnt/cephfs/nanoevents/Data\",\n", "    },\n", "    \"Events\",\n", "    processor_instance=MyZPeak(),\n", ")" ] }, { "cell_type": "markdown", "id": "consolidated-panel", "metadata": {}, "source": [ "As expected, this is slower than the equivalent Dask run without Skyhook shown next (6.59 s versus 4.49 s of wall time), and all of the work now happens in the notebook process (6.43 s of user CPU time)." ] }, { "cell_type": "markdown", "id": "197b0470-3441-479d-b062-25733d207a24", "metadata": { "tags": [] }, "source": [ "## Running `DaskExecutor` without Skyhook\n", "\n", "Run the same job again, but with `DaskExecutor` and without Skyhook:" ] }, { "cell_type": "code", "execution_count": 8, "id": "58b40f75-c833-4ef8-b017-9325b8413b0c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 184 ms, sys: 23.5 ms, total: 208 ms\n", "Wall time: 4.49 s\n" ] } ], "source": [ "%%time\n", "\n", "run = processor.Runner(\n", "    executor=processor.DaskExecutor(client=client),\n", "    use_skyhook=False,\n", "    format=\"parquet\",\n", "    schema=schemas.NanoAODSchema,\n", ")\n", "\n", "hists = run(\n", "    {\n", "        \"ZJets\": \"/mnt/cephfs/nanoevents/ZJets\",\n", "        \"Data\": \"/mnt/cephfs/nanoevents/Data\",\n", "    },\n", "    \"Events\",\n", "    processor_instance=MyZPeak(),\n", ")" ] }, { "cell_type": "markdown", "id": "surrounded-petroleum", "metadata": {}, "source": [ "## Plotting the results\n", "\n" ] }, { "cell_type": "code", "execution_count": 9, "id": "collectible-applicant", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ 
"/opt/conda/lib/python3.8/site-packages/numpy/core/_methods.py:44: RuntimeWarning: invalid value encountered in reduce\n", " return umr_minimum(a, axis, None, out, keepdims, initial, where)\n", "/opt/conda/lib/python3.8/site-packages/numpy/core/_methods.py:40: RuntimeWarning: invalid value encountered in reduce\n", " return umr_maximum(a, axis, None, out, keepdims, initial, where)\n", "/opt/conda/lib/python3.8/site-packages/numpy/core/_methods.py:44: RuntimeWarning: invalid value encountered in reduce\n", " return umr_minimum(a, axis, None, out, keepdims, initial, where)\n", "/opt/conda/lib/python3.8/site-packages/numpy/core/_methods.py:40: RuntimeWarning: invalid value encountered in reduce\n", " return umr_maximum(a, axis, None, out, keepdims, initial, where)\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "%matplotlib inline\n", "\n", "hist.plot1d(hists)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" } }, "nbformat": 4, "nbformat_minor": 5 }