#!/usr/bin/env python2
#
# git-pw - git subcommand to integrate with patchwork
#
# Copyright (C) 2015 Intel Corporation
#
# Some snippets and ideas are taken from git-bz and git-phab, both licensed
# under the GPLv2+.
#
# Copyright (C) 2008 Owen Taylor
# Copyright (C) 2015 Xavier Claessens
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see
# http://www.gnu.org/licenses/.
#
# Authors:
#
# Damien Lespiau
#
# Requirements:
#
# - On Fedora:
#
#   $ sudo dnf install python-GitPython python-requests
#
# - On Debian/Ubuntu
#
#   $ sudo apt-get install python-git python-requests
#
# - Using pip:
#
#   $ cat git-pw/requirements.txt
#   GitPython
#   requests
#   $ pip install -r requirements.txt
#
# Setup (example):
#
#   git config patchwork.default.url https://patchwork.freedesktop.org
#   git config patchwork.default.project intel-gfx

import argparse
from collections import OrderedDict
import errno
import fcntl
import json
import os
import subprocess
import signal
import struct
import sys
import termios

import git
import requests


def die(message):
    """Write 'fatal: <message>' to stderr and exit with status 1."""
    sys.stderr.write('fatal: ' + message + '\n')
    sys.exit(1)


class HttpError(Exception):
    """Raised when the Patchwork server answers with an unexpected status.

    status_code is the HTTP status; response is the requests response object
    when the raiser provided one, None otherwise.
    """

    def __init__(self, status_code, response=None):
        super(HttpError, self).__init__('HTTP error %s' % status_code)
        self.status_code = status_code
        self.response = response

    def set_response(self, response):
        """Attach the requests response that triggered this error."""
        self.response = response

    def _json(self):
        """Return the decoded JSON body of the response, or None."""
        if self.response is None:
            return None
        try:
            return self.response.json()
        except ValueError:
            # The body is not JSON, there are no details to show.
            return None

    def handle_error(self, msg_overrides=None):
        """Report well-known status codes to the user and exit.

        msg_overrides maps status codes to messages that replace (or extend)
        the default ones. The method returns normally when the status code
        isn't one it knows about, so that the caller can re-raise.
        """
        msg = {
            404: "Couldn't find object",
            500: "Internal error, you found a bug in Patchwork!"
        }
        if msg_overrides:
            msg.update(msg_overrides)

        if self.status_code == 401 or self.status_code == 403:
            data = self._json()
            detail = None
            if isinstance(data, dict):
                detail = data.get('detail')
            if detail is None:
                detail = 'no details provided by the server'
            die("Not authorized: %s" % detail)
        elif self.status_code == 400:
            data = self._json()
            if isinstance(data, dict):
                for field in data:
                    print(field + ':')
                    reasons = data[field]
                    if not isinstance(reasons, (list, tuple)):
                        reasons = [reasons]
                    for reason in reasons:
                        print(' %s' % reason)
            die("Invalid input")
        elif self.status_code in msg:
            die(msg[self.status_code])


class Command(object):
    """Namespace filled in by argparse, plus per-command requirements.

    Attributes that argparse didn't set are looked up in the 'meta' entry of
    the selected command, e.g. cmd.need_auth.
    """

    meta = {
        'apply': {
            'need_git_repo': True,
            'need_project': False,
            'need_auth': False,
        },
        'apply-patch': {
            'need_git_repo': True,
            'need_project': False,
            'need_auth': False,
        },
        'mbox': {
            'need_git_repo': True,
            'need_project': False,
            'need_auth': False,
        },
        'mbox-patch': {
            'need_git_repo': True,
            'need_project': False,
            'need_auth': False,
        },
        'list': {
            'need_git_repo': True,
            'need_project': True,
            'need_auth': False,
        },
        'poll-events': {
            'need_git_repo': True,
            'need_project': True,
            'need_auth': False,
        },
        'post-result': {
            'need_git_repo': True,
            'need_project': False,
            'need_auth': True,
        },
        'post-result-patch': {
            'need_git_repo': True,
            'need_project': False,
            'need_auth': True,
        },
    }

    aliases = {
        'as': 'apply',
        'ap': 'apply-patch',
    }

    def __getattr__(self, name):
        # 'name' is set by argparse; until then there is no command to look
        # the attribute up for. Raising AttributeError (instead of recursing
        # through canonical_name()) keeps hasattr() working as argparse
        # expects.
        if name == 'name' or name.startswith('_'):
            raise AttributeError(name)
        try:
            return self.meta[self.canonical_name()][name]
        except KeyError:
            raise AttributeError(name)

    def canonical_name(self):
        """Return the command name with aliases resolved."""
        if self.name in self.aliases:
            return self.aliases[self.name]
        return self.name

    def method_name(self):
        """Return the canonical command name with '-' replaced by '_'."""
        return self.canonical_name().replace('-', '_')


