{ "cells": [ { "cell_type": "markdown", "id": "blocked-chemical", "metadata": {}, "source": [ "In this demo, we first show how to use the Arrow `Dataset` API with `SkyhookFileFormat` to scan Parquet files while pushing scan operations down into Ceph. We then show how to use the `Dataset` API to process Parquet files containing NanoEvents stored in Ceph in parallel through Coffea using Dask." ] }, { "cell_type": "markdown", "id": "medieval-profile", "metadata": {}, "source": [ "## Exploring SkyhookFileFormat with PyArrow\n", "\n", "We import the Dataset API and the Parquet API from PyArrow." ] }, { "cell_type": "code", "execution_count": 19, "id": "fifty-syndrome", "metadata": {}, "outputs": [], "source": [ "import pyarrow\n", "import pyarrow.dataset as ds\n", "import pyarrow.parquet as pq" ] }, { "cell_type": "markdown", "id": "inappropriate-watts", "metadata": {}, "source": [ "Now we instantiate the `SkyhookFileFormat`. On instantiation, it connects to the Ceph cluster under the hood; the connection is closed automatically when the object is destroyed. `SkyhookFileFormat` currently takes the underlying file format (`parquet`) and the path to the Ceph configuration file as arguments. It inherits from the `FileFormat` API and uses the `DirectObjectAccess` API under the hood to interact with the objects that make up a file in CephFS. Because we mount CephFS, it is just another directory of Parquet files to Arrow, so we can instantiate our dataset with the `FileSystemDataset` that ships with Apache Arrow. Since `FileSystemDataset` accepts a pluggable file format, pushing scan operations down to our Parquet files only requires passing `SkyhookFileFormat` as the `format` parameter. 
" ] }, { "cell_type": "code", "execution_count": 20, "id": "aerial-helping", "metadata": {}, "outputs": [], "source": [ "dataset = ds.dataset(\"file:///mnt/cephfs/nyc\", format=ds.SkyhookFileFormat(\"parquet\", \"/etc/ceph/ceph.conf\"))" ] }, { "cell_type": "markdown", "id": "received-costa", "metadata": {}, "source": [ "Now we apply a projection (keeping only `total_amount` and `fare_amount`) and a filter (`trip_distance > 20.0`) to the dataset; with `SkyhookFileFormat`, both are pushed down into Ceph." ] }, { "cell_type": "code", "execution_count": 21, "id": "romance-prague", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n", "    <tr style=\"text-align: right;\">\n", "      <th></th>\n", "      <th>total_amount</th>\n", "      <th>fare_amount</th>\n", "    </tr>\n", "  </thead>\n", "  <tbody>\n", "    <tr><th>0</th><td>75.84</td><td>52.00</td></tr>\n", "    <tr><th>1</th><td>69.99</td><td>52.00</td></tr>\n", "    <tr><th>2</th><td>59.84</td><td>53.00</td></tr>\n", "    <tr><th>3</th><td>68.50</td><td>53.50</td></tr>\n", "    <tr><th>4</th><td>70.01</td><td>52.00</td></tr>\n", "    <tr><th>...</th><td>...</td><td>...</td></tr>\n", "    <tr><th>376</th><td>78.88</td><td>67.00</td></tr>\n", "    <tr><th>377</th><td>64.84</td><td>58.50</td></tr>\n", "    <tr><th>378</th><td>0.31</td><td>0.01</td></tr>\n", "    <tr><th>379</th><td>58.80</td><td>57.50</td></tr>\n", "    <tr><th>380</th><td>229.80</td><td>228.50</td></tr>\n", "  </tbody>\n", "</table>\n", "<p>381 rows × 2 columns</p>\n", "</div>
" ], "text/plain": [ " total_amount fare_amount\n", "0 75.84 52.00\n", "1 69.99 52.00\n", "2 59.84 53.00\n", "3 68.50 53.50\n", "4 70.01 52.00\n", ".. ... ...\n", "376 78.88 67.00\n", "377 64.84 58.50\n", "378 0.31 0.01\n", "379 58.80 57.50\n", "380 229.80 228.50\n", "\n", "[381 rows x 2 columns]" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dataset.to_table(columns=[\"total_amount\", \"fare_amount\"], filter=(ds.field(\"trip_distance\") > 20.0)).to_pandas()" ] }, { "cell_type": "markdown", "id": "occupational-formula", "metadata": {}, "source": [ "## Install Dask\n", "\n", "We will be using Dask workers for parallel execution. So, let's install it." ] }, { "cell_type": "code", "execution_count": 22, "id": "homeless-overall", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: dask[distributed] in /usr/local/lib/python3.6/site-packages (2021.3.0)\n", "Requirement already satisfied: pyyaml in /usr/lib64/python3.6/site-packages (from dask[distributed]) (3.12)\n", "Requirement already satisfied: distributed>=2021.03.0 in /usr/local/lib/python3.6/site-packages (from dask[distributed]) (2021.3.0)\n", "Requirement already satisfied: sortedcontainers!=2.0.0,!=2.0.1 in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (2.4.0)\n", "Requirement already satisfied: click>=6.6 in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (8.0.3)\n", "Requirement already satisfied: toolz>=0.8.2 in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (0.11.2)\n", "Requirement already satisfied: tblib>=1.6.0 in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (1.7.0)\n", "Requirement already satisfied: zict>=0.1.3 in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (2.0.0)\n", "Requirement 
already satisfied: psutil>=5.0 in /usr/local/lib64/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (5.8.0)\n", "Requirement already satisfied: cloudpickle>=1.5.0 in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (2.0.0)\n", "Requirement already satisfied: setuptools in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (57.0.0)\n", "Requirement already satisfied: tornado>=5 in /usr/local/lib64/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (6.1)\n", "Requirement already satisfied: contextvars in /usr/local/lib/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (2.4)\n", "Requirement already satisfied: msgpack>=0.6.0 in /usr/local/lib64/python3.6/site-packages (from distributed>=2021.03.0->dask[distributed]) (1.0.3)\n", "Requirement already satisfied: importlib-metadata in /usr/local/lib/python3.6/site-packages (from click>=6.6->distributed>=2021.03.0->dask[distributed]) (4.8.3)\n", "Requirement already satisfied: heapdict in /usr/local/lib/python3.6/site-packages (from zict>=0.1.3->distributed>=2021.03.0->dask[distributed]) (1.0.1)\n", "Requirement already satisfied: immutables>=0.9 in /usr/local/lib64/python3.6/site-packages (from contextvars->distributed>=2021.03.0->dask[distributed]) (0.16)\n", "Requirement already satisfied: typing-extensions>=3.7.4.3 in /usr/local/lib/python3.6/site-packages (from immutables>=0.9->contextvars->distributed>=2021.03.0->dask[distributed]) (4.0.1)\n", "Requirement already satisfied: zipp>=0.5 in /usr/local/lib/python3.6/site-packages (from importlib-metadata->click>=6.6->distributed>=2021.03.0->dask[distributed]) (3.6.0)\n", "\u001b[33mWARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. 
It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv\u001b[0m\n", "Requirement already satisfied: fsspec>=0.3.3 in /usr/local/lib/python3.6/site-packages (2021.11.1)\n", "\u001b[33mWARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv\u001b[0m\n" ] } ], "source": [ "!pip3 install dask[distributed]\n", "!pip3 install 'fsspec>=0.3.3'" ] }, { "cell_type": "markdown", "id": "concerned-therapy", "metadata": {}, "source": [ "## Import the required modules\n", "\n", "Import `uproot`, `awkward`, `coffea`." ] }, { "cell_type": "code", "execution_count": 23, "id": "hollow-genre", "metadata": {}, "outputs": [], "source": [ "import uproot\n", "import awkward as ak\n", "from coffea.nanoevents import NanoEventsFactory, NanoAODSchema\n", "from coffea import processor, hist" ] }, { "cell_type": "markdown", "id": "manual-disposition", "metadata": {}, "source": [ "## Define a Processor instance\n", "\n", "The processor implementation given below has been taken from [here](https://github.com/CoffeaTeam/coffea/blob/master/binder/nanoevents.ipynb)." 
] }, { "cell_type": "code", "execution_count": 24, "id": "domestic-interim", "metadata": {}, "outputs": [], "source": [ "class MyZPeak(processor.ProcessorABC):\n", "    def __init__(self):\n", "        self._histo = hist.Hist(\n", "            \"Events\",\n", "            hist.Cat(\"dataset\", \"Dataset\"),\n", "            hist.Bin(\"mass\", \"Z mass\", 60, 60, 120),\n", "        )\n", "\n", "    @property\n", "    def accumulator(self):\n", "        return self._histo\n", "\n", "    # we receive a NanoEvents array instead of a coffea DataFrame\n", "    def process(self, events):\n", "        out = self.accumulator.identity()\n", "        # keep events with exactly two muons of opposite charge\n", "        mmevents = events[\n", "            (ak.num(events.Muon) == 2)\n", "            & (ak.sum(events.Muon.charge, axis=1) == 0)\n", "        ]\n", "        # dimuon candidate: sum of the two muon four-vectors\n", "        zmm = mmevents.Muon[:, 0] + mmevents.Muon[:, 1]\n", "        out.fill(\n", "            dataset=events.metadata[\"dataset\"],\n", "            mass=zmm.mass,\n", "        )\n", "        return out\n", "\n", "    def postprocess(self, accumulator):\n", "        return accumulator" ] }, { "cell_type": "markdown", "id": "utility-compact", "metadata": {}, "source": [ "## Convert ROOT files containing NanoEvents to Parquet files" ] }, { "cell_type": "code", "execution_count": 25, "id": "sweet-reply", "metadata": {}, "outputs": [], "source": [ "ak.to_parquet(\n", "    uproot.lazy(\"../tests/samples/nano_dy.root:Events\"),\n", "    \"nano_dy.parquet\",\n", "    list_to32=True,\n", "    use_dictionary=False,\n", "    compression=\"GZIP\",\n", "    compression_level=1,\n", ")\n", "\n", "ak.to_parquet(\n", "    uproot.lazy(\"../tests/samples/nano_dimuon.root:Events\"),\n", "    \"nano_dimuon.parquet\",\n", "    list_to32=True,\n", "    use_dictionary=False,\n", "    compression=\"GZIP\",\n", "    compression_level=1,\n", ")" ] }, { "cell_type": "markdown", "id": "together-assist", "metadata": {}, "source": [ "## Write some NanoEvents Parquet files to CephFS\n", "\n", "Here we populate the CephFS-mounted directory with the Parquet files created in the previous step. 
In this version, each file must be smaller than 4 MB, the default Ceph object size, so that every file maps to exactly one object; this one-to-one mapping of files to objects is required by the current multiple-file design." ] }, { "cell_type": "code", "execution_count": 26, "id": "muslim-retrieval", "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "\n", "os.makedirs(\"/mnt/cephfs/nanoevents/ZJets\", exist_ok=True)\n", "os.makedirs(\"/mnt/cephfs/nanoevents/Data\", exist_ok=True)\n", "# write six copies of each sample, each as its own file\n", "for i in range(6):\n", "    shutil.copyfile(\"nano_dy.parquet\", f\"/mnt/cephfs/nanoevents/ZJets/nano_dy.{i}.parquet\")\n", "    shutil.copyfile(\"nano_dimuon.parquet\", f\"/mnt/cephfs/nanoevents/Data/nano_dimuon.{i}.parquet\")" ] }, { "cell_type": "code", "execution_count": 27, "id": "ready-channels", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "nano_dimuon.0.parquet nano_dimuon.2.parquet nano_dimuon.4.parquet\r\n", "nano_dimuon.1.parquet nano_dimuon.3.parquet nano_dimuon.5.parquet\r\n" ] } ], "source": [ "!ls /mnt/cephfs/nanoevents/Data" ] }, { "cell_type": "code", "execution_count": 28, "id": "christian-upset", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "nano_dy.0.parquet nano_dy.2.parquet nano_dy.4.parquet\r\n", "nano_dy.1.parquet nano_dy.3.parquet nano_dy.5.parquet\r\n" ] } ], "source": [ "!ls /mnt/cephfs/nanoevents/ZJets" ] }, { "cell_type": "markdown", "id": "parental-yorkshire", "metadata": {}, "source": [ "## Reading NanoEvents using SkyhookFileFormat" ] }, { "cell_type": "code", "execution_count": 29, "id": "republican-given", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/workspace/binder/coffea/nanoevents/schemas/nanoaod.py:195: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n", " RuntimeWarning,\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], 
"source": [ "events_skyhook = NanoEventsFactory.from_parquet(\n", "    \"/mnt/cephfs/nanoevents/ZJets/nano_dy.0.parquet\",\n", "    skyhook_options={\"ceph_config_path\": \"/etc/ceph/ceph.conf\", \"ceph_data_pool\": \"cephfs_data\"},\n", ").events()\n", "events_skyhook.Muon" ] }, { "cell_type": "markdown", "id": "manufactured-america", "metadata": {}, "source": [ "## Running a job in parallel using Dask\n", "\n", "The `LocalCluster()` used below creates a pool of worker processes, one per core available to the notebook, and each worker is single-threaded (`processes=True, threads_per_worker=1`). The `LocalCluster` can be replaced by other Dask cluster managers, such as `KubeCluster` from `dask-kubernetes` or `YarnCluster` from `dask-yarn`. Here, we create a `LocalCluster` and get a client handle to it." ] }, { "cell_type": "code", "execution_count": 30, "id": "adapted-amplifier", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8787 is already in use.\n", "Perhaps you already have a cluster running?\n", "Hosting the HTTP server on port 44011 instead\n", " http_address[\"port\"], self.http_server.port\n" ] } ], "source": [ "from dask.distributed import Client, LocalCluster\n", "cluster = LocalCluster(processes=True, threads_per_worker=1)\n", "client = Client(cluster)" ] }, { "cell_type": "markdown", "id": "toxic-empty", "metadata": {}, "source": [ "We have added a new function called `run_parquet_job` to the coffea executor API to run jobs on Parquet files using the Arrow Dataset API under the hood. \n", "This function takes an optional `ceph_config_path` parameter, the path to the configuration file of the Ceph cluster, which instructs it to read from RADOS using `SkyhookFileFormat` (which allows pushdown) instead of the out-of-the-box `ParquetFileFormat`. In the cells below, Skyhook reads are enabled by setting the `use_skyhook` option to `True` in `executor_args`. 
This function also accepts a single directory path per dataset, in which case the Dataset API discovers the files by itself. The Dataset API calls are launched in parallel, one call per file." ] }, { "cell_type": "code", "execution_count": 31, "id": "altered-transition", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.66 s, sys: 518 ms, total: 3.18 sCompleted | 7.1s\n", "Wall time: 7.21 s\n" ] } ], "source": [ "%%time\n", "result = processor.run_parquet_job(\n", "    {\n", "        \"ZJets\": \"/mnt/cephfs/nanoevents/ZJets\",\n", "        \"Data\": \"/mnt/cephfs/nanoevents/Data\",\n", "    },\n", "    \"Events\",\n", "    processor_instance=MyZPeak(),\n", "    executor=processor.dask_executor,\n", "    executor_args={\"schema\": processor.NanoAODSchema, \"client\": client, \"use_skyhook\": True},\n", ")" ] }, { "cell_type": "markdown", "id": "statutory-worst", "metadata": {}, "source": [ "## Running iteratively using the `iterative_executor`\n", "\n", "Now we run the same job again, but iteratively, so the Dataset API calls are made sequentially." 
] }, { "cell_type": "code", "execution_count": 32, "id": "aboriginal-immune", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "f8970577209f4bf7b919998126b3cab2", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Processing: 0%| | 0/12 [00:00" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "/workspace/binder/coffea/nanoevents/schemas/nanoaod.py:195: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8\n", " RuntimeWarning,\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 8.12 s, sys: 854 ms, total: 8.98 s\n", "Wall time: 8.94 s\n" ] } ], "source": [ "%%time\n", "result = processor.run_parquet_job(\n", "    {\n", "        \"ZJets\": \"/mnt/cephfs/nanoevents/ZJets\",\n", "        \"Data\": \"/mnt/cephfs/nanoevents/Data\",\n", "    },\n", "    \"Events\",\n", "    processor_instance=MyZPeak(),\n", "    executor=processor.iterative_executor,\n", "    executor_args={\"schema\": processor.NanoAODSchema, \"use_skyhook\": True},\n", ")" ] }, { "cell_type": "markdown", "id": "consolidated-panel", "metadata": {}, "source": [ "As expected, the sequential run is slower than the Dask run: 8.94 s versus 7.21 s of wall time." ] }, { "cell_type": "markdown", "id": "surrounded-petroleum", "metadata": {}, "source": [ "## Plotting the results\n", "\n" ] }, { "cell_type": "code", "execution_count": 33, "id": "collectible-applicant", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "%matplotlib inline\n", "\n", "hist.plot1d(result)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 5 }