{ "metadata": { "celltoolbar": "Slideshow", "name": "", "signature": "sha256:8bbdb2b1b9abc36381a4c4c9951e6de54f3d69a682a57cd6fb90f48473068a21" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![intro](images/Intro.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Course of talk\n", "---------------\n", "\n", "* What is Stream processing and it's challenges !\n", "* How Apache storm fits in !\n", "* Getting to know components of Apache Storm !\n", "* Apache Storm with Python !\n", "* Wikipedia edit logs : Building a working topology in Python\n", "* Need for Kafka ?\n", "* Advanced features of Storm !\n", "* Monitoring and Deployment !\n", "* Resources and Guides !" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Stream processing framework challenges :\n", "-----------------------------------------\n", "\n", "**\" Stream processing frameworks are not only for real-time computation but an infrastructure for never ending continuous data processing \"**\n", "\n", "* They are essentially abstraction over messaging systems that address variety of challenges and use-cases.\t\n", "* Store message while consumers consume it, replay them, multiple consumers for same messages.\t\n", "* Configuring what message goes where, how to deploy / parallelize workers.\n", "* Fault-Tolerance :\n", "\t* Making sure workers and Queues are always up.\n", "\t* If worker machine goes down, make sure message is treated as unprocessed.\n", "\t* Incase workers goes down, what happens to the counts / messages.\t\t\n", "* What if the underlying messaging system sends you the same message twice or loses a message ? : Both the scenarios will account for incorrect counts.\n", "* Scaling for high-throughput.\n", "* From what point should the processor run when it restarts.\n", "* What if you want to count grouped by the key, eg: group by user id.\n", "* Distribute the computation across multiple machines when single machine is not able to handle." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "How storm fits in :\n", "-------------------\n", "\n", "* **Extremely broad set of use cases** \n", "* **Scalable** \n", "* **Guarantees no data loss** \n", "* **Extremely robust** \n", "* **Fault-tolerant** \n", "* **Programming language agnostic** \n", "* **Achieve parallelism across components**." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "notes" } }, "source": [ "* **Extremely broad set of use cases:** Storm can be used for processing messages and updating databases (stream processing), doing a continuous query on data streams and streaming the results into clients (continuous computation), parallelizing an intense query like a search query on the fly (distributed RPC), and more. Storm\u2019s small set of primitives satisfy a stunning number of use cases.\n", "\n", "* **Scalable:** Storm scales to massive numbers of messages per second. To scale a topology, all you have to do is add machines and increase the parallelism settings of the topology. As an example of Storm\u2019s scale, one of Storm\u2019s initial applications processed 1,000,000 messages per second on a 10 node cluster, including hundreds of database calls per second as part of the topology. Storm\u2019s usage of Zookeeper for cluster coordination makes it scale to much larger cluster sizes.\n", "\n", "* **Guarantees no data loss:** A realtime system must have strong guarantees about data being successfully processed. A system that drops data has a very limited set of use cases. Storm guarantees that every message will be processed, and this is in direct contrast with other systems like S4.\n", "\n", "* **Extremely robust:** Unlike systems like Hadoop, which are notorious for being difficult to manage, Storm clusters just work. It is an explicit goal of the Storm project to make the user experience of managing Storm clusters as painless as possible.\n", "\n", "* **Fault-tolerant:** If there are faults during execution of your computation, Storm will reassign tasks as necessary. Storm makes sure that a computation can run forever (or until you kill the computation).\n", "\n", "* **Programming language agnostic:** Robust and scalable realtime processing shouldn\u2019t be limited to a single platform. Storm topologies and processing components can be defined in any language, making Storm accessible to nearly anyone.\n", "\n", "* Achieve parallelism across components." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Storm Components:\n", "------------------\n", "\n", "* Conceptual view\n", " * Stream\n", " * Spout\n", " * Bolts\n", " * Topology\n", "\n", "* Physical View\n", " * Nimbus\n", " * ZoopKeeper\n", " * Supervisor\n", " * Storm-UI" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "**SPOUT**\n", "-----------\n", "\n", "![spout](images/spout.png)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import itertools\n", "\n", "from streamparse.spout import Spout\n", "\n", "\n", "class SentenceSpout(Spout):\n", "\n", " def initialize(self, stormconf, context):\n", " self.sentences = [\n", " \"She advised him to take a long holiday, so he immediately quit work and took a trip around the world\",\n", " \"I was very glad to get a present from her\",\n", " \"He will be here in half an hour\",\n", " \"She saw him eating a sandwich\",\n", " ]\n", " self.sentences = itertools.cycle(self.sentences)\n", "\n", " def next_tuple(self):\n", " sentence = next(self.sentences)\n", " self.emit([sentence])\n", "\n", " def ack(self, tup_id):\n", " pass # if a tuple is processed properly, do nothing\n", "\n", " def fail(self, tup_id):\n", " pass # if a tuple fails to process, do nothing\n", "\n", "\n", "if __name__ == '__main__':\n", " SentenceSpout().run()" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "**Stream**\n", "-----------\n", "\n", "\n", "\n", "![stream](images/stream.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "**Bolt**\n", "-----------\n", "\n", "\n", "\n", "![bolt](images/bolts.png)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import re\n", "\n", "from streamparse.bolt import Bolt\n", "\n", "class SentenceSplitterBolt(Bolt):\n", "\n", " def process(self, tup):\n", " sentence = tup.values[0] # extract the sentence\n", " sentence = re.sub(r\"[,.;!\\?]\", \"\", sentence) # get rid of punctuation\n", " words = [[word.strip()] for word in sentence.split(\" \") if word.strip()]\n", " if not words:\n", " # no words to process in the sentence, fail the tuple\n", " self.fail(tup)\n", " return\n", "\n", " self.emit_many(words)\n", " self.ack(tup) # tell Storm the tuple has been processed successfully\n", "\n", "if __name__ == '__main__':\n", " SentenceSplitterBolt().run()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "**Topology**\n", "-----------\n", "\n", "\n", "\n", "![topology](images/topology.png)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "{\"sentence-splitter\" (shell-bolt-spec\n", " ;; inputs, where does this bolt recieve it's tuples from?\n", " {\"word-spout-1\" :shuffle\n", " }\n", " ;; command to run\n", " [\"python\" \"sentence_splitter.py\"]\n", " ;; output spec, what tuples does this bolt emit?\n", " [\"word\"]\n", " ;; configuration parameters\n", " :p 2)\n", " \"word-counter\" (shell-bolt-spec\n", " ;; recieves tuples from \"sentence-splitter\", grouped by word\n", " {\"sentence-splitter\" [\"word\"]}\n", " [\"python\" \"word_counter.py\"]\n", " [\"word\" \"count\"])\n", " \"word-count-saver\" (shell-bolt-spec\n", " {\"word-counter\" :shuffle}\n", " [\"python\" \"word_saver.py\"]\n", " ;; does not emit any fields\n", " [])}" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "** Grouping **\n", "\n", "How that stream should be partitioned among the bolt\u2019s tasks.\n", "\n", "There are variety of ways you can use, but the most common one's used are :\n", "\n", "* Shuffle grouping\n", "* Fields grouping\n", "\n", "Note: For detailed list please refer to URL : http://storm.incubator.apache.org/documentation/Concepts.html#stream-groupings\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "** Physical View **\n", "![Storm-Arch](images/storm_architecture-300x195.png)\n", "\n", "* **Nimbus node** :\n", " * Manage, Monitor, coordinate topologies running on the cluster.\n", " * Deployment of topology.\n", " * Task assignment and re-assignment in case of failure.\n", "* **ZooKeeper nodes** \u2013 coordinates the Storm cluster\n", "* **Supervisor nodes** \u2013 communicates with Nimbus through Zookeeper, starts and stops workers according to signals from Nimbus\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Storm & Python\n", "---------------\n", "\n", "* StreamParse\n", "* Petrel\n", "* Integration using Jython\n", "* Native multilang topology.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "My <3 for StreamParse\n", "----------------------\n", "![spout](images/streamparse_arch.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "* Clean project layout\n", "\n", "| File/Folder | Contents |\n", "|---|---|\n", "| config.json | Configuration information for all of your topologies. |\n", "| fabfile.py | Optional custom fabric tasks.|\n", "| project.clj | leiningen project file, can be used to add external JVM dependencies |\n", "| src/ | Python source files (bolts/spouts/etc.) for topologies. |\n", "| tasks.py | Optional custom invoke tasks. |\n", "| topologies/ | Contains topology definitions written using the Clojure DSL for Storm. |\n", "| virtualenvs/ | Contains pip requirements files in order to install dependencies on remote Storm servers. |\n", " \t" ] }, { "cell_type": "code", "collapsed": false, "input": [ "#DSL for defining topologies\n", "#----------------------------\n", "\n", "(ns wordcount\n", " (:use [backtype.storm.clojure])\n", " (:gen-class))\n", "\n", "(def wordcount\n", " [\n", " ;; spout configuration\n", " {\"word-spout\" (shell-spout-spec\n", " [\"python\" \"words.py\"]\n", " [\"word\"]\n", " )\n", " }\n", " ;; bolt configuration\n", " {\"count-bolt\" (shell-bolt-spec\n", " {\"word-spout\" :shuffle}\n", " [\"python\" \"wordcount.py\"]\n", " [\"word\" \"count\"]\n", " :p 2\n", " )\n", " }\n", " ]\n", ")" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "** Enough of talking, now let's build something. **" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Wikipedia edit trends\n", "------------------------------\n", "\n", "* Human vs Bots.\n", "* LoggedIN vs Anonymous.\n", "* Action wise split." ] }, { "cell_type": "code", "collapsed": false, "input": [ "Sample message\n", "--------------\n", "\n", "{\n", " \"action\": \"edit\",\n", " \"change_size\": 98,\n", " \"flags\": null,\n", " \"is_anon\": false,\n", " \"is_bot\": false,\n", " \"is_minor\": false,\n", " \"is_new\": false,\n", " \"is_unpatrolled\": false,\n", " \"ns\": \"Main\",\n", " \"page_title\": \"Abu Bakr al-Baghdadi\",\n", " \"parent_rev_id\": \"615788913\",\n", " \"rev_id\": \"615784505\",\n", " \"summary\": \"/* As Caliph of the Islamic State */ this should have pointed out that some sources deny this was al-Baghdadi - Wikipedia cannot say in its own voice that this is genuine while it is contested\",\n", " \"url\": \"http://en.wikipedia.org/w/index.php?diff=615788913&oldid=615784505\",\n", " \"user\": \"Dougweller\"\n", "}" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "[user@localhost opt]# sparse quickstart wikipedia_editLogs_trends" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%bash \n", "cd /opt/wikipedia_editLogs_trends/\n", "ls\n", "\n", "_build config.json fabfile.py log-cleaner.log project.clj README.md _resources src tasks.py topologies virtualenvs zookeeper.out" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "#src/KafkaSpout.py\n", "\n", "from streamparse.spout import Spout\n", "from kafka.client import KafkaClient\n", "from kafka.consumer import SimpleConsumer\n", "\n", "class KafkaSpout(Spout):\n", "\n", " def initialize(self, stormconf, context):\n", " self.kafka = KafkaClient('127.0.0.1:9092')\n", " self.consumer = SimpleConsumer(self.kafka,\"SpoutTopologyGroup\",\"WikiPediaLogs\")\n", "\n", " def next_tuple(self):\n", " for message in self.consumer:\n", " self.emit([message])\n", "\n", "if __name__ == '__main__':\n", " KafkaSpout().run()\n" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "#src/ParseJson.py\n", "from collections import Counter\n", "\n", "from streamparse.bolt import Bolt\n", "import json\n", "import time\n", "\n", "class ParseJson(Bolt):\n", "\n", " def initialize(self, conf, ctx):\n", " self.counts = Counter()\n", "\n", " def process(self, tup):\n", "\t# Few castings\n", "\tnull = \"\"\n", "\ttrue = True\n", "\tfalse = False\n", "\n", "\t##Main Logic\n", "\tlist_words = []\n", "\tjson_object = json.loads(str(tup.values[0]))\n", "\taction_str = \"g2_\" + str(json_object['action'])\n", "\tlist_words.append([action_str])\n", "\n", "\t##Bot or Not\n", "\tif json_object['is_bot']:\n", "\t\tlist_words.append([\"bot\"])\n", "\telse:\n", "\t\tlist_words.append([\"human\"])\n", "\t\n", "\n", "\t##Anon or LoggedIN\n", "\tif json_object['is_anon']:\n", "\t\tlist_words.append([\"g3_Anon\"])\n", "\telse:\n", "\t\tlist_words.append([\"g3_LoggedIN\"])\n", "\t\n", "\tself.emit_many(list_words)\n", "\n", "if __name__ == '__main__':\n", " ParseJson().run()\n" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "#src/Counts\n", "from collections import Counter\n", "from collections import defaultdict\n", "\n", "from streamparse.bolt import Bolt\n", "\n", "from websocket import create_connection\n", "import json\n", "import time\n", "\n", "class WordCounter(Bolt):\n", "\n", " def initialize(self, conf, ctx):\n", " #self.counts = Counter()\n", " self.counts = defaultdict(int)\n", " self.ws = create_connection(\"ws://localhost:2546\")\n", "\n", " def process(self, tup):\n", " word = tup.values[0]\n", " self.counts[word] += 1 \n", " self.ws.send(json.dumps(self.counts)) \n", " self.emit([word,self.counts[word]])\n", "\n", "\n", "if __name__ == '__main__':\n", " WordCounter().run()" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "#Topology\n", "(ns wikipediaEditLogs\n", " (:use [backtype.storm.clojure])\n", " (:gen-class))\n", "\n", "(def wordcount\n", " [\n", " ;; spout configuration\n", " {\"kafka-spout\" (shell-spout-spec\n", " [\"python\" \"KafkaSpout.py\"]\n", " [\"word\"]\n", " )\n", " }\n", " ;; bolt configuration\n", " {\"parse-bolt\" (shell-bolt-spec\n", " {\"kafka-spout\" :shuffle}\n", " [\"python\" \"ParseJson.py\"]\n", " [\"word\"]\n", " :p 2\n", " )\n", " ;; bolt configuration\n", " \"count-bolt\" (shell-bolt-spec\n", " {\"parse-bolt\" [\"word\"]}\n", " [\"python\" \"wordcount.py\"]\n", " [\"word\" \"count\"]\n", " :p 2\n", " )\n", " }\n", " ]\n", ")" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "[user@localhost opt]# sparse run\n", "[user@localhost opt]# sparse submit --name topology_name" ], "language": "python", "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Need for Kafka\n", "---------------\n", "\n", "* Kafka is a **High-Throughput**, **Distributed**, **Persistent Messaging (Publish-Subscribe)** system.\n", "* Let's you think in a log-centric way." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Important-Concepts\n", "-------------------\n", "* Cluster: Set of brokers\n", "* Broker : Individual node in the cluster\n", "* Topic : Group of related messages\n", "* Partition: part of topic, used for replication\n", "* offset : Point from where consumer needs to read." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Rethinking how logs work\n", "------------------------" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Kafka with Python\n", "------------------\n", "* python-kafka\n", "* Samsa\n", "* Kafka + Storm" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Advanced features of Storm\n", "--------------------------\n", "\n", "* DRPC\n", "* Transacational topologies\n", "* Trident topologies" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Resources\n", "----------\n", "\n", "* streamparse : https://github.com/Parsely/streamparse\n", "* Storm official docs : https://storm.incubator.apache.org/\n", "* Kafka offical docs : http://kafka.apache.org/\n", "* Python Kafka libraries : \n", " * kafka-python : https://github.com/mumrah/kafka-python\n", " * samsa : https://github.com/mumrah/kafka-python\n", "* Other interesting reads:\n", " * Understanding storm paralleism : http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "
**And just like a topology \"This topic is a never ending discussion, catch me around for demo and more details\".**
\n", "\n", "
Thank you & Question :)
" ] } ], "metadata": {} } ] }