{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# flower REST API\n", "\n", "This document shows how to use the flower [REST API](https://github.com/mher/flower#api). \n", "\n", "We will use [requests](http://www.python-requests.org/en/latest/) for accessing the API. (See the [installation guide](http://www.python-requests.org/en/latest/user/install/) for how to install it.) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Code\n", "We'll use the following code throughout the documentation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## tasks.py" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from celery import Celery\n", "from time import sleep\n", "\n", "celery = Celery()\n", "celery.config_from_object({\n", "    'BROKER_URL': 'amqp://localhost',\n", "    'CELERY_RESULT_BACKEND': 'amqp://',\n", "    'CELERYD_POOL_RESTARTS': True,  # Required for /worker/pool/restart API\n", "})\n", "\n", "\n", "@celery.task\n", "def add(x, y):\n", "    return x + y\n", "\n", "\n", "@celery.task\n", "def sub(x, y):\n", "    sleep(30)  # Simulate work\n", "    return x - y" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 43 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running\n", "You'll need a celery worker instance and a flower instance running. In one terminal window run\n", "\n", "    celery worker --loglevel INFO -A tasks -E --autoscale 10,3\n", "\n", "and in another terminal run\n", "\n", "    celery flower -A tasks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Tasks API\n", "The tasks API is *async*, meaning calls return immediately and you need to poll for the task status." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "# Done once for the whole document\n", "import requests, json\n", "api_root = 'http://localhost:5555/api'\n", "task_api = '{}/task'.format(api_root)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## async-apply" ] }, { "cell_type": "code", "collapsed": false, "input": [ "args = {'args': [1, 2]}\n", "url = '{}/async-apply/tasks.add'.format(task_api)\n", "print(url)\n", "resp = requests.post(url, data=json.dumps(args))\n", "reply = resp.json()\n", "reply" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/task/async-apply/tasks.add\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 6, "text": [ "{u'state': u'PENDING', u'task-id': u'f4a53407-30f3-42af-869f-b7f8f4fbd684'}" ] } ], "prompt_number": 6 }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that a new task was created and that it is pending. Note that the API is *async*, meaning it does not wait for the task to finish." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## result\n", "Gets the task result. 
This is *async* and returns immediately, with state 'PENDING', if the task has not finished yet." ] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/result/{}'.format(task_api, reply['task-id'])\n", "print(url)\n", "resp = requests.get(url)\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/task/result/ced6fd57-419e-4b8e-8d99-0770be717cb4\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 5, "text": [ "{u'result': 3,\n", " u'state': u'SUCCESS',\n", " u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}" ] } ], "prompt_number": 5 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## revoke\n", "Revoke a running task." ] }, { "cell_type": "code", "collapsed": false, "input": [ "# Start a long-running task (tasks.sub sleeps for 30 seconds)\n", "args = {'args': [1, 2]}\n", "resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))\n", "reply = resp.json()\n", "\n", "# Now revoke it\n", "url = '{}/revoke/{}'.format(task_api, reply['task-id'])\n", "print(url)\n", "resp = requests.post(url, data='terminate=True')\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/task/revoke/bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 7, "text": [ "{u'message': u\"Revoked 'bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8'\"}" ] } ], "prompt_number": 7 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## rate-limit\n", "Update the [rate limit](http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.rate_limit) for a task." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "worker = 'miki-manjaro'  # You'll need to get the worker name from the worker API (see below)\n", "url = '{}/rate-limit/{}'.format(task_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/task/rate-limit/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 20, "text": [ "{u'message': u'new rate limit set successfully'}" ] } ], "prompt_number": 20 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## timeout\n", "Set the timeout (both [hard](http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.time_limit) and [soft](http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.soft_time_limit)) for a task." ] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/timeout/{}'.format(task_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'})  # You can omit soft or hard\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/task/timeout/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 22, "text": [ "{u'message': u'time limits set successfully'}" ] } ], "prompt_number": 22 }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Worker API" ] }, { "cell_type": "code", "collapsed": false, "input": [ "# Done once for the whole document\n", "worker_api = '{}/worker'.format(api_root)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 55 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## workers\n", "List workers." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/workers'.format(api_root)  # The only worker endpoint not under /worker\n", "print(url)\n", "resp = requests.get(url)\n", "workers = resp.json()\n", "workers" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/workers\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 25, "text": [ "{u'miki-manjaro': {u'completed_tasks': 0,\n", "                   u'concurrency': 1,\n", "                   u'queues': [u'celery'],\n", "                   u'running_tasks': 0,\n", "                   u'status': True}}" ] } ], "prompt_number": 25 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## shutdown\n", "Shut down a worker." ] }, { "cell_type": "code", "collapsed": false, "input": [ "worker = list(workers)[0]  # First worker name; works on Python 2 and 3\n", "url = '{}/shutdown/{}'.format(worker_api, worker)\n", "print(url)\n", "resp = requests.post(url)\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/shutdown/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 30, "text": [ "{u'message': u'Shutting down!'}" ] } ], "prompt_number": 30 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/restart\n", "Restart a worker pool (you need to have [CELERYD_POOL_RESTARTS](http://docs.celeryproject.org/en/latest/configuration.html#std:setting-CELERYD_POOL_RESTARTS) enabled in your configuration)." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "pool_api = '{}/pool'.format(worker_api)\n", "url = '{}/restart/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url)\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/pool/restart/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 43, "text": [ "{u'message': u\"Restarting 'miki-manjaro' worker's pool\"}" ] } ], "prompt_number": 43 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/grow\n", "Grow a worker pool." ] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/grow/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'n': '10'})\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/pool/grow/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 53, "text": [ "{u'message': u\"Growing 'miki-manjaro' worker's pool\"}" ] } ], "prompt_number": 53 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/shrink\n", "Shrink a worker pool." ] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/shrink/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'n': '3'})\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/pool/shrink/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 54, "text": [ "{u'message': u\"Shrinking 'miki-manjaro' worker's pool\"}" ] } ], "prompt_number": 54 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/autoscale\n", "[Autoscale](http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling) a worker pool." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/autoscale/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'min': '3', 'max': '10'})\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/pool/autoscale/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 58, "text": [ "{u'message': u\"Autoscaling 'miki-manjaro' worker\"}" ] } ], "prompt_number": 58 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## queue/add-consumer\n", "[Add a consumer](http://docs.celeryproject.org/en/latest/userguide/workers.html#std:control-add_consumer) to a queue." ] }, { "cell_type": "code", "collapsed": false, "input": [ "queue_api = '{}/queue'.format(worker_api)\n", "url = '{}/add-consumer/{}'.format(queue_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'queue': 'jokes'})\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/queue/add-consumer/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 62, "text": [ "{u'message': u\"add consumer u'jokes'\"}" ] } ], "prompt_number": 62 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## queue/cancel-consumer\n", "[Cancel a consumer](http://docs.celeryproject.org/en/latest/userguide/workers.html#queues-cancelling-consumers) of a queue." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "url = '{}/cancel-consumer/{}'.format(queue_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'queue': 'jokes'})\n", "resp.json()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "http://localhost:5555/api/worker/queue/cancel-consumer/miki-manjaro\n" ] }, { "metadata": {}, "output_type": "pyout", "prompt_number": 63, "text": [ "{u'message': u'no longer consuming from jokes'}" ] } ], "prompt_number": 63 }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }