#!/usr/bin/env python
"""=cut
=head1 NAME

celery_tasks - Munin plugin to monitor the number of Celery tasks with
specified names.

=head1 REQUIREMENTS

 - Python
 - celery (http://celeryproject.org/)
 - celerymon (http://github.com/ask/celerymon)

Note: don't forget to enable sending of the events on the celery daemon -
run it with the --events option

=head1 CONFIGURATION

Default configuration:

  None

You must set the name of at least one task you want to monitor (multiple
names are separated by a comma). Whitespace around a name is ignored.

For example:

  [celery_tasks]
     env.tasks myapp.tasks.SendEmailTask,myapp2.tasks.FetchUserDataTask

This would monitor the number of tasks named "myapp.tasks.SendEmailTask"
and "myapp2.tasks.FetchUserDataTask".

The address of the celerymon web server can be overridden with
env.api_url (default: http://localhost:8989).

=head1 MAGIC MARKERS

  #%# family=manual
  #%# capabilities=autoconf

=head1 AUTHOR

Tomaz Muraus (http://github.com/Kami/munin-celery)

=head1 LICENSE

GPLv2

=cut"""

# NOTE: every print() call below is given exactly one pre-formatted string.
# That form is valid both as a Python 2 print statement and as a Python 3
# function call, so the plugin runs unchanged under either interpreter.

import os
import sys
import urllib

try:
    import json
except ImportError:
    # Interpreters that predate the bundled json module.
    import simplejson as json

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    urlopen = urllib.urlopen  # Python 2

# Default address of the celerymon web server (override with env.api_url).
API_URL = 'http://localhost:8989'

# Path templates of the celerymon HTTP API, keyed by the name that
# get_data() accepts as its ``what`` argument.
URL_ENDPOINTS = {
    'workers': '/api/worker/',
    'worker_tasks': '/api/worker/%s/tasks',
    'tasks': '/api/task/',
    'task_names': '/api/task/name/',
    'task_details': '/api/task/name/%s',
}

TASK_STATES = (
    'PENDING',
    'RECEIVED',
    'STARTED',
    'SUCCESS',
    'FAILURE',
    'REVOKED',
    'RETRY'
)


def _fetch(url):
    """Return the raw body served at ``url``, always closing the connection.

    Raises IOError (through urlopen) when the server cannot be reached.
    """
    connection = urlopen(url)
    try:
        return connection.read()
    finally:
        connection.close()


def get_data(what, api_url, *args):
    """Fetch one API endpoint and return its decoded JSON payload.

    what    -- key into URL_ENDPOINTS selecting the endpoint
    api_url -- base URL of the celerymon web server
    args    -- values substituted into the endpoint's ``%s`` placeholders

    Prints an error and exits with status -1 when the request fails with
    an IOError.
    """
    url = '%s%s' % (api_url, URL_ENDPOINTS[what] % args)
    try:
        response = _fetch(url)
    except IOError:
        print('Could not connect to the celerymon webserver')
        sys.exit(-1)

    # Python 3 hands back bytes; decode so json.loads works on every
    # Python 3 release (on Python 2 the body already is a str).
    if not isinstance(response, str):
        response = response.decode('utf-8')
    return json.loads(response)


def check_web_server_status(api_url):
    """Exit with status -1 unless the server at ``api_url`` answers."""
    try:
        _fetch(api_url)
    except IOError:
        print('Could not connect to the celerymon webserver')
        sys.exit(-1)


def clean_task_name(task_name):
    """Return ``task_name`` with every dot replaced by an underscore.

    The result is used as the Munin field name for the task.
    """
    return task_name.replace('.', '_')


def parse_task_names(raw):
    """Split the comma separated ``tasks`` setting into a list of names.

    Surrounding whitespace is stripped and empty entries are dropped, so
    "a.b, c.d," yields ['a.b', 'c.d']. None or an empty string yields [].
    """
    if not raw:
        return []
    stripped = [name.strip() for name in raw.split(',')]
    return [name for name in stripped if name]


# Config
def print_config(task_names):
    """Print the Munin graph configuration for the given task names."""
    print('graph_title Celery tasks')
    print('graph_args --lower-limit 0')
    print('graph_scale no')
    print('graph_vlabel tasks per ${graph_period}')
    print('graph_category cloud')

    for name in task_names:
        field = clean_task_name(name)
        print('%s.label %s' % (field, name))
        print('%s.type DERIVE' % (field))
        print('%s.min 0' % (field))
        print('%s.info number of %s tasks' % (field, name))


# Values
def print_values(task_names=None, api_url=None):
    """Print one ``<field>.value <count>`` line per task name.

    The count is the length of the JSON payload returned by the
    ``task_details`` endpoint for that task.

    task_names -- iterable of task names; None is treated as empty
    api_url    -- base URL of the celerymon server; None means API_URL
    """
    if api_url is None:
        api_url = API_URL

    for task_name in task_names or ():
        count = len(get_data('task_details', api_url, task_name))
        print('%s.value %d' % (clean_task_name(task_name), count))


def main(argv=None):
    """Run the plugin: ``config``, ``autoconf`` or (no argument) values.

    Reads the ``tasks`` and ``api_url`` environment variables. Exits with
    status -1 when the web server is unreachable or no task is configured.
    """
    if argv is None:
        argv = sys.argv

    api_url = os.environ.get('api_url', API_URL)
    check_web_server_status(api_url)

    task_names = parse_task_names(os.environ.get('tasks', None))
    if not task_names:
        print('You need to define at least one task name')
        sys.exit(-1)

    if len(argv) > 1:
        if argv[1] == 'config':
            print_config(task_names)
        elif argv[1] == 'autoconf':
            print('yes')
    else:
        print_values(task_names, api_url)


if __name__ == '__main__':
    main()