{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Running inference tools\n", "\n", "As machine learning (ML) becomes more popular in HEP analysis, `coffea` also\n", "provides tools to assist with using ML tools within the coffea framework. For\n", "training and validation, you would likely need custom data mangling tools to\n", "convert HEP data formats ([NanoAOD][nanoaod], [PFNano][pfnano]) to a format that\n", "best interfaces with the ML tool of choice, since for training and validation you\n", "typically want fine control over what computation is done. For more\n", "advanced use cases of data mangling and data saving, refer to the [awkward array\n", "manual][datamangle] and [uproot][uproot_write]/[parquet][ak_parquet] write\n", "operations for saving intermediate states. The helper tools provided in coffea\n", "focus on ML inference, where ML tool outputs are used as additional variables\n", "in the event/object selection chain.\n", "\n", "[nanoaod]: https://twiki.cern.ch/twiki/bin/view/CMSPublic/WorkBookNanoAOD\n", "[pfnano]: https://github.com/cms-jet/PFNano\n", "[datamangle]: https://awkward-array.org/doc/main/user-guide/how-to-restructure.html\n", "[uproot_write]: https://uproot.readthedocs.io/en/latest/basic.html#writing-ttrees-to-a-file\n", "[ak_parquet]: https://awkward-array.org/doc/main/reference/generated/ak.to_parquet.html\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Why these wrapper tools are needed\n", "\n", "The typical workflow for ML inference within awkward/coffea analyses involves\n", "converting and padding awkward arrays into ML tool containers (usually\n", "something `numpy`-compatible), running the inference, then converting and\n", "truncating the results back into the awkward array structure required for the\n", "analysis chain to continue. 
With awkward arrays' laziness now being handled\n", "entirely by [`dask`][dask_awkward], the conversion of awkward arrays to\n", "other array types needs to be wrapped in a way that `dask` can understand.\n", "The wrappers in the `ml_tools` package aim to wrap the ML tools commonly used\n", "by the HEP community in a common interface, reducing the verbosity of the\n", "code on the analysis side.\n", "\n", "[dask_awkward]: https://dask-awkward.readthedocs.io/en/stable/gs-limitations.html\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Example: ParticleNet-like jet variable calculation using PyTorch\n", "\n", "The example in this notebook uses [`pytorch`][pytorch] to calculate a\n", "jet-level discriminant from the jet's constituent particles. An example of how\n", "to construct such a `pytorch` network can be found in the docs file, but for\n", "`ml_tools` in coffea, we only support loading models from\n", "[TorchScript][pytorch_jit] format files, to ensure operability when scaling to\n", "clusters. 
Let us start by downloading\n", "the example ParticleNet model file and a small `PFNano`-compatible file, then\n", "define a simple function to open the `PFNano` file with and without dask.\n", "\n", "[pytorch]: https://pytorch.org/\n", "[pytorch_jit]: https://pytorch.org/tutorials/beginner/saving_loading_models.html#export-load-model-in-torchscript-format\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "!wget --quiet -O model.pt https://github.com/CoffeaTeam/coffea/raw/ml_tools/tests/samples/triton_models_test/pn_test/1/model.pt\n", "!wget --quiet -O pfnano.root https://github.com/CoffeaTeam/coffea/raw/ml_tools/tests/samples/pfnano.root\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from coffea.nanoevents import NanoEventsFactory\n", "from coffea.nanoevents.schemas import PFNanoAODSchema\n", "\n", "\n", "def open_events(permit_dask=False):\n", "    factory = NanoEventsFactory.from_root(\n", "        \"file:./pfnano.root\",\n", "        schemaclass=PFNanoAODSchema,\n", "        permit_dask=permit_dask,\n", "    )\n", "    return factory.events()\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Now we prepare a class to handle inference requests by extending the\n", "`ml_tools.torch_wrapper` class. As the base class cannot know anything about the\n", "data mangling required for the user's particular model, we need to overload\n", "at least the `prepare_awkward` method:\n", "\n", "- The input can be an arbitrary number of awkward arrays or dask awkward arrays\n", "  (but never a mix of dask and non-dask arrays). In this example, we pass in\n", "  the event array.\n", "- The output should be a single tuple `a` plus a single dictionary `b`; this\n", "  ensures that arbitrarily complicated inputs can be passed to the underlying\n", "  `pytorch` model instance as `model(*a, **b)`. 
The contents of `a` and `b`\n", "  should be `numpy`-compatible awkward-like arrays: if the inputs are non-dask\n", "  awkward arrays, the return should also be non-dask awkward arrays that can be\n", "  trivially converted to `numpy` arrays via an `ak.to_numpy` call; if the inputs\n", "  are dask awkward arrays, the return should still be dask awkward arrays\n", "  that can be trivially converted via a `to_awkward().to_numpy()` call. To\n", "  minimize changes to the code, a simple `dask_awkward/awkward` switcher\n", "  `get_awkward_lib` is provided, as there should be (near-)perfect feature\n", "  parity between the dask and non-dask arrays.\n", "\n", "  In this ParticleNet-like example, the model expects the following inputs:\n", "\n", "  - A `N` jets x `2` coordinates x `100` constituents \"points\" array,\n", "    representing the constituent coordinates.\n", "  - A `N` jets x `5` features x `100` constituents \"features\" array,\n", "    representing the constituent features of interest to be used for inference.\n", "  - A `N` jets x `1` mask x `100` constituents \"mask\" array, representing\n", "    whether a constituent should be masked from the inference request.\n", "\n", "  In this case, we need to flatten the `E` events x `N` jets structure, then\n", "  stack the constituent attributes of interest into a single array via\n", "  `ak.concatenate`.\n", "\n", "After defining this minimal class, we can run inference using the\n", "`__call__` method defined in the base class. 
Notice that overloading this single\n", "method automatically allows inference to be called on both awkward\n", "and dask-awkward arrays.\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/ensc/VirtualENV/coffea-test/lib/python3.8/site-packages/coffea/ml_tools/helper.py:163: UserWarning: No format checks were performed on input!\n", "  warnings.warn(\"No format checks were performed on input!\")\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Awkward results: [[0.0693, -0.0448], [0.0678, -0.0451], ..., [0.0616, ...], [0.0587, -0.0172]]\n", "Dask awkward results: dask.awkward\n" ] } ], "source": [ "from coffea.ml_tools.torch_wrapper import torch_wrapper\n", "import awkward\n", "import dask_awkward\n", "import numpy as np\n", "\n", "\n", "class ParticleNetExample1(torch_wrapper):\n", "    def prepare_awkward(self, events):\n", "        ak = self.get_awkward_lib(events)\n", "        jets = ak.flatten(events.Jet)\n", "\n", "        def pad(arr):\n", "            return ak.fill_none(\n", "                ak.pad_none(arr, 100, axis=1, clip=True),\n", "                0.0,\n", "            )\n", "\n", "        # Human readable version of what the inputs are\n", "        # Each array is a N jets x 100 constituent array\n", "        imap = {\n", "            \"points\": {\n", "                \"deta\": pad(jets.eta - jets.constituents.pf.eta),\n", "                \"dphi\": pad(jets.delta_phi(jets.constituents.pf)),\n", "            },\n", "            \"features\": {\n", "                \"dr\": pad(jets.delta_r(jets.constituents.pf)),\n", "                \"lpt\": pad(np.log(jets.constituents.pf.pt)),\n", "                \"lptf\": pad(np.log(jets.constituents.pf.pt / jets.pt)),\n", "                \"f1\": pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),\n", "                \"f2\": pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),\n", "            },\n", "            \"mask\": {\n", "                \"mask\": pad(ak.ones_like(jets.constituents.pf.pt)),\n", "            },\n", "        }\n", "\n", "        # Compacting the array elements into the desired dimension using\n", "        # ak.concatenate\n", "        retmap = {\n", "            k: ak.concatenate([x[:, np.newaxis, 
:] for x in imap[k].values()], axis=1)\n", "            for k in imap.keys()\n", "        }\n", "\n", "        # Returning everything using a dictionary. Also perform type conversion!\n", "        return (), {\n", "            \"points\": ak.values_astype(retmap[\"points\"], \"float32\"),\n", "            \"features\": ak.values_astype(retmap[\"features\"], \"float32\"),\n", "            \"mask\": ak.values_astype(retmap[\"mask\"], \"float16\"),\n", "        }\n", "\n", "\n", "# Setting up the model container\n", "pn_example1 = ParticleNetExample1(\"model.pt\")\n", "\n", "# Running on awkward arrays\n", "ak_events = open_events(permit_dask=False)\n", "ak_results = pn_example1(ak_events)\n", "print(\"Awkward results:\", ak_results)  # Runs fine!\n", "\n", "# Running on dask_awkward arrays\n", "dask_events = open_events(permit_dask=True)\n", "dask_results = pn_example1(dask_events)\n", "print(\"Dask awkward results:\", dask_results)  # Also runs fine!\n", "\n", "# Checking that the results are identical\n", "assert awkward.all(dask_results.compute() == ak_results)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "For each jet in the input to the `torch` model, the model returns a pair of\n", "unnormalized scores. Without additional specification, the `torch_wrapper` class\n", "performs a trivial `ak.from_numpy` conversion of the torch model's output. 
We\n", "can fold this back into a nested structure by overloading\n", "the `postprocess_awkward` method of the class.\n", "\n", "For the ParticleNet example, we perform additional computation when\n", "converting back to awkward array formats:\n", "\n", "- Calculate the softmax of the model output for each jet (commonly used as\n", "  the single ML inference \"score\").\n", "- Fold the computed `softmax` array back into a nested structure that is\n", "  compatible with the original `events.Jet` array.\n", "\n", "The inputs of the `postprocess_awkward` method differ from those of\n", "`prepare_awkward` only in that the first argument is the return array\n", "of the model inference after the trivial `from_numpy` conversion. Because\n", "`return_array` can be a dask array, the awkward/dask-awkward switching\n", "function should also be used in this method.\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[0.528, 0.528, 0.524, 0.523, 0.521, 0.52, 0.519, 0.519], ..., [0.528, ...]]\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/home/ensc/VirtualENV/coffea-test/lib/python3.8/site-packages/dask_awkward/lib/structure.py:751: UserWarning: Please ensure that dask.awkward\n", " is partitionwise-compatible with dask.awkward\n", " (e.g. 
counts comes from a dak.num(array, axis=1)),\n", " otherwise this unflatten operation will fail when computed!\n", "  warnings.warn(\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "dask.awkward\n" ] } ], "source": [ "class ParticleNetExample2(ParticleNetExample1):\n", "    def postprocess_awkward(self, return_array, events):\n", "        ak = self.get_awkward_lib(return_array)\n", "        softmax = np.exp(return_array)[:, 0] / ak.sum(np.exp(return_array), axis=-1)\n", "        njets = ak.count(events.Jet.pt, axis=-1)\n", "        return ak.unflatten(softmax, njets)\n", "\n", "\n", "pn_example2 = ParticleNetExample2(\"model.pt\")\n", "\n", "# Running on awkward arrays\n", "ak_events = open_events(permit_dask=False)\n", "ak_jets = ak_events.Jet\n", "ak_jets[\"MLresults\"] = pn_example2(ak_events)\n", "ak_events[\"Jet\"] = ak_jets\n", "print(ak_events.Jet.MLresults)\n", "\n", "# Running on dask awkward arrays\n", "dask_events = open_events(permit_dask=True)\n", "dask_jets = dask_events.Jet\n", "dask_jets[\"MLresults\"] = pn_example2(dask_events)\n", "dask_events[\"Jet\"] = dask_jets\n", "print(dask_events.Jet.MLresults)\n", "\n", "assert awkward.all(ak_events.Jet.MLresults == dask_events.Jet.MLresults.compute())\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Of course, the implementation of the classes above can be written in a single\n", "class. 
Here is a copy-and-paste implementation of the class with all the\n", "functionality described in the cells above:\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[0.528, 0.528, 0.524, 0.523, 0.521, 0.52, 0.519, 0.519], ..., [0.528, ...]]\n", "dask.awkward\n", "{'from-uproot-9ff71c2b6c06eb15f091c3e41ead5497': ['Jet.pt', 'PFCands.d0', 'JetPFCands.pFCandsIdxG', 'PFCands.phi', 'Jet.phi', 'Jet.eta', 'PFCands.eta', 'PFCands.pt', 'Jet.pFCandsIdxG', 'PFCands.dz']}\n" ] } ], "source": [ "class ParticleNetExample(torch_wrapper):\n", "    def prepare_awkward(self, events):\n", "        ak = self.get_awkward_lib(events)\n", "        jets = ak.flatten(events.Jet)\n", "\n", "        def pad(arr):\n", "            return ak.fill_none(\n", "                ak.pad_none(arr, 100, axis=1, clip=True),\n", "                0.0,\n", "            )\n", "\n", "        # Human readable version of what the inputs are\n", "        # Each array is a N jets x 100 constituent array\n", "        imap = {\n", "            \"points\": {\n", "                \"deta\": pad(jets.eta - jets.constituents.pf.eta),\n", "                \"dphi\": pad(jets.delta_phi(jets.constituents.pf)),\n", "            },\n", "            \"features\": {\n", "                \"dr\": pad(jets.delta_r(jets.constituents.pf)),\n", "                \"lpt\": pad(np.log(jets.constituents.pf.pt)),\n", "                \"lptf\": pad(np.log(jets.constituents.pf.pt / jets.pt)),\n", "                \"f1\": pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),\n", "                \"f2\": pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),\n", "            },\n", "            \"mask\": {\n", "                \"mask\": pad(ak.ones_like(jets.constituents.pf.pt)),\n", "            },\n", "        }\n", "\n", "        # Compacting the array elements into the desired dimension using\n", "        # ak.concatenate\n", "        retmap = {\n", "            k: ak.concatenate([x[:, np.newaxis, :] for x in imap[k].values()], axis=1)\n", "            for k in imap.keys()\n", "        }\n", "\n", "        # Returning everything using a dictionary. 
Also take care of type\n", "        # conversion here.\n", "        return (), {\n", "            \"points\": ak.values_astype(retmap[\"points\"], \"float32\"),\n", "            \"features\": ak.values_astype(retmap[\"features\"], \"float32\"),\n", "            \"mask\": ak.values_astype(retmap[\"mask\"], \"float16\"),\n", "        }\n", "\n", "    def postprocess_awkward(self, return_array, events):\n", "        ak = self.get_awkward_lib(return_array, events)\n", "        softmax = np.exp(return_array)[:, 0] / ak.sum(np.exp(return_array), axis=-1)\n", "        njets = ak.count(events.Jet.pt, axis=-1)\n", "        return ak.unflatten(softmax, njets)\n", "\n", "\n", "pn_example = ParticleNetExample(\"model.pt\")\n", "\n", "# Running on awkward arrays\n", "ak_events = open_events(permit_dask=False)\n", "ak_jets = ak_events.Jet\n", "ak_jets[\"MLresults\"] = pn_example(ak_events)\n", "ak_events[\"Jet\"] = ak_jets\n", "print(ak_events.Jet.MLresults)\n", "\n", "# Running on dask awkward arrays\n", "dask_events = open_events(permit_dask=True)\n", "dask_jets = dask_events.Jet\n", "dask_jets[\"MLresults\"] = pn_example(dask_events)\n", "dask_events[\"Jet\"] = dask_jets\n", "print(dask_events.Jet.MLresults)\n", "# Checking that we get identical results\n", "assert awkward.all(dask_events.Jet.MLresults.compute() == ak_events.Jet.MLresults)\n", "print(dask_awkward.necessary_columns(dask_events.Jet.MLresults))\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "In particular, analyzers should check that the output of the last line (the\n", "columns reported by `dask_awkward.necessary_columns`) contains only the\n", "branches required for ML inference; if many unneeded branches are read, this\n", "may lead to significant performance penalties.\n", "\n", "As with other dask tools, users can inspect how dask plans the computation\n", "using the following snippet." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "HighLevelGraph with 103 layers.\n", "\n", " 0. 
from-uproot-40ed9ba7c4cc45e94105e61283f124a3\n", " 1. JetPFCands-f14e9b07eb3c690873f7ea00af7ee8eb\n", " 2. PFCands-d4a8642b8c01c731c7d43e4bde63dba7\n", " 3. JetPFCands-6271438633eb0569493e2a65beb35cb1\n", " 4. PFCands-988dd86d5d366180aedd789b9fd97788\n", " 5. JetPFCands-b9487676f464447b15804d03bca317da\n", " 6. PFCands-915a7a9dd26d7fb2110a52bc38fae16a\n", " 7. JetPFCands-1535528183c6524bb8872d941e3a2320\n", " 8. PFCands-a4e509a69376ee57025fb7c7bfbf7903\n", " 9. JetPFCands-0e8588a67ca85b26e05c720442904a9d\n", " 10. PFCands-e95bd3adde3a14965f38e941f6b912c3\n", " 11. JetPFCands-f14ebf216a7d1a1761a234ad85c21a54\n", " 12. PFCands-18e5d62f1b50a1c0edafa1f39ebe765d\n", " 13. JetPFCands-6beb51836e39d136853699523596bf0e\n", " 14. PFCands-27deeb09b87b1ac27f7199f705a0b003\n", " 15. JetPFCands-8da6714502a3b61dd64f44221e6d0299\n", " 16. PFCands-7e6e94c83dcffeb122571204de8d1392\n", " 17. Jet-fa3ab6fd9c4726ec37bac08fbeed78c9\n", " 18. flatten-4b26fe19858d9257a26a602aef991da8\n", " 19. pFCandsIdxG-e6ffb60d1c3a5132916c6e67aa728476\n", " 20. _apply_global_index-deae4d7cd7d1e0d8d803f96bc280cfa6\n", " 21. pFCandsIdxG-2edff7ce8b2126a66b44af833d8b713b\n", " 22. _apply_global_index-0d462573e4213b3c2d08e82f54e93c0f\n", " 23. pt-506e1c8af61c15ef17f71c0f91448da4\n", " 24. ones-like-4cecc07513ea49b0b40dff2fd351a5b9\n", " 25. pad-none-62e19b5d7cbf908345333361b2c3676f\n", " 26. fill-none-d69152c91524f0008baeabefd0ecee5e\n", " 27. getitem-8d5c6a7b1c97dc8ed9e10c049cc6c2be\n", " 28. 
" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "print(dask_results.dask)\n", "dask_results.visualize(optimize_graph=False)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Or take a peek at the optimized graph:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/ensc/VirtualENV/coffea-test/lib/python3.8/site-packages/coffea/ml_tools/helper.py:163: UserWarning: No format checks were performed on input!\n", "  warnings.warn(\"No format checks were performed on input!\")\n" ] }, { "data": { "image/png": "", "text/plain": [ "" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dask_results.visualize(optimize_graph=True)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Comments about generalizing to other ML tools\n", "\n", "All ML wrappers provided in the `coffea.ml_tools` module (`triton_wrapper` for\n", "[triton][triton] server inference, `torch_wrapper` for pytorch, and\n", "`xgboost_wrapper` for [xgboost][xgboost] inference) follow the same design:\n", "analyzers are responsible for providing the model of interest, along with\n", "a derived class that overloads the following methods for data type\n", "conversion:\n", "\n", "- `prepare_awkward`: converting awkward arrays to `numpy`-compatible awkward\n", "  arrays. The output arrays should be in the format of a tuple `a` and a\n", "  dictionary `b`, which can be expanded into the input of the ML tool as\n", "  `model(*a, **b)`. Some additional trivial conversions, such as the\n", "  conversion to available kernels for `pytorch`, conversion to a matrix format\n", "  for `xgboost`, and array slicing for `triton`, are handled automatically by\n", "  the respective wrappers. 
To handle both dask and non-dask arrays, the user should use\n", "  the provided `get_awkward_lib` library switcher.\n", "- `postprocess_awkward` (optional): converting the trivially converted numpy\n", "  array results back to the analysis-specific format. If this is not provided,\n", "  the result of a simple `ak.from_numpy` conversion is returned.\n", "\n", "If the ML tool of choice for your analysis has not been implemented in the\n", "`coffea.ml_tools` module, consider constructing your own with the provided\n", "`numpy_call_wrapper` base class in `coffea.ml_tools`. Aside from the methods\n", "listed above, you will also need to provide the `numpy_call` method to perform\n", "any additional data format conversions and call the ML tool of choice. If you\n", "think your implementation is general, also consider submitting a PR to the\n", "`coffea` repository!\n", "\n", "[triton]: https://catalog.ngc.nvidia.com/orgs/nvidia/containers/tritonserver\n", "[xgboost]: https://xgboost.readthedocs.io/en/stable/\n" ] } ], "metadata": { "kernelspec": { "display_name": "coffea-developement", "language": "python", "name": "coffea-dev" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.16" } }, "nbformat": 4, "nbformat_minor": 4 }