{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# flower REST API\n", "\n", "This document shows how to use the flower [REST API](https://github.com/mher/flower#api). \n", "\n", "We will use [requests](http://www.python-requests.org/en/latest/) for accessing the API. (See [here](http://www.python-requests.org/en/latest/user/install/) on how to install it.) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Code\n", "We'll use the following code throughout the documentation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## tasks.py" ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from celery import Celery\n", "from time import sleep\n", "\n", "celery = Celery()\n", "celery.config_from_object({\n", " 'BROKER_URL': 'amqp://localhost',\n", " 'CELERY_RESULT_BACKEND': 'amqp://',\n", " 'CELERYD_POOL_RESTARTS': True, # Required for /worker/pool/restart API\n", "})\n", "\n", "\n", "@celery.task\n", "def add(x, y):\n", " return x + y\n", "\n", "\n", "@celery.task\n", "def sub(x, y):\n", " sleep(30) # Simulate work\n", " return x - y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running\n", "You'll need a celery worker instance and a flower instance running. In one terminal window run\n", "\n", " celery worker --loglevel INFO -A proj -E --autoscale 10,3\n", "\n", "and in another terminal run\n", "\n", " celery flower -A proj" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Tasks API\n", "The tasks API is *async*, meaning calls will return immediately and you'll need to poll on task status." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Done once for the whole docs\n", "import requests, json\n", "api_root = 'http://localhost:5555/api'\n", "task_api = '{}/task'.format(api_root)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## async-apply" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/task/async-apply/tasks.add\n" ] }, { "data": { "text/plain": [ "{u'state': u'PENDING', u'task-id': u'f4a53407-30f3-42af-869f-b7f8f4fbd684'}" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "args = {'args': [1, 2]}\n", "url = '{}/async-apply/tasks.add'.format(task_api)\n", "print(url)\n", "resp = requests.post(url, data=json.dumps(args))\n", "reply = resp.json()\n", "reply" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that we created a new task and it's pending. Note that the API is *async*, meaning it won't wait until the task finish." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## apply" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For create task and wait results you can use 'apply' API." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/task/apply/tasks.add\n" ] }, { "data": { "text/plain": [ "{u'result': 3,\n", " u'state': u'SUCCESS',\n", " u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "args = {'args': [1, 2]}\n", "url = '{}/apply/tasks.add'.format(task_api)\n", "print(url)\n", "resp = requests.post(url, data=json.dumps(args))\n", "reply = resp.json()\n", "reply" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## result\n", "Gets the task result. This is *async* and will return immediately even if the task didn't finish (with state 'PENDING')" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/task/result/ced6fd57-419e-4b8e-8d99-0770be717cb4\n" ] }, { "data": { "text/plain": [ "{u'result': 3,\n", " u'state': u'SUCCESS',\n", " u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/result/{}'.format(task_api, reply['task-id'])\n", "print(url)\n", "resp = requests.get(url)\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## revoke\n", "Revoke a running task." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/task/revoke/bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8\n" ] }, { "data": { "text/plain": [ "{u'message': u\"Revoked 'bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8'\"}" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Run a task\n", "args = {'args': [1, 2]}\n", "resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))\n", "reply = resp.json()\n", "\n", "# Now revoke it\n", "url = '{}/revoke/{}'.format(task_api, reply['task-id'])\n", "print(url)\n", "resp = requests.post(url, data='terminate=True')\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## rate-limit\n", "Update [rate limit](https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.rate_limit) for a task." ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/task/rate-limit/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u'new rate limit set successfully'}" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "worker = 'miki-manjaro' # You'll need to get the worker name from the worker API (seel below)\n", "url = '{}/rate-limit/{}'.format(task_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## timeout\n", "Set timeout (both [hard](https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.time_limit) and [soft](https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.soft_time_limit)) for a task." ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/task/timeout/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u'time limits set successfully'}" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/timeout/{}'.format(task_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'}) # You can omit soft or hard\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Worker API" ] }, { "cell_type": "code", "execution_count": 55, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Once for the documentation\n", "worker_api = '{}/worker'.format(api_root)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## workers\n", "List workers." ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/workers\n" ] }, { "data": { "text/plain": [ "{u'miki-manjaro': {u'completed_tasks': 0,\n", " u'concurrency': 1,\n", " u'queues': [u'celery'],\n", " u'running_tasks': 0,\n", " u'status': True}}" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/workers'.format(api_root) # Only one not under /worker\n", "print(url)\n", "resp = requests.get(url)\n", "workers = resp.json()\n", "workers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/shutdown\n", "Shutdown a worker." ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/shutdown/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u'Shutting down!'}" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "worker = workers.keys()[0]\n", "url = '{}/shutdown/{}'.format(worker_api, worker)\n", "print(url)\n", "resp = requests.post(url)\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/restart\n", "Restart a worker pool, you need to have [CELERYD_POOL_RESTARTS](https://docs.celeryq.dev/en/latest/configuration.html#std:setting-CELERYD_POOL_RESTARTS) enabled in your configuration)." ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/pool/restart/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u\"Restarting 'miki-manjaro' worker's pool\"}" ] }, "execution_count": 43, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pool_api = '{}/pool'.format(worker_api)\n", "url = '{}/restart/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url)\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/grow\n", "Grows worker pool." ] }, { "cell_type": "code", "execution_count": 53, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/pool/grow/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u\"Growing 'miki-manjaro' worker's pool\"}" ] }, "execution_count": 53, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/grow/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'n': '10'})\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/shrink\n", "Shrink worker pool." ] }, { "cell_type": "code", "execution_count": 54, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/pool/shrink/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u\"Shrinking 'miki-manjaro' worker's pool\"}" ] }, "execution_count": 54, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/shrink/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'n': '3'})\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## pool/autoscale\n", "[Autoscale](https://docs.celeryq.dev/en/latest/userguide/workers.html#autoscaling) a pool." ] }, { "cell_type": "code", "execution_count": 58, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/pool/autoscale/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u\"Autoscaling 'miki-manjaro' worker\"}" ] }, "execution_count": 58, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/autoscale/{}'.format(pool_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'min': '3', 'max': '10'})\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## queue/add-consumer\n", "[Add a consumer](https://docs.celeryq.dev/en/latest/userguide/workers.html#std:control-add_consumer) to a queue." ] }, { "cell_type": "code", "execution_count": 62, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/queue/add-consumer/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u\"add consumer u'jokes'\"}" ] }, "execution_count": 62, "metadata": {}, "output_type": "execute_result" } ], "source": [ "queue_api = '{}/queue'.format(worker_api)\n", "url = '{}/add-consumer/{}'.format(queue_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'queue': 'jokes'})\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## queue/cancel-consumer\n", "[Cancel a consumer](https://docs.celeryq.dev/en/latest/userguide/workers.html#queues-cancelling-consumers) queue." ] }, { "cell_type": "code", "execution_count": 63, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/worker/queue/cancel-consumer/miki-manjaro\n" ] }, { "data": { "text/plain": [ "{u'message': u'no longer consuming from jokes'}" ] }, "execution_count": 63, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/cancel-consumer/{}'.format(queue_api, worker)\n", "print(url)\n", "resp = requests.post(url, params={'queue': 'jokes'})\n", "resp.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Queue API\n", "\n", "We assume that we've two queues; the default one 'celery' and 'all'" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:5555/api/queues/length\n" ] }, { "data": { "text/plain": [ "{u'active_queues': [{u'messages': 2, u'name': u'all'},\n", " {u'messages': 1, u'name': u'celery'}]}" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "url = '{}/queues/length'.format(api_root)\n", "print(url)\n", "resp = requests.get(url)\n", "resp.json()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 0 }