class User(object):
    """Credentials used for HTTP basic authentication."""

    def __init__(self, username, password):
        self.username = username
        self.password = password


class TestState(object):
    """The states a test result can be in."""

    PENDING = 0
    SUCCESS = 1
    WARNING = 2
    FAILURE = 3
    CHOICES = (
        (PENDING, 'pending'),
        (SUCCESS, 'success'),
        (WARNING, 'warning'),
        (FAILURE, 'failure'),
    )

    @classmethod
    def choices(cls):
        """Return the list of state names, in declaration order."""
        return [s for _, s in cls.CHOICES]


class RestObject(object):
    """Base class of the objects exposed by the REST API.

    Subclasses implement url(), which maps a path relative to the object to
    a path relative to the API base. Unknown attributes are fetched from the
    JSON representation of the object.
    """

    def __init__(self, patchwork):
        self.pw = patchwork

    def url(self, url='/'):
        """Return the API path of 'url' relative to this object."""
        raise NotImplementedError('url() must be provided by subclasses')

    def get(self, url='/', params=None):
        return self.pw.get(self.url(url), params)

    def get_list(self, url='/', params=None, n_items=-1):
        return self.pw.get_list(self.url(url), params, n_items)

    def post(self, url='/', data=None):
        if data is None:
            data = {}
        return self.pw.post(self.url(url), data)

    def absolute_url(self, url='/'):
        return self.pw.api_base + self.url(url)

    def __getattr__(self, name):
        # Never go to the network for private names or for 'pw' itself: get()
        # needs self.pw and would recurse forever if it is missing.
        if name == 'pw' or name.startswith('_'):
            raise AttributeError(name)
        try:
            return self.get()[name]
        except KeyError:
            raise AttributeError(name)


class Project(RestObject):
    """A project, identified by its link name."""

    def __init__(self, patchwork, linkname):
        super(Project, self).__init__(patchwork)
        self.linkname = linkname

    def url(self, url='/'):
        return '/projects/' + self.linkname + url


class Revision(RestObject):
    """A revision of a series. A false 'rev' means the latest revision."""

    def __init__(self, series, rev):
        super(Revision, self).__init__(series.pw)
        self.series = series
        self._rev = rev

    def url(self, url='/'):
        return '/series/%d/revisions/%d%s' % (self.series.id, self.rev, url)

    @property
    def rev(self):
        """The revision number, taken from the series 'version' if unset."""
        if self._rev:
            return self._rev
        self._rev = self.series.get('/')['version']
        return self._rev


class Series(RestObject):
    """A series, identified by its id."""

    FIELDS = ['id', 'project', 'name', 'n_patches', 'submitter', 'submitted',
              'last_updated', 'version', 'reviewer']

    # How each field is displayed by the 'list' command.
    meta = {
        'id': {
            'header': 'ID',
        },
        'project': {
            'header': 'Project',
        },
        'name': {
            'header': 'Name',
            'can_shrink': True,
        },
        'n_patches': {
            'header': 'Patches',
        },
        'submitter': {
            'header': 'Submitter',
        },
        'submitted': {
            'header': 'Submitted',
            'format': 'date',
        },
        'last_updated': {
            'header': 'Updated',
            'format': 'date',
        },
        'version': {
            'header': 'Version',
        },
        'reviewer': {
            'header': 'Reviewer',
        },
    }

    def __init__(self, patchwork, series_id):
        super(Series, self).__init__(patchwork)
        self.id = series_id

    def url(self, url='/'):
        return '/series/' + str(self.id) + url

    def get_revision(self, rev):
        return Revision(self, rev)


class Patch(RestObject):
    """A patch, identified by its id."""

    def __init__(self, patchwork, patch_id):
        super(Patch, self).__init__(patchwork)
        self.id = patch_id

    def url(self, url='/'):
        return '/patches/' + str(self.id) + url


class Patchwork(object):
    """Thin client for the REST API of a patchwork instance."""

    def __init__(self, web_root, project_linkname, user):
        if not web_root.endswith('/'):
            web_root += '/'
        self.api_base = web_root + 'api/1.0'
        self.web_root = web_root
        self.project = Project(self, project_linkname)
        self.user = user
        self.json_cache = {}

    def get(self, url, params=None):
        """GET 'url' (relative to the API base) and return the decoded JSON.

        Results are cached per (URL, parameters) pair. Raises HttpError when
        the server doesn't answer with a 200.
        """
        absolute_url = self.api_base + url
        # The parameters are part of the key: the same URL queried with
        # different parameters is a different document.
        cache_key = (absolute_url, repr(sorted((params or {}).items())))
        if cache_key in self.json_cache:
            return self.json_cache[cache_key]

        r = requests.get(absolute_url, params=params)
        if r.status_code != 200:
            raise HttpError(r.status_code, r)

        data = r.json()
        self.json_cache[cache_key] = data
        return data

    def _compute_perpage(self, params, n_items):
        # always respect what we've been asked to do
        if 'perpage' in params:
            return params['perpage']

        # 100 is the perpage limit the API allows
        if n_items >= 0 and n_items <= 100:
            return n_items

        return 100

    def get_list(self, url, params=None, n_items=-1):
        """Yield the elements of the paginated list at 'url'.

        At most n_items elements are yielded; a negative n_items means no
        limit. Raises HttpError when the server doesn't answer with a 200.
        """
        absolute_url = self.api_base + url
        # Work on a copy: the caller's dictionary must not be modified, and
        # params may legitimately be None.
        params = dict(params or {})
        params['perpage'] = self._compute_perpage(params, n_items)

        while absolute_url and n_items != 0:
            r = requests.get(absolute_url, params=params)
            if r.status_code != 200:
                raise HttpError(r.status_code, r)

            data = r.json()
            for element in data['results']:
                # Stop in the middle of a page once we have enough elements.
                if n_items == 0:
                    return
                n_items -= 1
                yield element

            # fetch next page
            absolute_url = data['next']

    def post(self, url, data):
        """POST 'data' as JSON to 'url' and return the decoded JSON reply.

        Raises HttpError, with the response attached, on non-2xx statuses.
        """
        absolute_url = self.api_base + url
        credentials = (self.user.username, self.user.password)
        r = requests.post(absolute_url, json=data, auth=credentials)
        if r.status_code < 200 or r.status_code >= 300:
            e = HttpError(r.status_code)
            e.set_response(r)
            raise e
        return r.json()

    def setup(self):
        """Fetch the API root, exiting if it can't be retrieved."""
        try:
            self.api = self.get('/')
        except (HttpError, ValueError):
            # Either a non-200 status or a body that isn't JSON.
            die("%s doesn't seem to be a patchwork instance." % self.web_root)
        except requests.exceptions.RequestException as e:
            die("Couldn't reach %s: %s" % (self.web_root, e))

    def get_project(self):
        return self.project

    def get_series(self, series_id):
        return Series(self, series_id)

    def get_patch(self, patch_id):
        return Patch(self, patch_id)


class Terminal(object):
    """Queries the terminal attached to file descriptor 0."""

    DEFAULT_WIDTH = 80
    DEFAULT_HEIGHT = 24

    def get_size(self):
        """Return (width, height), falling back to the defaults."""
        try:
            device = fcntl.ioctl(0, termios.TIOCGWINSZ, '\0' * 8)
        except IOError:
            return self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT

        # The first two shorts are returned as (height, width) below.
        height, width = struct.unpack('hhhh', device)[:2]
        if width <= 0 or height <= 0:
            # A zero-sized window is not a usable answer.
            return self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT
        return width, height


class Table(object):
    """Writes a list of dictionaries as a text table on stdout.

    The configuration needs a 'columns' ordered mapping of item key to
    column description ('header', and optionally 'format' and 'can_shrink').
    'max_width', when given, is the width the table tries to fit in.
    """

    default_config = {
        'column-spacing': 1,
        'header-separator': '-',
    }

    def __init__(self, config):
        self.config = {}
        self.config.update(self.default_config)
        self.config.update(config)

    @staticmethod
    def _cell_text(value):
        """Return 'value' as text, for width computations."""
        if isinstance(value, basestring):
            return value
        return str(value)

    def _layout(self, items):
        """Compute the width, alignment and format string of each column."""
        layout = {}
        layout['__table__'] = {}    # table-global layout information
        columns = self.config['columns']
        last_column = list(columns)[-1]

        # horizontal width of each column
        for key, column in columns.items():
            layout[key] = {}
            layout[key]['width'] = len(column['header'])

        for key, column in columns.items():
            if column.get('format', None) == 'date':
                layout[key]['width'] = 10
                layout[key]['ellipsis'] = True
                continue

            for item in items:
                text = self._cell_text(item[key])
                layout[key]['width'] = max(layout[key]['width'], len(text))

        # width of the table
        width = 0
        for key in columns:
            width += layout[key]['width']
        width += self.config['column-spacing'] * (len(columns) - 1)
        layout['__table__']['width'] = width
        table_width = width

        # we allow one column to shrink to accommodate 'max_width'
        shrinking_column = None
        if ('max_width' in self.config and
                table_width > self.config['max_width']):
            for key, column in columns.items():
                if 'can_shrink' in column:
                    shrinking_column = key
                    break

        if shrinking_column:
            table_max_width = self.config['max_width']
            column_old_width = layout[shrinking_column]['width']
            column_min_width = len(columns[shrinking_column]['header'])
            other_columns_width = table_width - column_old_width
            column_new_width = table_max_width - other_columns_width
            if column_new_width < column_min_width:
                # Degenerate case, can't comply with the max_width being
                # asked. Shrink the column to min_width.
                column_new_width = column_min_width
            layout[shrinking_column]['width'] = column_new_width
            layout[shrinking_column]['ellipsis'] = True
            layout['__table__']['width'] = other_columns_width + \
                column_new_width

        # The first item decides the alignment; there may be none.
        first_item = items[0] if items else None
        for key, column in columns.items():
            # alignment of each column: text to the left, the rest to the
            # right
            align = '>'
            if (first_item is not None and
                    isinstance(first_item[key], basestring)):
                align = '<'
            layout[key]['align'] = align

            # format string of each column
            spacing = ' ' * self.config['column-spacing']
            if key == last_column:
                spacing = ''
            width = layout[key]['width']
            # the precision truncates the text to the column width
            precision = ''
            if layout[key].get('ellipsis', False):
                precision = '.' + str(width)
            layout[key]['format'] = "{:%s%s%s}%s" % (layout[key]['align'],
                                                     width, precision,
                                                     spacing)

        return layout

    def write(self, items):
        """Write the headers, a separator line and one line per item."""
        layout = self._layout(items)
        columns = self.config['columns']
        last_column = list(columns)[-1]

        # print headers
        for key, column in columns.items():
            spacing = ' ' * self.config['column-spacing']
            if key == last_column:
                spacing = ''
            fmt = "{:^%s}%s" % (layout[key]['width'], spacing)
            sys.stdout.write(fmt.format(column['header']))
        sys.stdout.write("\n")

        sys.stdout.write(self.config['header-separator'] *
                         layout['__table__']['width'])
        sys.stdout.write("\n")

        # print data
        for item in items:
            for key in columns:
                fmt = layout[key]['format']
                sys.stdout.write(fmt.decode().format(item[key]))
            sys.stdout.write("\n")


class GitPatchwork(object):
    """Implementation of the git-pw commands."""

    def __init__(self):
        self.cmd = Command()

    def setup(self):
        """Read the git configuration and connect to patchwork."""
        try:
            self.repo = git.Repo(os.getcwd())
        except git.exc.InvalidGitRepositoryError:
            if self.cmd.need_git_repo:
                die('Not a git repository.')
            # The configuration is read through the repository, we can't go
            # any further without one.
            die('Not a git repository: cannot read the git-pw configuration.')

        config = self.repo.config_reader()

        # Select which configuration we are going to use. In order:
        #   - config given on the command line
        #   - config specified in the patchwork.defaultconfig variable
        #   - 'default'
        default_config = None
        try:
            default_config = config.get('patchwork', 'defaultconfig')
        except Exception:
            # the variable is optional
            pass
        if not self.cmd.config and default_config:
            self.cmd.config = default_config
        if not self.cmd.config:
            self.cmd.config = 'default'

        section = 'patchwork "%s"' % self.cmd.config

        # Grab that config 'url' and 'project' fields. 'project' is actually
        # optional, not all commands need one and we allow not setting it.
        try:
            web_root = config.get(section, 'url')
        except Exception:
            die("git-pw isn't configured.\n\n"
                "Please set up the patchwork url and project, e.g.:\n\n"
                " git config patchwork.%(config)s.url "
                "https://patchwork.freedesktop.org\n"
                " git config patchwork.%(config)s.project intel-gfx\n" % {
                    'config': self.cmd.config,
                })

        project = None
        try:
            project = config.get(section, 'project')
        except Exception:
            pass
        if not project and self.cmd.need_project:
            die('No project configured.\n\n'
                "Please set up the patchwork project, e.g.:\n\n"
                " git config patchwork.%(config)s.project intel-gfx\n" % {
                    'config': self.cmd.config,
                })

        # username/password + HTTP basic auth. Prefer using the Token based
        # auth mechanism. In any case, using HTTPS is a must.
        user = None
        try:
            username = config.get(section, 'user')
            password = config.get(section, 'password')
            user = User(username, password)
        except Exception:
            # both fields are needed to build a user
            pass
        if not user and self.cmd.need_auth:
            die('No authentication configured.\n\n'
                "Please set up credentials, e.g.:\n\n"
                " git config patchwork.%(config)s.user myusername\n"
                " git config patchwork.%(config)s.password mypassword\n" % {
                    'config': self.cmd.config,
                })

        self.pw = Patchwork(web_root, project, user)
        self.pw.setup()

    def am(self, mailbox_url):
        """Feed the mbox at 'mailbox_url' to 'git am -3'.

        Returns the exit status of git am. Raises HttpError if the mbox
        can't be retrieved.
        """
        r = requests.get(mailbox_url)
        if r.status_code != 200:
            raise HttpError(r.status_code, r)

        args = []
        if self.cmd.signoff:
            args += ['-s']

        p = subprocess.Popen(['git', 'am', '-3'] + args,
                             stdin=subprocess.PIPE)
        p.communicate(r.content)
        return p.returncode

    def cmd_get_series_revision(self):
        """Return the (series, revision) pair selected on the command line.

        Exits if the requested revision is out of range.
        """
        series = self.pw.get_series(self.cmd.series_id)
        revision = series.get_revision(self.cmd.revision)
        if self.cmd.revision and (self.cmd.revision < 1 or
                                  self.cmd.revision > series.version):
            die("Invalid revision: %d.\n"
                "Series %d has %d revision(s) (from 1 to %d)" %
                (self.cmd.revision, series.id, series.version,
                 series.version))
        return (series, revision)

    def do_apply(self):
        try:
            (series, revision) = self.cmd_get_series_revision()
            # Report the revision being applied, which isn't necessarily the
            # latest one.
            print('Applying series: %s (rev %d)' % (series.name,
                                                    revision.rev))
            return self.am(revision.absolute_url('/mbox/'))
        except HttpError as e:
            if e.status_code != 404:
                raise
            die('No series with id %d.' % self.cmd.series_id)

    def do_apply_patch(self):
        patch = self.pw.get_patch(self.cmd.patch_id)
        try:
            return self.am(patch.absolute_url('/mbox/'))
        except HttpError as e:
            if e.status_code != 404:
                raise
            die('No patch with id %d.' % self.cmd.patch_id)

    def _print_mbox(self, mailbox_url):
        """Print the mbox at 'mailbox_url' on stdout."""
        r = requests.get(mailbox_url)
        if r.status_code != 200:
            raise HttpError(r.status_code, r)
        print(r.content)

    def do_mbox(self):
        try:
            # Looking up the series can 404 as well.
            (series, revision) = self.cmd_get_series_revision()
            return self._print_mbox(revision.absolute_url('/mbox/'))
        except HttpError as e:
            if e.status_code != 404:
                raise
            die('No series with id %d.' % self.cmd.series_id)

    def do_mbox_patch(self):
        patch = self.pw.get_patch(self.cmd.patch_id)
        try:
            return self._print_mbox(patch.absolute_url('/mbox/'))
        except HttpError as e:
            if e.status_code != 404:
                raise
            die('No patch with id %d.' % self.cmd.patch_id)

    def do_list(self):
        project = self.pw.get_project()

        params = {
            'ordering': '-last_updated'
        }
        filter_applied = False

        # since related parameters
        if self.cmd.submitted_since:
            filter_applied = True
            params['ordering'] = 'submitted'
            params['submitted_since'] = self.cmd.submitted_since
        elif self.cmd.updated_since:
            filter_applied = True
            params['ordering'] = 'last_updated'
            params['updated_since'] = self.cmd.updated_since

        if self.cmd.test_state:
            filter_applied = True
            params['test_state'] = self.cmd.test_state

        if self.cmd.title:
            filter_applied = True
            params['name'] = self.cmd.title

        try:
            series_list = list(project.get_list('/series/', params=params,
                                                n_items=self.cmd.n))
        except HttpError as e:
            e.handle_error({
                404: "No project with link name %s." % project.linkname
            })
            raise

        # print the series
        if len(series_list) == 0:
            if filter_applied:
                print('No series found!')
            else:
                print('No series in this project yet!')
            return

        # JSON
        if self.cmd.json:
            for series in series_list:
                print(json.dumps(series))
            return

        # Human readable, columns in the order the fields were given
        columns = OrderedDict((key, Series.meta[key])
                              for key in self.cmd.fields)
        config = {
            'columns': columns,
            'max_width': Terminal().get_size()[0],
        }
        table = Table(config=config)
        table.write(series_list)

    def do_poll_events(self):
        project = self.pw.get_project()
        ts_filename = '.git-pw.%s.poll.timestamp' % project.linkname

        params = {
            'ordering': 'event_time',
        }
        if self.cmd.event_names:
            params['name'] = ','.join(self.cmd.event_names)

        # find out if we have a 'since' GET parameter
        since = None
        try:
            with open(ts_filename) as ts_file:
                since = ts_file.read().strip()
        except IOError:
            # no timestamp saved by a previous invocation
            pass
        if self.cmd.since:
            since = self.cmd.since

        # if we don't have a 'since' parameter, default to retrieving 20 items
        n_items = 20
        if since:
            params['since'] = since
            n_items = -1

        # list the events, older first. We limit ourselves to 20 events, the
        # number of items per page the API will return by default.
        try:
            for event in project.get_list('/events/', params=params,
                                          n_items=n_items):
                print(json.dumps(event))
                # Saved after each event so an interrupted run resumes from
                # the last event printed.
                with open(ts_filename, 'w') as ts_file:
                    ts_file.write(event['event_time'])
        except HttpError as e:
            if e.status_code != 404:
                raise
            die('No project with link name %s.' % project.linkname)

    def _prepare_result_data(self):
        """Build the test result payload from the command line options."""
        data = {
            'test_name': self.cmd.test_name,
            'state': self.cmd.state,
        }

        # 'none' and 'null' ask for the field to be reset
        if self.cmd.url:
            if self.cmd.url in ('none', 'null'):
                self.cmd.url = None
            data['url'] = self.cmd.url

        if self.cmd.summary:
            if self.cmd.summary in ('none', 'null'):
                self.cmd.summary = None
            data['summary'] = self.cmd.summary

        if self.cmd.summary_from_file:
            try:
                with open(self.cmd.summary_from_file) as summary_file:
                    data['summary'] = summary_file.read()
            except IOError as e:
                die(str(e))

        return data

    def _do_post_results(self, obj, msg_overrides):
        try:
            data = self._prepare_result_data()
            r = obj.post('/test-results/', data=data)
            print("Posted result: %s: %s" % (r['test_name'], r['state']))
        except HttpError as e:
            e.handle_error(msg_overrides)
            raise

    def do_post_result(self):
        msg_overrides = {
            404: "No series with id %d" % self.cmd.series_id
        }
        try:
            (series, revision) = self.cmd_get_series_revision()
        except HttpError as e:
            e.handle_error(msg_overrides)
            raise
        self._do_post_results(revision, msg_overrides)

    def do_post_result_patch(self):
        patch = self.pw.get_patch(self.cmd.patch_id)
        self._do_post_results(patch, {
            404: "No patch with id %d" % self.cmd.patch_id
        })

    def run(self):
        """Run the selected command and return its exit status."""
        self.setup()
        method = 'do_' + self.cmd.method_name()
        try:
            ret = getattr(self, method)()
            return 0 if ret is None else ret
        except IOError as e:
            # A closed pipe (e.g. 'git pw list | head') is not an error.
            if e.errno != errno.EPIPE:
                raise
            return 0


class AliasedSubParsersAction(argparse._SubParsersAction):
    '''Aliases for argparse positional arguments.'''

    class _AliasedPseudoAction(argparse.Action):
        def __init__(self, name, aliases, help):
            dest = name
            if aliases:
                dest += ' (%s)' % ','.join(aliases)
            sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
            sup.__init__(option_strings=[], dest=dest, help=help)

    def add_parser(self, name, **kwargs):
        aliases = kwargs.pop('aliases', [])

        parser = super(AliasedSubParsersAction, self). \
            add_parser(name, **kwargs)

        # Make the aliases work.
        for alias in aliases:
            self._name_parser_map[alias] = parser
        # Make the help text reflect them, first removing old help entry.
        if 'help' in kwargs:
            help_text = kwargs.pop('help')
            self._choices_actions.pop()
            pseudo_action = self._AliasedPseudoAction(name, aliases,
                                                      help_text)
            self._choices_actions.append(pseudo_action)

        return parser


def parser_add_result_options(parser):
    """Add the options shared by the post-result* commands to 'parser'."""
    parser.add_argument('--url', '-u', metavar='url', type=str,
                        help='the URL where to get full test results')
    parser.add_argument('--summary', '-s', metavar='summary', type=str,
                        help='a summary of the test results')
    parser.add_argument('--summary-from-file', metavar='file', type=str,
                        help='a summary of the test results, taken from the '
                             'given file')
    parser.add_argument('test_name', metavar='test_name', type=str,
                        help='the name of the test')
    parser.add_argument('state', metavar='state', type=str,
                        help='the state of the test. One of pending, '
                             'success, warning or failure')


def _build_parser():
    """Create the command line parser with all the git-pw commands."""
    parser = argparse.ArgumentParser()
    parser.register('action', 'parsers', AliasedSubParsersAction)

    # arguments common to all commands
    parser.add_argument('--config', '-c', metavar='config', type=str,
                        help="configuration to use, 'default' if omitted")
    parser.add_argument("-C", dest="directory", default='.',
                        help="change to directory")

    subparsers = parser.add_subparsers(dest='name', metavar='command')

    # apply
    apply_series_parser = subparsers.add_parser(
        'apply', aliases=('as',),
        help='apply a series to the current branch')
    apply_series_parser.add_argument(
        '--signoff', '-s', action="store_true",
        help='Add Signed-off-by line by the committer at the end of the '
             'commit log message')
    apply_series_parser.add_argument(
        '--revision', '-r', metavar='revision', type=int,
        help='the revision to apply, latest if omitted')
    apply_series_parser.add_argument(
        'series_id', metavar='series_id', type=int,
        help='the series id to apply')

    # apply-patch
    apply_patch_parser = subparsers.add_parser(
        'apply-patch', aliases=('ap',),
        help='apply a patch to the current branch')
    apply_patch_parser.add_argument(
        '--signoff', '-s', action="store_true",
        help='Add Signed-off-by line by the committer at the end of the '
             'commit log message')
    apply_patch_parser.add_argument(
        'patch_id', metavar='patch_id', type=int,
        help='the patch id to apply')

    # mbox
    mbox_parser = subparsers.add_parser(
        'mbox',
        help='retrieve a mbox file of the series and print it on stdout')
    mbox_parser.add_argument(
        '--revision', '-r', metavar='revision', type=int,
        help='the revision to retrieve, latest if omitted')
    mbox_parser.add_argument(
        'series_id', metavar='series_id', type=int,
        help='the series id to retrieve')

    # mbox-patch
    mbox_patch_parser = subparsers.add_parser(
        'mbox-patch',
        help='retrieve a mbox file of the patch and print it on stdout')
    mbox_patch_parser.add_argument(
        'patch_id', metavar='patch_id', type=int,
        help='the patch id to retrieve')

    # poll-events
    poll_events_parser = subparsers.add_parser(
        'poll-events',
        help='list events since the last invocation')
    poll_events_parser.add_argument(
        '--name', '-n', metavar='event_name', nargs='+', type=str,
        dest='event_names',
        help='retrieve events of the given type(s)')
    poll_events_parser.add_argument(
        '--since', '-s', metavar='timestamp', type=str,
        help='retrieve events newer than the given ISO 8601 time')

    # post-result
    post_result_parser = subparsers.add_parser(
        'post-result',
        help='post test results for a given series revision')
    post_result_parser.add_argument(
        '--revision', '-r', metavar='revision', type=int,
        help='the revision tested, latest if omitted')
    post_result_parser.add_argument(
        'series_id', metavar='series_id', type=int,
        help='the series id to report test results for')
    parser_add_result_options(post_result_parser)

    # post-result-patch
    post_patch_result_parser = subparsers.add_parser(
        'post-result-patch',
        help='post test results for a given patch')
    post_patch_result_parser.add_argument(
        'patch_id', metavar='patch_id', type=int,
        help='the patch id to report test results for')
    parser_add_result_options(post_patch_result_parser)

    # list
    list_parser = subparsers.add_parser('list', help='list series')
    since_group = list_parser.add_mutually_exclusive_group(required=False)
    since_group.add_argument(
        '--submitted-since', '-s', metavar='timestamp', type=str,
        help='retrieve submitted series newer than the given '
             'ISO 8601 time')
    since_group.add_argument(
        '--updated-since', '-u', metavar='timestamp', type=str,
        help='retrieve updated series newer than the given ISO 8601 time')

    filter_group = list_parser.add_argument_group()
    filter_group.add_argument(
        '-n', metavar='n', type=int, default=-1,
        help='number of series to retrieve')
    filter_group.add_argument(
        '--test-state', metavar='test_state', type=str,
        choices=TestState.choices(),
        help='retrieve series with a particular test state')
    filter_group.add_argument(
        'title', type=str, metavar='str', nargs='?',
        help='substring to search for series by name')

    default_series_fields = ['id', 'name', 'n_patches', 'submitter',
                             'last_updated']
    format_group = list_parser.add_mutually_exclusive_group(required=False)
    format_group.add_argument(
        '--json', '-j', action="store_true",
        help='print the series in json format')
    format_group.add_argument(
        '--fields', '-f', metavar='fields', nargs='+', type=str,
        choices=Series.FIELDS, default=default_series_fields,
        help='list of fields to display')

    return parser


def main():
    """Parse the command line, run the command and return its status."""
    # on SIGINT, return back to the directory the script was launched from
    start_directory = os.getcwd()

    def sigint_handler(signum, frame):
        os.chdir(start_directory)
        sys.exit(1)
    signal.signal(signal.SIGINT, sigint_handler)

    parser = _build_parser()

    git_pw = GitPatchwork()
    parser.parse_args(namespace=git_pw.cmd)

    os.chdir(git_pw.cmd.directory)
    ret = git_pw.run()
    os.chdir(start_directory)
    return ret


if __name__ == '__main__':
    sys.exit(main())