# Distributed processing of a tree with Spark, ROOT and C++ on IT clusters

Create a list with the input file names.

In [1]:
inputfile = "/afs/cern.ch/user/e/etejedor/public/0.root"
numfiles = 2
files = [inputfile for _ in xrange(numfiles)]
print "Will be processing", numfiles, "files"

Will be processing 2 files


ROOT imports.

In [2]:
import ROOT
from DistROOT import DistTree

Welcome to JupyROOT 6.07/09


Define the mapper and reducer functions in C++.

In [3]:
%%cpp -d
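// Mapper: fills a Pt histogram from the entries made available by the reader.
TH1F* fillCpp(TTreeReader& reader) {
  TTreeReaderValue<std::vector<ROOT::Math::PxPyPzEVector>> tracksRV(reader, "tracks");
  TH1F *h = new TH1F("hpt", "Pt histogram", 64, 0, 50);

  while (reader.Next()) {
    // Bind by reference to avoid copying the vector on every entry.
    const auto& tracks = *tracksRV;
    for (const auto& track : tracks) {
      auto pt = track.Pt();
      h->Fill(pt);
    }
  }

  return h;
}

// Reducer: adds the contents of h2 to h1 and returns the accumulated histogram.
TH1F* mergeCpp(TH1F *h1, const TH1F *h2) {
  h1->Add(h2);
  return h1;
}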
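
For readers less familiar with ROOT, the two C++ functions above can be mirrored by a rough pure-Python sketch. It is only an analogue: `fill_py` and `merge_py` are hypothetical names, tracks are assumed to be plain `(px, py, pz, e)` tuples, and out-of-range values are simply dropped, whereas a `TH1F` records them in underflow and overflow bins.

```python
import math

# Same binning as the "hpt" histogram: 64 bins covering [0, 50).
NBINS, LOW, HIGH = 64, 0.0, 50.0


def fill_py(tracks):
    """Mapper analogue: bin the transverse momentum of each track."""
    counts = [0] * NBINS
    width = (HIGH - LOW) / NBINS
    for px, py, _pz, _e in tracks:
        pt = math.hypot(px, py)  # Pt = sqrt(px^2 + py^2)
        if LOW <= pt < HIGH:  # this sketch ignores under/overflow
            counts[int((pt - LOW) / width)] += 1
    return counts


def merge_py(c1, c2):
    """Reducer analogue: add the bin contents element-wise."""
    return [a + b for a, b in zip(c1, c2)]
```

Because the merge is a plain element-wise sum, it is associative and commutative, which is what allows partial histograms to be combined in any order.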

Create a distributed tree from the list of file names, the name of the tree and the number of logical partitions. The environment was previously configured with CVMFS to use the `hadalytic` cluster.

In [4]:
dTree = DistTree(filelist = files,
                 treename = "events",
                 npartitions = 4)

16/11/07 10:12:49 INFO SparkContext: Running Spark version 1.6.0
16/11/07 10:12:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/07 10:12:50 INFO SecurityManager: Changing view acls to: etejedor
16/11/07 10:12:50 INFO SecurityManager: Changing modify acls to: etejedor
16/11/07 10:12:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(etejedor); users with modify permissions: Set(etejedor)
16/11/07 10:12:50 INFO Utils: Successfully started service 'sparkDriver' on port 36923.
16/11/07 10:12:50 INFO Slf4jLogger: Slf4jLogger started
16/11/07 10:12:50 INFO Remoting: Starting remoting
16/11/07 10:12:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@188.184.91.42:43819]
16/11/07 10:12:51 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 43819.
16/11/07 10:12:51 INFO SparkEnv:

Check the tree partitions.

In [5]:
print "Tree entries partitioning:", dTree.GetPartitions()

Tree entries partitioning: [(0,499), (500,999), (1000,1499), (1500,1999)]
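
The ranges printed above are consistent with an even split of 2000 entries into 4 inclusive `(start, end)` intervals. A minimal sketch of such a split follows. The helper `split_entries` is hypothetical and is not taken from DistROOT; the total of 2000 entries is inferred from the output above, and the way a remainder is distributed is an assumption.

```python
def split_entries(nentries, npartitions):
    """Split [0, nentries) into inclusive (start, end) ranges of near-equal size."""
    base, extra = divmod(nentries, npartitions)
    ranges = []
    start = 0
    for i in range(npartitions):
        # Assumption: the first `extra` partitions take one additional entry.
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue  # more partitions than entries: skip empty ranges
        ranges.append((start, start + size - 1))
        start += size
    return ranges
```

Calling `split_entries(2000, 4)` reproduces the four ranges shown in the cell output.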


Request the processing of the tree. This launches a map-reduce chain with `fillCpp` as the mapper and `mergeCpp` as the reducer. The final result is a single merged histogram.

In [6]:
h = dTree.ProcessAndMerge(ROOT.fillCpp, ROOT.mergeCpp)
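
Conceptually, a map-reduce chain applies the mapper to every partition and then folds the partial results with the reducer. The sketch below shows that pattern locally, without Spark. `process_and_merge` is a hypothetical stand-in, not the DistROOT implementation; whether `ProcessAndMerge` is built on Spark's `RDD.map` and `RDD.reduce` in exactly this way is an assumption.

```python
from functools import reduce


def process_and_merge(partitions, mapper, reducer):
    """Apply `mapper` to each partition, then fold the partial results."""
    partials = [mapper(p) for p in partitions]  # map step (parallel on a cluster)
    return reduce(reducer, partials)            # reduce step


# Toy usage: count the entries covered by each inclusive range, then sum them.
ranges = [(0, 499), (500, 999), (1000, 1499), (1500, 1999)]
total = process_and_merge(ranges,
                          lambda r: r[1] - r[0] + 1,
                          lambda a, b: a + b)
```

On a cluster the map step runs on the executors, so the reducer should be associative for the result to be independent of the order in which partial results arrive.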

Plot the resulting histogram.

In [7]:
%jsroot on
c = ROOT.TCanvas()
h.Draw()
c.Draw()