{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![](files/images/talk-title-mdg.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

About Me

\n", "--------------------------------------------\n", "\n", "\n", "* Moved to Munich in Jan'2015.\n", "* Currently working as a Software Engineer with Cliqz :\n", " * Cliqz is the new way to navigate the Internet.\n", " * With CLIQZ for Firefox you're clicking directly, quickly and safely through the Web.\n", "* Prior to this was working with one of the largest e-commerce websites in India." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Why do we care about a DataPipeline?

\n", "--------------------------------------------\n", "\n", "\n", "* We are generating data at a rapid pace.\n", "* Data sources are in abundance, different formats, frequencies.\n", "* Need to have a pro-active approach, gain insights as and when the data is being generated.\n", "* Behaviour of the product / app needs to adapt to how the user has engaged in the past, and is engaging at the moment." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Data Platform architecture

\n", "
\n", "![](files/images/lambda.png)\n", "
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "

Data Platform architecture

\n", "
\n", "\n", "![](files/images/arch-diagram.png)\n", "
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Components

\n", "----------------------------\n", "\n", "## Data ingestion Layer :\n", "--------------------------\n", "\n", "* Tightly couples together with the streamprocessing layer\n", "* Deployment model\n", "* Data source reliability\n", "* Multiple consumers\n", "* Replay messages (Will cover in guranteed message processing)\n", "* Data locality\n", "* Example : Kafka\n", "\n", "## Processing Layer(common patterns)\n", "--------------------------\n", "\n", "* Batch\n", "* Microbatch\n", "* Streaming\n", "\n", "\n", "## Storage / Query layer\n", "--------------------------\n", " \n", "* Processed data served using cache\n", "* Intermediate processing data\n", "* Persist raw and processed data" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Stream processing layer

\n", "\n", "
Layer which not only provides for real-time computation but an infrastructure for never ending continuous data processing
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Stream processing layer

\n", "\n", "## Challenges :\n", "-----------\n", "\n", "* Scalability: Ingest humdreds of millions of events per minute / day in real time.\n", "* High degree of robustness\n", "* Reliable data processing\n", "* Fault-tolerance : Resilient to software and hardware failures and continue to meet SLA's\n", "* Low latency : Site facing applications need response times in the order of milliseconds\n", "* Partioning, Routing, Serialization\n", "* Meter and gauges to what's going under the hood\n", "* Control knobs\n", "* Multi-lang support" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Stream processing layer

\n", "\n", "## Apache storm :\n", "--------------\n", "\n", "* High degree of robustness\n", "* Reliable data processing\n", "* Fault-tolerance\n", "* Low latency\n", "* Partioning, Routing, Serialization\n", "* Meter and gauges to what's going under the hood\n", "* Control knobs\n", "* Multi-lang support" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Components of storm cluster

\n", "\n", "## Conceptual Level\n", "-------------\n", "\n", "* Topologies\n", "* Streams\n", "* Spouts\n", "* Bolts\n", "\n", "## Physical Level\n", "-------------\n", "\n", "* Nimbus\n", "* ZoopKeeper\n", "* Supervisor\n", "* Storm UI\n", " \n", "## Executing components of a topology :\n", "-------------\n", "\n", "* Workers\n", "* Tasks\n", "* Executors" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Spouts

\n", "
\n", "![spout](files/images/spout.png)\n", "
" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import itertools\n", "from streamparse.spout import Spout\n", "from websocket import create_connection\n", "import json\n", "\n", "class webSocketSpout(Spout):\n", "\n", "\tdef initialize(self, stormconf, context):\n", "\t\tself.ws = create_connection(\"ws://websocket.local.local:9000/\")\n", "\n", "\tdef next_tuple(self):\n", "\t\tresult = self.ws.recv()\n", "\t\tjson_object = json.loads(result)\n", "\t\tself.emit([result])\n", "\n", "if __name__ == '__main__':\n", " webSocketSpout().run()\n", " \n", "'''\n", "Sample event:\n", "{\n", " \"action\": \"edit\",\n", " \"change_size\": 328,\n", " \"flags\": \"M\",\n", " \"hashtags\": [],\n", " \"is_anon\": false,\n", " \"is_bot\": false,\n", " \"is_minor\": true,\n", " \"is_new\": false,\n", " \"is_unpatrolled\": false,\n", " \"mentions\": [],\n", " \"ns\": \"Main\",\n", " \"page_title\": \"St. Andre Bessette Catholic Secondary School\",\n", " \"parent_rev_id\": \"663970563\",\n", " \"rev_id\": \"659207915\",\n", " \"summary\": \"replace with infobox school per TfD\",\n", " \"url\": \"http://en.wikipedia.org/w/index.php?diff=663970563&oldid=659207915\",\n", " \"user\": \"Frietjes\"\n", "}\n", "\n", "'''" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Streams

\n", "\n", "
\n", "\n", "![stream](files/images/stream.png)\n", "\n", "
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Bolts

\n", "
\n", "\n", "![bolt](files/images/bolts.png)\n", "
" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from collections import Counter\n", "from streamparse.bolt import Bolt\n", "from redis import StrictRedis\n", "import json\n", "import time\n", "\n", "class jsonParser(Bolt):\n", "\n", " def initialize(self, conf, ctx):\n", " self.counts = Counter()\n", "\n", " def process(self, tup):\n", "\tkeys = []\n", "\tjson_object = json.loads(str(tup.values[0]))\n", " keys.append([\"isanon_\" + str(json_object[\"is_anon\"])])\n", "\tif json_object.get(\"is_anon\"):\n", "\t\tkeys.append([\"anon_anon\"])\n", "\telse:\n", "\t\tkeys.append([\"anon_loggedin\"])\n", "\tif json_object.get(\"is_bot\"):\n", "\t\tkeys.append([\"bot_bot\"])\n", "\telse:\n", "\t\tkeys.append([\"bot_human\"])\n", "\n", " keys.append([\"action_\" + json_object[\"action\"]])\n", "\tif json_object.get(\"geo_ip\"):\n", "\t\tkeys.append([\"country_\" + json_object[\"geo_ip\"][\"country_name\"]])\n", " self.emit_many(keys)\n", "\n", "class RedisBolt(Bolt):\n", " def initialize(self, conf, ctx):\n", " self.redis = StrictRedis(host=\"redishost\")\n", " self.counter = Counter()\n", "\n", " def process(self, tup):\n", " keys, = tup.values\n", "\tkey, word = keys.split(\"_\")\n", " #self.log_count(word)\n", "\tif key == 'action':\n", " \tself.redis.zincrby(\"actions\", str(word), 1)\n", "\t\tself.redis.zadd(key,int(time.time()), word)\n", "\n", "\telse:\n", " \tself.redis.zincrby(key, str(word), 1)" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Topology

\n", "\n", "
\n", "![topology](files/images/topology.png)\n", "
" ] }, { "cell_type": "code", "collapsed": false, "input": [ "(ns wikipedialogs\n", " (:use [streamparse.specs])\n", " (:gen-class))\n", "\n", "(defn wikipedialogs [options]\n", " [\n", " ;; spout configuration\n", " {\"websocket-spout\" (python-spout-spec\n", " options\n", " \"spouts.websocket.webSocketSpout\"\n", " [\"word\"]\n", "\t :p 1\n", " )\n", " }\n", " ;; bolt configuration\n", " {\"parser-bolt\" (python-bolt-spec\n", " options\n", " {\"websocket-spout\" :shuffle}\n", " \"bolts.bolts.jsonParser\"\n", " [\"word\"]\n", " :p 2\n", " )\n", "\n", " ;; bolt configuration\n", " \"redis-bolt\" (python-bolt-spec\n", " options\n", " {\"count-bolt\" :shuffle}\n", " \"bolts.bolts.RedisBolt\"\n", " ;; does not emit any fields\n", " :p 2\n", " )\n", " }\n", " ]\n", ")\n", "\n", "## Submit topology\n", "# sparse run\n", "# sparse submit --name wikipedialogs" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Grouping

\n", "\n", "
\n", "\n", "![grouping](files/images/grouping1.png)\n", "\n", "
\n", "Note: For detailed list please refer to URL : https://storm.apache.org/documentation/Tutorial.html\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Physical View

\n", "\n", "\n", "![Storm-Arch](files/images/starch1.png)\n", "\n", "\n", "* **Nimbus node** :\n", " * Manage, Monitor, coordinate topologies running on the cluster.\n", " * Deployment of topology.\n", " * Task assignment and re-assignment in case of failure.\n", "* **ZooKeeper nodes** \u2013 coordinates the Storm cluster\n", "* **Supervisor nodes** \u2013 communicates with Nimbus through Zookeeper, starts and stops workers according to signals from Nimbus\n", "* **Fault tolerant**\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Executing components

\n", "\n", "![Workers, Tasks, Executors](files/images/wte.png)\n", "\n", "Image Source :[Official docs](https://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Guranteed message processing

\n", "\n", "### How does the flow work\n", "-----------------------\n", "\n", "* Tuple Tree :\n", " * Spout emits a touple, which goes to a bolt\n", " * Bolt produces another tuple based on the previous one, the next bolt produces another set.\n", " \n", "* A spout tuple is not considered fully complete until all the tuples in the tree have finished processing.\n", " * Not completed withing a specified amount of time then replay the spout tuple.\n", " \n", "* We can leverage the Reliability API by anchoring, which is essentially tagging the new tuple with input tuple.\n", "* Special tasks dedicated, called ACKER Tasks.\n", "\n", "### Scenarios\n", "---------------\n", "\n", "* A tuple isn't acked because the task died.\n", "* Acker task dies.\n", "* Spout task dies." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

STORM UI and CLI

\n", "\n", "* Basic Cluster / Topology / Spout / Bolt level summary\n", "* Useful to see performance\n", "* Basic controls\n", "* Rebalance cluster in storm, for changing parallelism" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Other features and resources

\n", "\n", "* Trident topologies\n", "* DRPC\n", "* Resource Managers : Storm-Yarn, Storm with mesos \n", "* Running Apache Storm securley : https://github.com/apache/storm/blob/master/SECURITY.md\n", "* Storm-Deploy : https://github.com/miguno/wirbelsturm\n", "* Internal messaging in Apache Storm :\n", " * Intra-worker communication : inter-thread on the same Storm node\n", " * Inter-worker communication : node-to-node across the network\n", " * Inter-topology or across cluster communication: nothing built into Storm\n", " * Useful read : http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/\n", "* streamparse : https://github.com/Parsely/streamparse\n", "* Storm official docs : https://storm.apache.org/" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

Other open-source alternatives

\n", "\n", "* Samza\n", "* S4\n", "* Linkedin Pinot \n", "* Twitter Heron" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "

And just like a topology:

\n", "
\"This topic is a never ending discussion, catch me around for demo and more details\".
\n", "
Thank you & Questions :)
" ] } ], "metadata": {} } ] }