#!/usr/bin/env python3 # -*- coding: utf-8; -*- import time import requests from requests import HTTPError from requests.compat import quote, json from requests.auth import HTTPBasicAuth #----------------------------------------------------------------------------- __all__ = ( 'Job', 'Jenkins', 'Server', 'JenkinsError', 'Build', 'View', 'Node' ) __version__ = '0.5.6' #----------------------------------------------------------------------------- class _JenkinsBase(object): '''Base class for Jenkins objects.''' @property def baseurl(self): raise NotImplementedError() def url(self, path): return '%s/%s' % (self.baseurl, path) @property def info(self): url = self.url('api/json?depth=0') err = '%s does not exist' % str(self) return self.server.json(url, errmsg=err) @property def exists(self): '''Check if object exists.''' try: self.info return True except HTTPError as e: if e.response.status_code == 404: return False raise except JenkinsError: return False @property def config(self): url = self.url('config.xml') res = self.server.get(url) if res.status_code != 200 or not res.headers.get('content-type', '').startswith('application/xml'): msg = 'fetching configuration for item "%s" did not return an xml document' raise JenkinsError(msg % self.name) return res.text @config.setter def config(self, newconfig): self.reconfigure(newconfig) @property def config_etree(self): # The cost of `'lxml' in sys.modules` is negligible and is # preferable to having a hard dependency on lxml. from lxml import etree return etree.fromstring(self.config.encode('utf8')) @config_etree.setter def config_etree(self, newconfig_etree): # The cost of `'lxml' in sys.modules` is negligible and is # preferable to having a hard dependency on lxml. from lxml import etree self.reconfigure(etree.tostring(newconfig_etree)) def reconfigure(self, newconfig): '''Update the config.xml of an existing item.''' self._not_exist_raise() url = self.url('config.xml') headers = {'Content-Type': 'text/xml'} params = {'name': self.name} return self.server.post(url, data=newconfig, params=params, headers=headers) def _not_exist_raise(self): if not self.exists: raise JenkinsError('%s does not exist' % str(self)) #----------------------------------------------------------------------------- class Job(_JenkinsBase): '''Represents a Jenkins job.''' __slots__ = 'name', 'server' def __init__(self, name, server): self.name = name self.server = server def __str__(self): return 'job:%r' % (self.name) def __repr__(self): cls = self.__class__.__name__ return '%s(%r)' % (cls, self.name) def __hash__(self): key = (self.name, self.server, self.__class__) return hash(key) def __eq__(self, other): return isinstance(other, self.__class__) \ and self.name == other.name \ and self.server == other.server @property def baseurl(self): return 'job/%s' % quote(self.name) def delete(self): '''Permanently remove job.''' self._not_exist_raise() url = self.url('doDelete') res = self.server.post(url, throw=False) if self.exists: raise JenkinsError('delete of job "%s" failed' % self.name) return res def enable(self): '''Enable job.''' self._not_exist_raise() url = self.url('enable') return self.server.post(url) def disable(self): '''Disable job.''' self._not_exist_raise() url = self.url('disable') return self.server.post(url) def build(self, parameters=None, token=None): '''Trigger a build.''' self._not_exist_raise() params = {} if token: params['token'] = token if parameters: params.update(parameters) url = self.url('buildWithParameters') else: url = self.url('build') return self.server.post(url, params=params) @property def enabled(self): return not 'true' in self.config @property def builds(self): return [Build(self, i['number']) for i in self.info['builds']] def __last_build_helper(self, path): url = self.url(path + '/api/json') res = self.server.json(url) return Build(self, res['number']) @property def last_build(self): return self.__last_build_helper('lastBuild') @property def last_stable_build(self): return self.__last_build_helper('lastStableBuild') @property def last_successful_build(self): return self.__last_build_helper('lastSuccessfulBuild') @property def buildnumbers(self): return [i['number'] for i in self.info['builds']] @classmethod def create(cls, name, configxml, server): '''Create a new Jenkins job.''' job = cls(name, server) if job.exists: raise JenkinsError('job "%s" already exists' % name) headers = {'Content-Type': 'text/xml'} params = {'name': name} res = server.post('createItem', data=configxml, params=params, headers=headers, throw=False) if not res or res.status_code != 200: raise JenkinsError('create "%s" failed' % name) else: res.raise_for_status() # if not job.exists: # raise JenkinsError('create "%s" failed' % name, url=res.url) @classmethod def copy(cls, source, dest, server): '''Copy a Jenkins job.''' job = cls(source, server) newjob = cls(dest, server) if newjob.exists: raise JenkinsError('job "%s" already exists' % dest) if not job.exists: raise JenkinsError('job "%s" does not exist' % source) headers = {'Content-Type': 'text/xml'} params = {'name': dest, 'mode': 'copy', 'from': source} res = server.post('createItem', params=params, headers=headers) if not newjob.exists: msg = 'could not copy job "%s" to "%s"' raise JenkinsError(msg % (source, dest)) return newjob #----------------------------------------------------------------------------- class View(_JenkinsBase): '''Represents a Jenkins view.''' __slots__ = 'name', 'server' def __init__(self, name, server): self.name = name self.server = server def __str__(self): return 'view:%r' % self.name def __repr__(self): cls = self.__class__.__name__ return '%s(%r)' % (cls, self.name) def __hash__(self): key = (self.name, self.server, self.__class__) return hash(key) def __eq__(self, other): return isinstance(other, self.__class__) \ and self.name == other.name \ and self.server == other.server @property def baseurl(self): return 'view/%s' % quote(self.name) @property def jobs(self): return [Job(i['name'], self.server) for i in self.info['jobs']] @property def jobnames(self): return [i['name'] for i in self.info['jobs']] def delete(self): '''Permanently remove view.''' self._not_exist_raise() url = self.url('doDelete') res = self.server.post(url, throw=False) if self.exists: raise JenkinsError('delete of view "%s" failed' % self.name) return res def remove_job(self, job): '''Remove job from view.''' if not self.exists: raise JenkinsError('view "%s" does not exist' % self.name) if not job.exists: raise JenkinsError('job "%s" does not exist' % job.name) url = self.url('removeJobFromView') params = {'name': job.name} res = self.server.post(url, params=params) if not res.status_code == 200: msg = 'could not remove job "%s" from view "%s"' raise JenkinsError(msg % (job.name, self.name)) def add_job(self, job): '''Add job to the view.''' if not self.exists: raise JenkinsError('view "%s" does not exist' % self.name) if not job.exists: raise JenkinsError('job "%s" does not exist' % job.name) url = self.url('addJobToView') params = {'name': job.name} res = self.server.post(url, params=params) if not res.status_code == 200: msg = 'could not add job "%s" to view "%s"' raise JenkinsError(msg % (job.name, self.name)) def has_job(self, job): '''Check if view contains job.''' config = self.config_etree jobs = config.xpath('jobNames/string/text()') job = getattr(job, 'name', job) return job in jobs def __contains__(self, job): return self.has_job(job) @classmethod def create(cls, name, configxml, server): '''Create a new Jenkins view.''' view = cls(name, server) if view.exists: raise JenkinsError('view "%s" already exists' % name) headers = {'Content-Type': 'text/xml'} params = {'name': name} res = server.post('createView', data=configxml, params=params, headers=headers, throw=False) if not res or res.status_code != 200: raise JenkinsError('create "%s" failed' % name) else: res.raise_for_status() #----------------------------------------------------------------------------- class NodeLaunchMethod: COMMAND = 'hudson.slaves.CommandLauncher' JNLP = 'hudson.slaves.JNLPLauncher' SSH = 'hudson.plugins.sshslaves.SSHLauncher' WINDOWS_SERVICE = 'hudson.os.windows.ManagedWindowsServiceLauncher' class Node(_JenkinsBase): '''Represents a Jenkins node.''' __slots__ = 'name', 'server' def __init__(self, name, server): self.name = name self.server = server def __str__(self): return '' % (self.name) def __repr__(self): cls = self.__class__.__name__ return '%s(%r)' % (cls, self.name) def __hash__(self): key = (self.name, self.server, self.__class__) return hash(key) def __eq__(self, other): return isinstance(other, self.__class__) \ and self.name == other.name \ and self.server == other.server @property def baseurl(self): return 'computer/%s' % quote(self.name) @classmethod def create(cls, name, remotefs, server, num_executors=2, node_description=None, labels=None, exclusive=False, launcher=NodeLaunchMethod.COMMAND, launcher_params={}): ''' :param name: name of node to create, ``str`` :param remotefs: Remote root directory, ``str`` :param num_executors: number of executors for node, ``int`` :param node_description: Description of node, ``str`` :param labels: Labels to associate with node, ``str`` :param exclusive: Use this node for tied jobs only, ``bool`` :param launcher: Slave launch method, ``NodeLaunchMethod|str`` :param launcher_params: Additional launcher parameters, ``dict`` ''' node = cls(name, server) if node.exists: raise JenkinsError('node "%s" already exists' % name) mode = 'EXCLUSIVE' if exclusive else 'NORMAL' launcher_params['stapler-class'] = launcher # Note that CommandLauncher class removed from Jenkins core in 2.86. # It is now available in the command-launcher plugin. inner_params = { 'name': name, 'nodeDescription': node_description, 'numExecutors': num_executors, 'remoteFS': remotefs, 'labelString': labels, 'mode': mode, 'type': 'hudson.slaves.DumbSlave$DescriptorImpl', 'retentionStrategy': { 'stapler-class': 'hudson.slaves.RetentionStrategy$Always' }, 'nodeProperties': {'stapler-class-bag': 'true'}, 'launcher': launcher_params } params = { 'name': name, 'type': 'hudson.slaves.DumbSlave$DescriptorImpl', 'json': json.dumps(inner_params) } res = server.post('computer/doCreateItem', params=params, throw=False) if not res or res.status_code != 200: print(res.text) raise JenkinsError('create "%s" failed' % name) else: res.raise_for_status() def delete(self): '''Permanently remove node.''' self._not_exist_raise() url = self.url('doDelete') res = self.server.post(url, throw=False) if self.exists: raise JenkinsError('delete of node "%s" failed' % self.name) return res def reconfigure(self, newconfig): raise NotImplementedError #----------------------------------------------------------------------------- class Build(_JenkinsBase): '''Represents a Jenkins build.''' __slots__ = 'job', 'number', 'server' def __init__(self, job, number): self.job = job self.number = number self.server = self.job.server def __hash__(self): key = (self.job, self.number, self.server, self.__class__) return hash(key) def __eq__(self, other): return isinstance(other, self.__class__) \ and self.job == other.job \ and self.number == other.number \ and self.server == other.server @property def baseurl(self): return '%s/%d' % (self.job.baseurl, self.number) @property def building(self): return self.info['building'] def __repr__(self): cls = self.__class__.__name__ return '%s(%r, %r)' % (cls, self.job, self.number) def stop(self): url = self.url('stop') return self.server.post(url) def wait(self, tick=1, timeout=None): '''Wait for build to complete.''' start = time.time() while self.building: time.sleep(tick) if timeout and (time.time() - start) > timeout: break #----------------------------------------------------------------------------- class Server(object): def __init__(self, url, username=None, password=None, verify=True, cert=None): self.url = url if url.endswith('/') else url + '/' self.auth = HTTPBasicAuth(username, password) if username else None self.verify = verify self.cert = cert self.crumb_header = None # These arguments will be passed in every call to requests.get|post(). self.request_kw = { 'auth': self.auth, 'cert': cert, 'verify': verify, } def __repr__(self): cls = self.__class__.__name__ return '%s(%s)' % (cls, self.url) def __hash__(self): key = (self.url, self.verify, self.cert, self.__class__, self.auth.username if self.auth else None, self.auth.password if self.auth else None) return hash(key) def __eq__(self, other): self_auth = (self.auth.username, self.auth.password) if self.auth else None other_auth = (other.auth.username, other.auth.password) if other.auth else None return isinstance(other, self.__class__) \ and self.url == other.url \ and self.verify == other.verify \ and self.cert == other.cert \ and self_auth == other_auth def urljoin(self, *args): return '%s%s' % (self.url, '/'.join(args)) def post(self, url, throw=True, **kw): kw = mergedict(self.request_kw, kw) if self.crumb_header is not None: headers = kw.get('headers', dict()) headers = mergedict(headers, self.crumb_header) kw['headers'] = headers res = requests.post(self.urljoin(url), **kw) throw and res.raise_for_status() return res def get(self, url, throw=True, **kw): kw = mergedict(self.request_kw, kw) res = requests.get(self.urljoin(url), **kw) throw and res.raise_for_status() return res def json(self, url, errmsg=None, throw=True, **kw): url = self.urljoin(url) kw = mergedict(self.request_kw, kw) try: res = requests.get(url, **kw) throw and res.raise_for_status() if not res: raise JenkinsError(errmsg) return res.json() except ValueError: raise JenkinsError('unparsable json response') #----------------------------------------------------------------------------- class Jenkins(object): def __init__(self, url, username=None, password=None, verify=True, cert=None): '''Create handle to Jenkins instance.''' self.server = Server(url, username, password, verify, cert) self.url = self.server.url self.server.crumb_header = self.crumb_header def __repr__(self): cls = self.__class__.__name__ return '%s(%r)' % (cls, self.url) def __hash__(self): return hash(self.server) ^ hash(self.__class__) def __eq__(self, other): return isinstance(other, self.__class__) and self.server == other.server @property def info(self): '''Get information about this Jenkins instance.''' url = 'api/json' res = self.server.json(url, 'unable to retrieve info') return res @property def computer(self): '''Get information about the Jenkins build executors.''' url = 'computer/api/json' res = self.server.json(url, 'unable to retrieve info') return res @property def crumb(self): '''Get crumb (or None if doesn't exist) from the Jenkins.''' url = 'crumbIssuer/api/json' try: res = self.server.json(url, 'unable to retrieve info') return res except HTTPError as e: if e.response.status_code == 404: return None raise except JenkinsError: return None @property def crumb_header(self): crumb = self.crumb if crumb is None: return None return {crumb['crumbRequestField']: crumb['crumb']} @property def jobs(self): return list(self.xjobs) @property def xjobs(self): return (Job(i['name'], self.server) for i in self.info['jobs']) @property def jobnames(self): return [i['name'] for i in self.info['jobs']] @property def views(self): return [View(i['name'], self.server) for i in self.info['views']] @property def viewnames(self): return [i['name'] for i in self.info['views']] @property def nodes(self): return [Node(name, self.server) for name in self.nodenames] @property def nodenames(self): names = [] for name in (comp['displayName'] for comp in self.computer['computer']): names.append(name if name != 'master' else '(master)') return names #------------------------------------------------------------------------- # alternative jenkins object api def job(self, name): return Job(name, self.server) def view(self, name): return View(name, self.server) def build(self, name, number): job = name if isinstance(name, Job) else self.job(name) return Build(job, number) def node(self, name): return Node(name, self.server) #------------------------------------------------------------------------- def job_info(self, name): return self.job(name).info def job_exists(self, name): return self.job(name).exists def job_delete(self, name): return self.job(name).delete() def job_enable(self, name): return self.job(name).enable() def job_disable(self, name): return self.job(name).disable() def job_enabled(self, name): return self.job(name).enabled def job_config(self, name): return self.job(name).config def job_reconfigure(self, name, newconfig): job = self.job(name) job.config = newconfig return job def job_config_etree(self, name): return self.job(name).config_etree def job_reconfigure_etree(self, name, newconfig): job = self.job(name) job.config_etree = newconfig return job def job_build(self, name, parameters=None, token=None): return self.job(name).build(parameters, token) def job_builds(self, name): return self.job(name).builds def job_last_build(self, name): return self.job(name).last_build def job_last_stable_build(self, name): return self.job(name).last_stable_build def job_last_successful_build(self, name): return self.job(name).last_successful_build def job_create(self, name, config): return Job.create(name, config, self.server) def job_copy(self, source, dest): return Job.copy(source, dest, self.server) #------------------------------------------------------------------------- def build_info(self, job, number): return self.build(job, number).info def build_isbuilding(self, job, number): return self.build(job, number).building def build_stop(self, job, number): return self.build(job, number).stop() def build_wait(self, job, number, interval=1, timeout=None): return self.build(job, number).wait() #------------------------------------------------------------------------- def view_exists(self, name): return self.view(name).exists def view_config(self, name): return self.view(name).config def view_jobs(self, name): return self.view(name).jobs def view_jobnames(self, name): return self.view(name).jobnames def view_reconfigure(self, name, newconfig): view = self.view(name) view.config = newconfig return view def view_config_etree(self, name): return self.view(name).config_etree def view_delete(self, name): return self.view(name).delete() def view_reconfigure_etree(self, name, newconfig): view = self.view(name) view.config_etree = newconfig return view def view_add_job(self, name, job_name): job = self.job(job_name) return self.view(name).add_job(job) def view_has_job(self, name, job_name): job = self.job(job_name) return self.view(name).has_job(job) def view_remove_job(self, name, job_name): job = self.job(job_name) return self.view(name).remove_job(job) def view_create(self, name, config): return View.create(name, config, self.server) #------------------------------------------------------------------------- def node_exists(self, name): return Node(name, self.server).exists def node_create(self, name, remotefs, *args, **kw): return Node.create(name, remotefs, self.server, *args, **kw) def node_info(self, name): return Node(name, self.server).info def node_delete(self, name): return Node(name, self.server).delete() def node_config(self, name): return self.node(name).config def node_config_etree(self, name): return self.node(name).config_etree job_exists.__doc__ = Job.exists.__doc__ job_delete.__doc__ = Job.delete.__doc__ job_enable.__doc__ = Job.enable.__doc__ job_disable.__doc__ = Job.disable.__doc__ job_reconfigure.__doc__ = Job.reconfigure.__doc__ job_build.__doc__ = Job.build.__doc__ job_create.__doc__ = Job.create.__doc__ job_copy.__doc__ = Job.copy.__doc__ build_stop.__doc__ = Build.stop.__doc__ build_wait.__doc__ = Build.wait.__doc__ view_exists.__doc__ = View.exists.__doc__ view_add_job.__doc__ = View.add_job.__doc__ view_remove_job.__doc__ = View.remove_job.__doc__ view_create.__doc__ = View.create.__doc__ view_delete.__doc__ = View.delete.__doc__ view_has_job.__doc__ = View.has_job.__doc__ node_exists.__doc__ = Node.exists.__doc__ node_create.__doc__ = Node.create.__doc__ node_delete.__doc__ = Node.delete.__doc__ #----------------------------------------------------------------------------- # Utility functions. def mergedict(a, b): c = a.copy() c.update(b) return c #----------------------------------------------------------------------------- class JenkinsError(Exception): '''Exception type for Jenkins-API related failures.''' def __init__(self, msg): super(JenkinsError, self).__init__(msg) self.msg = msg #----------------------------------------------------------------------------- # Uncomment to enable http logging # try: # import logging, httplib # httplib.HTTPConnection.debuglevel = 1 # except ImportError: # import logging, http.client # http.client.HTTPConnection.debuglevel = 1 # logging.basicConfig() # logging.getLogger().setLevel(logging.DEBUG) # requests_log = logging.getLogger('requests.packages.urllib3') # requests_log.setLevel(logging.DEBUG) # requests_log.propagate = True