# Coffea-Casa Benchmark Example 1

In [1]:
import numpy as np
%matplotlib inline
from coffea import hist
import coffea.processor as processor
import awkward as ak
from coffea.nanoevents import schemas

In [2]:
# This program plots an event-level variable (in this case, MET, but switching it is as easy as a dict-key change). It also demonstrates an easy use of the book-keeping cutflow tool, to keep track of the number of events processed.

# The processor class bundles our data analysis together while giving us some helpful tools.  It also leaves looping and chunks to the framework instead of us.
class Processor(processor.ProcessorABC):
    def __init__(self):
        # Bins and categories for the histogram are defined here. For format, see https://coffeateam.github.io/coffea/stubs/coffea.hist.hist_tools.Hist.html && https://coffeateam.github.io/coffea/stubs/coffea.hist.hist_tools.Bin.html
        dataset_axis = hist.Cat("dataset", "")
        MET_axis = hist.Bin("MET", "MET [GeV]", 50, 0, 100)
        
        # The accumulator keeps our data chunks together for histogramming. It also gives us cutflow, which can be used to keep track of data.
        self._accumulator = processor.dict_accumulator({
            'MET': hist.Hist("Counts", dataset_axis, MET_axis),
            'cutflow': processor.defaultdict_accumulator(int)
        })
    
    @property
    def accumulator(self):
        return self._accumulator
    
    def process(self, events):
        output = self.accumulator.identity()
        
        # This is where we do our actual analysis. The dataset has columns similar to the TTree's; events.columns can tell you them, or events.[object].columns for deeper depth.
        dataset = events.metadata["dataset"]
        MET = events.MET.pt
        
        # We can define a new key for cutflow (in this case 'all events'). Then we can put values into it. We need += because it's per-chunk (demonstrated below)
        output['cutflow']['all events'] += ak.size(MET)
        output['cutflow']['number of chunks'] += 1
        
        # This fills our histogram once our data is collected. The hist key ('MET=') will be defined in the bin in __init__.
        output['MET'].fill(dataset=dataset, MET=MET)
        return output

    def postprocess(self, accumulator):
        return accumulator

In [3]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:36157")
client

0,1
Connection method: Direct,
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status,

0,1
Comm: tcp://127.0.0.1:36157,Workers: 1
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status,Total threads: 4
Started: 2 hours ago,Total memory: 15.70 GiB

0,1
Comm: tcp://127.0.0.1:34225,Total threads: 4
Dashboard: /user/oksana.shadura@cern.ch/proxy/46647/status,Memory: 15.70 GiB
Nanny: tcp://127.0.0.1:36243,
Local directory: /home/cms-jovyan/dask-worker-space/worker-crip7cn_,Local directory: /home/cms-jovyan/dask-worker-space/worker-crip7cn_
Tasks executing: 0,Tasks in memory: 1
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 518.93 MiB,Spilled bytes: 0 B
Read bytes: 41.93 kiB,Write bytes: 57.77 kiB


In [6]:
import os, shutil
import uproot
import awkward as ak

if not os.path.isfile("/mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet"):
    ak.to_parquet(
        uproot.lazy("nano_dy.root:Events"),
        "Run2012B_SingleMu.parquet",
        list_to32=True,
        use_dictionary=False,
        compression="GZIP",
        compression_level=1,
    )
    
if not os.path.isdir("/mnt/cephfs/nanoevents/Run2012B_SingleMu"):
    os.makedirs("/mnt/cephfs/nanoevents/Run2012B_SingleMu")
    shutil.copyfile('Run2012B_SingleMu.parquet', '/mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet')

In [7]:
fileset = {'SingleMu' : ["/mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet"]}

run = processor.Runner(executor=processor.DaskExecutor(client=client),
                        schema=schemas.NanoAODSchema,
                        savemetrics=True,
                        use_skyhook=True,
                        skyhook_options = {"ceph_config_path": "/opt/ceph/ceph.conf", "ceph_data_pool": "cephfs_data0"},
                        format="parquet")

output, metrics = run(fileset, "Events", processor_instance=Processor())

metrics

FileNotFoundError: /mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError


In [None]:
# Generates a 1D histogram from the data output to the 'MET' key. fill_opts are optional, to fill the graph (default is a line).
hist.plot1d(output['MET'], overlay='dataset', fill_opts={'edgecolor': (0,0,0,0.3), 'alpha': 0.8})

In [None]:
# Easy way to print all cutflow dict values. Can just do print(output['cutflow']["KEY_NAME"]) for one.
for key, value in output['cutflow'].items():
    print(key, value)