#!/usr/bin/env python
"""=cut
=head1 NAME

celery_tasks_states - Munin plugin to monitor the number of Celery tasks in each state.

=head1 REQUIREMENTS

- Python
- celery (http://celeryproject.org/)
- celerymon (http://github.com/ask/celerymon)

Note: don't forget to enable sending of task events on the celery daemon -
run it with the --events option.

=head1 CONFIGURATION

Default configuration:

[celery_tasks_states]
env.api_url http://localhost:8989
env.workers all

If the workers variable is not set, or is set to "all", the task counts for
all workers are monitored.

You can optionally set the workers variable to a comma-separated list of the
worker hostnames you want to monitor.

For example:

[celery_tasks_states]
env.workers localhost,foo.bar.net,bar.foo.net

This would only count tasks for the workers with the hostnames "localhost",
"foo.bar.net" and "bar.foo.net".

=head1 MAGIC MARKERS

 #%# family=manual
 #%# capabilities=autoconf

=head1 AUTHOR

Tomaz Muraus (http://github.com/Kami/munin-celery)

=head1 LICENSE

GPLv2

=cut"""

import os
import sys
import urllib

try:
    import json
except ImportError:
    import simplejson as json

API_URL = 'http://localhost:8989'
URL_ENDPOINTS = {
    'workers': '/api/worker/',
    'worker_tasks': '/api/worker/%s/tasks',
    'tasks': '/api/task/',
    'task_names': '/api/task/name/',
    'task_details': '/api/task/name/%s',
}
TASK_STATES = (
    'PENDING',
    'RECEIVED',
    'STARTED',
    'SUCCESS',
    'FAILURE',
    'REVOKED',
    'RETRY',
)


def get_data(what, api_url, *args):
    """Fetch and deserialize JSON from the given celerymon API endpoint."""
    try:
        request = urllib.urlopen('%s%s' % (api_url,
                                           URL_ENDPOINTS[what] % args))
        response = request.read()
        return json.loads(response)
    except IOError:
        print 'Could not connect to the celerymon webserver'
        sys.exit(-1)


def check_web_server_status(api_url):
    """Bail out early if the celerymon webserver is not reachable."""
    try:
        urllib.urlopen(api_url).read()
    except IOError:
        print 'Could not connect to the celerymon webserver'
        sys.exit(-1)


def clean_state_name(state_name):
    # Munin field names are the lower-case versions of the Celery state names.
    return state_name.lower()


# Config
def print_config(workers=None):
    if workers:
        print 'graph_title Celery tasks in each state [workers = %s]' % \
              (', '.join(workers))
    else:
        print 'graph_title Celery tasks in each state'
    print 'graph_args --lower-limit 0'
    print 'graph_scale no'
    print 'graph_vlabel tasks per ${graph_period}'
    print 'graph_category cloud'

    for name in TASK_STATES:
        name = clean_state_name(name)
        print '%s.label %s' % (name, name)
        print '%s.type DERIVE' % (name)
        print '%s.min 0' % (name)
        print '%s.info number of %s tasks' % (name, name)


# Values
def print_values(workers=None, api_url=None):
    data = get_data('tasks', api_url)

    counters = dict([(key, 0) for key in TASK_STATES])
    # Each entry is a [task_name, task_data] pair; task_data carries the task
    # state and the hostname of the worker that processed the task.
    for task_name, task_data in data:
        state = task_data['state']
        hostname = task_data['worker']['hostname']

        if workers and hostname not in workers:
            continue

        counters[state] += 1

    for name in TASK_STATES:
        name_cleaned = clean_state_name(name)
        value = counters[name]
        print '%s.value %d' % (name_cleaned, value)


if __name__ == '__main__':
    workers = os.environ.get('workers', 'all')
    api_url = os.environ.get('api_url', API_URL)

    check_web_server_status(api_url)

    if workers in [None, '', 'all']:
        workers = None
    else:
        workers = workers.split(',')

    if len(sys.argv) > 1:
        if sys.argv[1] == 'config':
            print_config(workers)
        elif sys.argv[1] == 'autoconf':
            print 'yes'
    else:
        print_values(workers, api_url)
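
# Manual test without Munin (a sketch, assuming celerymon is listening on the
# default http://localhost:8989): set the environment variables the plugin
# reads and run it directly, e.g.
#
#   api_url=http://localhost:8989 workers=all ./celery_tasks_states
#
# This prints one "<state>.value <count>" line per state in TASK_STATES, while
# "./celery_tasks_states config" prints the graph configuration instead.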