{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Process a tree distributedly with Spark, ROOT and C++ on IT clusters" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a list with the input file names." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Will be processing 2 files\n" ] } ], "source": [ "inputfile = \"/afs/cern.ch/user/e/etejedor/public/0.root\"\n", "numfiles = 2\n", "files = [inputfile for _ in xrange(numfiles)]\n", "print \"Will be processing\", numfiles, \"files\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "ROOT imports." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "data": { "application/javascript": [ "\n", "require(['notebook'],\n", " function() {\n", " IPython.CodeCell.config_defaults.highlight_modes['magic_text/x-c++src'] = {'reg':[/^%%cpp/]};\n", " console.log(\"JupyROOT - %%cpp magic configured\");\n", " }\n", ");\n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Welcome to JupyROOT 6.07/09\n" ] } ], "source": [ "import ROOT\n", "from DistROOT import DistTree" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define the mapper and reducer functions in C++." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "%%cpp -d\n", "TH1F* fillCpp(TTreeReader& reader) {\n", " TTreeReaderValue> tracksRV(reader, \"tracks\");\n", " TH1F *h = new TH1F(\"hpt\", \"Pt histogram\", 64, 0, 50);\n", "\n", " while (reader.Next()) {\n", " auto tracks = *tracksRV;\n", " for (auto&& track : tracks) {\n", " auto pt = track.Pt();\n", " h->Fill(pt);\n", " }\n", " } \n", "\n", " return h;\n", "}\n", "\n", "TH1F* mergeCpp(TH1F *h1, const TH1F *h2) {\n", " h1->Add(h2);\n", " return h1;\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a distributed tree with the list of file names, name of the tree and number of logical partitions. The environment was previously configured with CVMFS to use the `hadalytic` cluster." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "16/11/07 10:12:49 INFO SparkContext: Running Spark version 1.6.0\n", "16/11/07 10:12:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n", "16/11/07 10:12:50 INFO SecurityManager: Changing view acls to: etejedor\n", "16/11/07 10:12:50 INFO SecurityManager: Changing modify acls to: etejedor\n", "16/11/07 10:12:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(etejedor); users with modify permissions: Set(etejedor)\n", "16/11/07 10:12:50 INFO Utils: Successfully started service 'sparkDriver' on port 36923.\n", "16/11/07 10:12:50 INFO Slf4jLogger: Slf4jLogger started\n", "16/11/07 10:12:50 INFO Remoting: Starting remoting\n", "16/11/07 10:12:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@188.184.91.42:43819]\n", "16/11/07 10:12:51 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 43819.\n", "16/11/07 10:12:51 INFO SparkEnv: Registering MapOutputTracker\n", "16/11/07 10:12:51 INFO SparkEnv: Registering BlockManagerMaster\n", "16/11/07 10:12:51 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-01566d2d-a74f-4e16-881c-b88c71ad59f8\n", "16/11/07 10:12:51 INFO MemoryStore: MemoryStore started with capacity 511.1 MB\n", "16/11/07 10:12:51 INFO SparkEnv: Registering OutputCommitCoordinator\n", "16/11/07 10:12:51 INFO Server: jetty-8.y.z-SNAPSHOT\n", "16/11/07 10:12:51 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040\n", "16/11/07 10:12:51 INFO Utils: Successfully started service 'SparkUI' on port 4040.\n", "16/11/07 10:12:51 INFO SparkUI: Started SparkUI at http://188.184.91.42:4040\n", "16/11/07 10:12:52 INFO RMProxy: Connecting to ResourceManager at p01001532067275.cern.ch/128.142.35.237:8032\n", "16/11/07 10:12:52 INFO Client: Requesting a new application from cluster with 14 NodeManagers\n", "16/11/07 10:12:52 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)\n", "16/11/07 10:12:52 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead\n", "16/11/07 10:12:52 INFO Client: Setting up container launch context for our AM\n", "16/11/07 10:12:52 INFO Client: Setting up the launch environment for our AM container\n", "16/11/07 10:12:52 INFO Client: Preparing resources for our AM container\n", "16/11/07 10:12:53 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.\n", "16/11/07 10:12:53 INFO YarnSparkHadoopUtil: getting token for namenode: hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668\n", "16/11/07 10:12:53 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 2629586 for etejedor on 128.142.35.237:8020\n", "16/11/07 10:12:54 INFO Client: Uploading resource file:/cvmfs/sft.cern.ch/lcg/releases/spark/1.6.0-95ed9/x86_64-slc6-gcc49-opt/lib/spark-assembly-1.6.0-hadoop2.6.0.jar -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/spark-assembly-1.6.0-hadoop2.6.0.jar\n", "16/11/07 10:12:55 INFO Client: Uploading resource file:/cvmfs/sft.cern.ch/lcg/releases/spark/1.6.0-95ed9/x86_64-slc6-gcc49-opt/python/lib/pyspark.zip -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/pyspark.zip\n", "16/11/07 10:12:55 INFO Client: Uploading resource file:/cvmfs/sft.cern.ch/lcg/releases/spark/1.6.0-95ed9/x86_64-slc6-gcc49-opt/python/lib/py4j-0.9-src.zip -> 
hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/py4j-0.9-src.zip\n", "16/11/07 10:12:56 INFO Client: Uploading resource file:/tmp/spark-fec88f51-128c-448c-9c4a-939f582df7ae/__spark_conf__4669209039456569521.zip -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/__spark_conf__4669209039456569521.zip\n", "16/11/07 10:12:56 INFO DFSClient: Exception in createBlockOutputStream\n", "java.net.NoRouteToHostException: No route to host\n", "\tat sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)\n", "\tat sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)\n", "\tat org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)\n", "\tat org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)\n", "16/11/07 10:12:56 INFO DFSClient: Abandoning BP-2054394024-128.142.35.237-1427464967087:blk_1139996772_66875988\n", "16/11/07 10:12:56 INFO DFSClient: Excluding datanode 128.142.210.232:1004\n", "16/11/07 10:12:56 INFO SecurityManager: Changing view acls to: etejedor\n", "16/11/07 10:12:56 INFO SecurityManager: Changing modify acls to: etejedor\n", "16/11/07 10:12:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(etejedor); users with modify permissions: Set(etejedor)\n", "16/11/07 10:12:56 INFO Client: Submitting application 2668 to ResourceManager\n", "16/11/07 10:12:56 INFO YarnClientImpl: Submitted application application_1478008051180_2668\n", "16/11/07 10:12:57 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:12:57 INFO Client: \n", "\t client token: Token { kind: YARN_CLIENT_TOKEN, service: }\n", "\t diagnostics: N/A\n", "\t ApplicationMaster host: N/A\n", "\t ApplicationMaster RPC port: -1\n", "\t queue: root.standard\n", "\t start time: 1478509976232\n", "\t final status: UNDEFINED\n", "\t tracking URL: http://p01001532067275.cern.ch:8088/proxy/application_1478008051180_2668/\n", "\t user: etejedor\n", "16/11/07 10:12:58 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:12:59 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:00 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:01 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:02 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:03 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:04 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:05 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)\n", "16/11/07 10:13:05 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)\n", "16/11/07 10:13:05 INFO YarnClientSchedulerBackend: Add WebUI 
Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> p01001532067275.cern.ch, PROXY_URI_BASES -> http://p01001532067275.cern.ch:8088/proxy/application_1478008051180_2668), /proxy/application_1478008051180_2668\n", "16/11/07 10:13:05 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter\n", "16/11/07 10:13:06 INFO Client: Application report for application_1478008051180_2668 (state: RUNNING)\n", "16/11/07 10:13:06 INFO Client: \n", "\t client token: Token { kind: YARN_CLIENT_TOKEN, service: }\n", "\t diagnostics: N/A\n", "\t ApplicationMaster host: 128.142.210.235\n", "\t ApplicationMaster RPC port: 0\n", "\t queue: root.standard\n", "\t start time: 1478509976232\n", "\t final status: UNDEFINED\n", "\t tracking URL: http://p01001532067275.cern.ch:8088/proxy/application_1478008051180_2668/\n", "\t user: etejedor\n", "16/11/07 10:13:06 INFO YarnClientSchedulerBackend: Application application_1478008051180_2668 has started running.\n", "16/11/07 10:13:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40031.\n", "16/11/07 10:13:06 INFO NettyBlockTransferService: Server created on 40031\n", "16/11/07 10:13:06 INFO BlockManagerMaster: Trying to register BlockManager\n", "16/11/07 10:13:06 INFO BlockManagerMasterEndpoint: Registering block manager 188.184.91.42:40031 with 511.1 MB RAM, BlockManagerId(driver, 188.184.91.42, 40031)\n", "16/11/07 10:13:06 INFO BlockManagerMaster: Registered BlockManager\n", "16/11/07 10:13:06 INFO EventLoggingListener: Logging events to hdfs:///user/spark/applicationHistory/application_1478008051180_2668\n", "16/11/07 10:13:06 INFO DFSClient: Exception in createBlockOutputStream\n", "java.net.NoRouteToHostException: No route to host\n", "\tat sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)\n", "\tat sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)\n", "\tat org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)\n", "\tat org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361)\n", "\tat org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)\n", "16/11/07 10:13:06 INFO DFSClient: Abandoning BP-2054394024-128.142.35.237-1427464967087:blk_1139996774_66875990\n", "16/11/07 10:13:06 INFO DFSClient: Excluding datanode 128.142.210.232:1004\n", "16/11/07 10:13:15 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (itrac1508.cern.ch:37986) with ID 2\n", "16/11/07 10:13:15 WARN TableMapping: /etc/hadoop/conf/topology.table.file cannot be read.\n", "java.io.FileNotFoundException: /etc/hadoop/conf/topology.table.file (No such file or directory)\n", "\tat java.io.FileInputStream.open0(Native Method)\n", "\tat java.io.FileInputStream.open(FileInputStream.java:195)\n", "\tat java.io.FileInputStream.(FileInputStream.java:138)\n", "\tat java.io.FileInputStream.(FileInputStream.java:93)\n", "\tat java.io.FileReader.(FileReader.java:58)\n", "\tat org.apache.hadoop.net.TableMapping$RawTableMapping.load(TableMapping.java:101)\n", "\tat org.apache.hadoop.net.TableMapping$RawTableMapping.resolve(TableMapping.java:134)\n", "\tat 
org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)\n", "\tat org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)\n", "\tat org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)\n", "\tat org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)\n", "\tat org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:292)\n", "\tat org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:284)\n", "\tat scala.collection.immutable.List.foreach(List.scala:318)\n", "\tat org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:284)\n", "\tat org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:196)\n", "\tat org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:167)\n", "\tat org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)\n", "\tat org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)\n", "\tat org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)\n", "\tat org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)\n", "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n", "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n", "\tat java.lang.Thread.run(Thread.java:745)\n", "16/11/07 10:13:15 WARN TableMapping: Failed to read topology table. /default-rack will be used for all nodes.\n", "16/11/07 10:13:15 INFO BlockManagerMasterEndpoint: Registering block manager itrac1508.cern.ch:52262 with 511.5 MB RAM, BlockManagerId(2, itrac1508.cern.ch, 52262)\n", "16/11/07 10:13:16 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (itrac1501.cern.ch:55824) with ID 1\n", "16/11/07 10:13:16 INFO BlockManagerMasterEndpoint: Registering block manager itrac1501.cern.ch:43424 with 511.5 MB RAM, BlockManagerId(1, itrac1501.cern.ch, 43424)\n", "16/11/07 10:13:16 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8\n" ] } ], "source": [ "dTree = DistTree(filelist = files,\n", " treename = \"events\",\n", " npartitions = 4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the tree partitions." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Tree entries partitioning: [(0,499), (500,999), (1000,1499), (1500,1999)]\n" ] } ], "source": [ "print \"Tree entries partitioning:\", dTree.GetPartitions()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Request the processing of the tree. This will fire up a map-reduce chain with `fillCpp` as mapper and `mergeCpp` as reducer functions. The final result is a histogram." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [], "source": [ "h = dTree.ProcessAndMerge(ROOT.fillCpp, ROOT.mergeCpp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the resulting histogram." 
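] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a quick check before drawing, a few summary figures of the merged histogram can be printed; this minimal, illustrative step only assumes `h` is the `TH1F` returned by `ProcessAndMerge` above. The histogram is then rendered with JSROOT." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Quick, illustrative check of the merged histogram returned by ProcessAndMerge\n", "print \"Entries:\", h.GetEntries()\n", "print \"Mean pt:\", h.GetMean()\n", "print \"RMS pt: \", h.GetRMS()" ] },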
] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%jsroot on\n", "c = ROOT.TCanvas()\n", "h.Draw()\n", "c.Draw()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }