{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ " \n", "Design considerations while Evaluating, Developing, Deploying a distributed task processing system\n", "==================================================================================================\n", "\n", "![](files/images/cover-image-devops-talk.png)\n", "\n", "\n", "**Konark Modi**\n", "---------------\n", "\n", "*@konarkmodi*\n", "\n", "*MakeMyTrip.com*\n", "\n", "*EuroPython - 2014*\n", "\n", "[Image credits](http://www.slideshare.net/mahendram/advanced-task-management-with-celery/21)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Expectations from the Talk\n", "==========================\n", "\n", "- Definition of Job / Task.\n", "- Use cases\n", "- Celery and It's Architecture \n", "- Design Choices\n", "- Various types of workflows.\n", "- Tools Available\n", "\n", "\n", "**Note:** *Each one of us have their own use-cases, this talk is primarily to understand whole ecosystem of tasks. Because we use Celery hence I'd be using celery to relate to example.*" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "What is a task & what is a distributed task processing ?\n", "===================================\n", "\n", "- Clickstream ( No. of hotels being watched, likes etc.)\n", "- Generate graphs based on a large data set for display on a site\u2019s home page / Dashboard every 15 minute\n", "- Admin applications like\n", "- Thumbnails generation \n", "- Re-generate static files by examining when certain items in the admin have been modified\n", "- Connecting to third-party APIs\n", "- Mass e-mailers\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "** Celery and it's Architecture ** \n", "-----------------------------------\n", "\n", "* **What is Celery ?**\n", " * Asynschronus task queue/job queue.\n", " * Uses distributed message passing.\n", " * Supports both real-time processing + schedule jobs.\n", " * Tasks can be run concurrently on a single or more worker servers taking advantage of multiprocessing.\n", "* **Ridiculously simple to get started with** : Will just see that in a moment :)\n", "* **Flexible & Reliable**\n", " * Configurable and Extensible\n", " * Reliable delivery and execution of tasks\n", "* **Everything is message passing**\n", " * Executing tasks\n", " * Broadcasting commands\n", " * Publish Results\n", "* **Out-Of-Box tools for operations and management of the system**\n", "* **Our use-cases over Celery**\n", " * Poll approx 300+ DB queries every 15 minutes.\n", " * Custom metric collection\n", " * Inventory management system :\n", " * Asynchronus web-interface\n", " * Periodic tasks\n", " * Execute million of data processing tasks in chunks that run over for days\n", "* **Has now become as a de-facto framework for any distirbuted task processing and background execution.**" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Architecture\n", "=============\n", "\n", "![](files/images/celery-architecture.jpg)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Architecture\n", "=============\n", "\n", "![](files/images/workers_and_queue.png)\n", "[Image credits:](http://digitheadslabnotebook.blogspot.ca/2010/10/message-queues-with-python.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "#Install Celery\n", "pip install celery\n", "\n", "#Celery config\n", "BROKER_URL = 'amqp://guest:guest@localhost:5672/'\n", "CELERY_RESULT_BACKEND = \"amqp\"\n", "CELERY_IMPORTS = (\"tasks\",)\n", "\n", "#celery tasks.py\n", "celery = Celery('tasks')\n", "celery.config_from_object('celeryconfig')\n", "\n", "@celery.task\n", "def test_demo():\n", " print \"name\"\n", " return True\n", "\n", "@celery.task\n", "def add(a1, a2):\n", " a3 = a1 + a2\n", " print a3\n", " return a3\n", "#Starting celery\n", "celery -A tasks worker --loglevel=info -c 1\n", "\n", "#Executing tasks\n", "from tasks import *\n", "s = add.delay(2,3)\n", "s.status\n", "s.result" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Design Choices\n", "================\n", "\n", "- Scheduling Capabilities\n", "- Task Management\n", "- Worker-Management\n", "- Admin and Reporting" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Scheduling Capabilities\n", "=======================\n", "\n", "* Scheduling not just based on time but the nature of task too.\n", "\t* Cron based syntax.\n", "\t* Humanized form of entries.\n", "\t* Interval based.\n", "\t* Immediate execution\n", "\t* Countdown based" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task Management\n", "================\n", "\n", "* Routing of tasks\t\n", "\t* Priority of execution\n", "\t* Based on OS\n", "\t* Based on hardware-capabilities\n", "* Conflict Management\n", "* Retries\n", " * Exception Handling\n", " * Expiration\n", " * Time-Limits (Soft / Hard) \n", "* Task state-management :\n", "\t* Sent / Received / Started / Succeeded / Failed / Revoked / Retired\t\t\n", "* Controls : Pause, Kill, Delete" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Worker Management\n", "=================\n", "\n", "* **Basic Tasks**\n", " * Start / Stop (Warm-Shut-down / Cold-Shut-down)\n", " * With Traceback\n", " * Online / Offline\n", " * Heartbeat\n", "* **Worker inspection**\n", " * No of tasks being executed , scheduled and reserved\n", " * Time taken by each task being executed time-taken\n", " * Which worker is mapped to what all queues\n", "* **Time-Limits **\n", " * Soft-Limit\n", " * Hard-Limit\n", "* **Auto Scale-up and Scale-Out and also shrink to normal**\t \n", "* **Broadcast message to workers**\n", "\t* Helpful in case of revoking tasks\n", "* **Result backend**\n", "* **Handling worker failures**\n", "* **Purge all waiting tasks**" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Admin and Reporting\n", "===================\n", "\n", "* **Scheduling tasks from UI**\n", "\n", " ![](files/images/Celery-Admin-1.png)\n", " ![](files/images/celery-admin-2.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Monitoring Workers\n", "==================\n", "\n", "![](files/images/celery-flower.png)\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Tasks status\n", "================\n", "\n", "![](files/images/celery-flower-tasks.png)\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Historical Trends\n", "=================\n", "\n", "![](files/images/celery-flower-graphs.png)\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Types of workflows\n", "===================\n", "\n", "* Chains \n", "* Groups\n", "* Chord\n", "* Chunks\n", "* Task-Tress" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Chains \n", "======\n", "\n", " * Link tasks together ( [Task1] | Output -> Input | [Task2] -> Input | [Task3] ) \n", " * **task = chain(add.s(4,4),add.s(5))()**\n", " ```\n", " " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Groups\n", "======\n", "\n", "* Take a list of tasks that should be applied in parallel\n", "* (Task1, Task2,.....,TaskN)\n", "* **result_group = **group(add.s(2, 2), add.s(4, 4))**" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Chord\n", "========\n", "\n", " * A chord is a task that only executes after all of the tasks in a group have finished executing.\n", " * **((Task1, Task2,.....,TaskN)|Output - > [Input] | (TaskX))**" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Chunks\n", "=======\n", "\n", "* Divide the work into chunks, execute in parallel.\n", "* **res_chunks = add.chunks(zip(range(100), range(100)), 10)()**" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task - Trees\n", "============\n", "\n", "* Execute trees of tasks asynchronously in a particular order\n", " \n", " ![](files/images/Task-Trees.png)\n", " \n", " from celery_tasktree import task_with_callbacks, TaskTree\n", "\n", " @task_with_callbacks\n", " def some_action(...):\n", " ...\n", "\n", " def execute_actions():\n", " tree = TaskTree()\n", " task0 = tree.add_task(some_action, args=[...], kwargs={...})\n", " task1 = tree.add_task(some_action, args=[...], kwargs={...})\n", " task10 = task1.add_task(some_action, args=[...], kwargs={...})\n", " task11 = task1.add_task(some_action, args=[...], kwargs={...})\n", " task110 = task11.add_task(some_action, args=[...], kwargs={...})\n", " async_result = tree.apply_async()\n", " return async_result\t" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Tools Available\n", "===============\n", "\n", "* Jobtastic\n", "* Dagobah\n", "* Luigi\n", "* Chronos \n", "* Azkaban\n", "* Apache Oozie" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Thank You & Questions\n", "=====================\n", "\n", "# @konarkmodi\n" ] } ], "metadata": {} } ] }