#!/usr/bin/env python # # Copyright (c) STMicroelectronics 2014 # # This file is part of repo-mirror. # # repo-mirror is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License v2.0 # as published by the Free Software Foundation # # repo-mirror is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # v2.0 along with repo-mirror. If not, see . # # # Usage: get usage with repo-mirror -h # from __future__ import print_function import sys # Fail early if python version is not supported def check_python_version(): try: assert sys.hexversion >= 0x02060000 except: # pragma: no cover sys.stderr.write('repo-mirror: error: python version >= 2.6 is required\n') sys.exit(1) check_python_version() # Setup reasonably quiet mode on ^C import signal def interrupt_handler(signum, frame): """ Handler for signals that require immediate exit. """ sys.stderr.write("repo-mirror: interrupted by signal %d\n" % signum) sys.exit(128 + signum) signal.signal(signal.SIGINT, interrupt_handler) import os, subprocess, fcntl, errno, optparse, time, hashlib, logging, tempfile, re, select, stat # Update VERSION for major.minor.patch releases. # The sha1sum will be appended to the version string. VERSION="1.4.3" class ExitCodes: """ Exit codes used to feedback the parent process. """ """ Aligned with the coreutils timeout implementation. """ USER = 2 # user error TIMEDOUT = 124 # semaphore timed out CANCELED = 125 # internal error CANNOT_INVOKE = 126 # error executing command ENOENT = 127 # couldn't find command to execute class LocalOptionParser(optparse.OptionParser): """ Overrides OptionParser. Exits with the correct code on error. Overrides version output. """ def __init__(self): optparse.OptionParser.__init__( self, prog="repo-mirror", description="run repo commands with transparent local git mirrors management.", usage="%prog [options] -- REPO_COMMAND..." ) self.disable_interspersed_args() def parse_args(self): opts, args = optparse.OptionParser.parse_args(self) return self.process_args(opts, args) @staticmethod def handle_version(option, opt, value, parser): with open(__file__, "rb") as f: sha1 = hashlib.sha1(f.read()).hexdigest() print("%s version %s [sha1:%s]" % (parser.prog, VERSION, sha1)) parser.exit(0) def process_args(self, opts, args): """ Process parsed args into suitable form after some checks. Return a single namespace with all arguments. """ if opts.internal_locking not in ["none", "path"]: self.exit(1, "%s: error: invalid locking scheme: --internal-locking='%s'\n" % (self.prog, opts.internal_locking)) if not opts.mirror_dir: home = os.environ.get('HOME', None) if home: opts.mirror_dir = os.path.join(home, ".repo-mirror") else: self.exit(1, "%s: error: $HOME undefined, please specify a mirror dir argument (--mirror-dir MIRROR_DIR)\n" % self.prog) if opts.clean or opts.clean_all or opts.list: if len(args) > 0: self.exit(1, "%s: error: unexpected additional arguments for --list or --clean or --clean-all action\n") else: if len(args) < 1: self.exit(1, "%s: error: missing repo arguments\n" % self.prog) opts.command = args[0] opts.arguments = args[1:] return opts def exit(self, status=0, message=None): """ Exit with message and exit code 2 on user arguments errors. """ if status != 0: status = ExitCodes.USER optparse.OptionParser.exit(self, status, message) parser = LocalOptionParser() parser.add_option("-m", "--mirror-dir", help="repo mirror dir (default: $HOME/.repo-mirror)") parser.add_option("-j", "--jobs", type=int, default=0, help="repo mirror sync jobs, passed to repo sync when fetching mirrors") parser.add_option("-r", "--repo", default="repo", help="repo tool actual executable. Default: repo (as found in $PATH)") parser.add_option("-i", "--id", default="default", help="optional identifier for the mirrored repo tree. The default is to mirror all trees at the same location. Default: default") parser.add_option("--list", action="store_true", help="list currently mirrored repo trees") parser.add_option("--clean", action="store_true", help="clean the repo mirror tree identified by --id (or the default)") parser.add_option("--clean-all", action="store_true", help="clean all existing repo mirror trees") parser.add_option("-q", "--quiet", action="store_true", help="if set, repo init/sync of mirrors are done with --quiet option") parser.add_option("-v", "--version", help="output version string", action="callback", callback=parser.handle_version) parser.add_option("-n", "--dry-run", action="store_true", help="dry run mode, log execution but do not produce side effect") parser.add_option("-d", "--debug", help="debug mode", action="store_true") parser.add_option("--log-file", default="&stderr", help="log file, &stderr if not specified") parser.add_option("--internal-locking", help="internal option: locking scheme, one of: path, none", default="path") def mkdir_p(path, dryrun=False, logger=None, mode=None): cmd_str = "mkdir -p '%s'" % path if dryrun: if logger: logger.info("dry-run: %s" % cmd_str) return 0 try: if logger: logger.debug("executing %s..." % cmd_str) os.makedirs(path) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise # pragma: no cover if mode: os.chmod(path, mode) def clean_rec(path, dryrun=False, logger=None): cmd_str = "rm -rf '%s'" % path if dryrun: if logger: logger.info("dry-run: %s" % cmd_str) return 0 if os.path.exists(path): if logger: logger.debug("executing %s..." % cmd_str) subprocess.call(['rm', '-rf', path]) return 0 class Lock: """ Cross process lock. Note that this works on NFS """ """ mounts only from the same system. """ def __init__(self, filename, logger=None): self.filename = filename self.logger = logger self.handle = None if self.logger: self.logger.debug("creating lock '%s'..." % self.filename) mkdir_p(os.path.dirname(filename)) self.handle = open(filename, 'w') def acquire(self): if self.logger: self.logger.debug("acquiring lock '%s'..." % self.filename) fcntl.flock(self.handle, fcntl.LOCK_EX) def release(self): if self.logger: self.logger.debug("releasing lock '%s'." % self.filename) fcntl.flock(self.handle, fcntl.LOCK_UN) def __del__(self): if self.handle: self.handle.close() class ICacheLock: """ Interface for the cache hierarchy locking scheme. """ """ rootname is the root directory for the locking hierarchy """ """ rootname does not need to be absolute/normalized and will """ """ be unconditionally created """ """ dpath is the path from the rootdir and must be normalized """ """ and starting with /. Such as /a/b, which will actually lock """ """ rootpath/a/b """ """ dpath will be created on demand """ def __init__(self, rootname, dpath, logger=None): """ Initialize the lock object """ def acquire_dir(self): """ Acquire a directory lock for dpath """ def release_dir(self): """ Release a directory lock for dpath """ class CacheNoneLock(ICacheLock): """ Implementation of a non locked hierarchy, for test purpose """ def __init__(self, rootname, dpath, logger=None): assert(os.path.normpath(dpath) == dpath) def acquire_dir(self): pass def release_dir(self): pass class CachePathLock(ICacheLock): """ Implementation of hierarchical locking based on path """ """ Actually this scheme recursively locks the parent path before """ """ acquiring the actual path lock. """ def __init__(self, rootname, dpath, logger=None): if logger: logger.debug("creating CachePathLock '%s' '%s'..." % (rootname, dpath)) assert(os.path.normpath(dpath) == dpath) assert(dpath[0] == '/') mkdir_p(rootname) self.rootname = os.path.abspath(rootname) self.dpath = dpath self.logger = logger self.lock = None def _get_parent_dpath(self, dpath): assert(dpath[0] == "/") (head, tail) = os.path.split(dpath) if tail == "": assert(head == "/") return "/" else: assert(head[0] == "/") return head def _get_lock_path(self, dpath): assert(dpath[0] == "/") (head, tail) = os.path.split(dpath) if tail == "": assert(head == "/") return os.path.join(self.rootname, "_.lock") else: assert(head[0] == "/") assert(tail != "_") return os.path.join(self.rootname, os.path.join(head[1:], "%s.lock" % tail)) def _acquire_dir_rec(self, dpath): parent_dpath = self._get_parent_dpath(dpath) if dpath != parent_dpath: parent_lock = self._acquire_dir_rec(parent_dpath) try: this_lock = Lock(self._get_lock_path(dpath), logger=self.logger) this_lock.acquire() finally: parent_lock.release() else: this_lock = Lock(self._get_lock_path(dpath), logger=self.logger) this_lock.acquire() return this_lock def acquire_dir(self): if self.logger: self.logger.debug("acquiring CachePathLock '%s' '%s'..." % (self.rootname, self.dpath)) self.lock = self._acquire_dir_rec(self.dpath) def release_dir(self): if self.logger: self.logger.debug("releasing CachePathLock '%s' '%s'..." % (self.rootname, self.dpath)) if self.lock: self.lock.release() class RepoMirror(): """ Executable class for repo mirror. Providing run() method. """ def __init__(self, args): """ Constructor, arguments are stored into the args object. """ self.args = args self.lock_class = None self.logger = None def _list_entries(self): lock = self.lock_class(self.args.mirror_dir, "/", logger=self.logger) try: lock.acquire_dir() for root, dirs, files in os.walk(self.args.mirror_dir): break finally: lock.release_dir() return dirs def _clean_entry(self, entry): lock = self.lock_class(self.args.mirror_dir, os.path.join("/", entry), logger=self.logger) try: lock.acquire_dir() self.logger.debug("cleaning mirror entry '%s' '%s'" % (self.args.mirror_dir, entry)) clean_rec(os.path.join(self.args.mirror_dir, entry), self.args.dry_run, self.logger) finally: lock.release_dir() def list(self): """ List all currently mirrored repo trees and gits repos. Alphanum order. """ for entry in sorted(self._list_entries()): print("%s" % entry) return 0 def clean(self): """ Clean the current mirror id. """ self._clean_entry(self.args.id) return 0 def clean_all(self): """ Clean all current mirrors. """ for entry in self._list_entries(): self._clean_entry(entry) return 0 def _execute_command(self, command_args, cwd=None, env=None, stdin=None, stdout=None, stderr=None, no_input=False, no_output=False): """ Execute the given command args list with options. """ cmd_str = "cd '%s' && " % cwd if cwd != None else "" cmd_str += " ".join(command_args) self.logger.debug("init repo: executing %s..." % cmd_str) if self.args.dry_run: self.logger.info("dry-run: %s" % cmd_str) return 0 def setfl(fd, msk): if not msk: return flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | msk) streams_map = {} for ids, filename, iostream in [('out', stdout, sys.stdout), ('err', stderr, sys.stderr)]: outs = [] if not no_output and filename == None: fd = None else: fd = subprocess.PIPE if filename != None: outs.append(open(filename, "wb")) if not no_output: outs.append(os.fdopen(os.dup(iostream.fileno()), "wb")) streams_map[ids] = {'fd': fd, 'outs': outs} if no_input: fdin = open(os.devnull, "r") else: fdin = open(stdin, "r") if stdin != None else None code = None try: proc = subprocess.Popen(command_args, cwd=cwd, env=env, stdin=fdin, stdout=streams_map['out']['fd'], stderr=streams_map['err']['fd']) fds_in = [] # set flags to get non-blocking read of stdio/stderr if streams_map['out']['fd']: setfl(proc.stdout, msk=os.O_NONBLOCK) fds_in.append(proc.stdout) if streams_map['err']['fd']: setfl(proc.stderr, msk=os.O_NONBLOCK) fds_in.append(proc.stderr) while fds_in: ready = select.select(fds_in, [], [])[0] for fd in ready: data = fd.read() if data: ids = 'out' if fd == proc.stdout else 'err' for out in streams_map[ids]['outs']: out.write(data) out.flush() else: fds_in.remove(fd) code = proc.wait() except OSError as e: print("repo-mirror: error: failed to run " \ "command: %s : %s" % (e.strerror, " ".join(command_args)), file=sys.stderr) if e.errno == errno.ENOENT: code = ExitCodes.ENOENT else: code = ExitCodes.CANNOT_INVOKE if fdin: fdin.close() for ids in streams_map: for out in streams_map[ids]['outs']: out.close() return code def _update_mirror_config(self, gitdir): """ Update a git mirror config file to be in --prune mode by default. """ self.logger.debug("init repo: update mirror config: at %s" % gitdir) configfile = os.path.join(gitdir, "config") if os.path.exists(configfile): command_args = ['git', 'config', '-f', configfile, '--replace-all', 'fetch.prune', 'true'] self._execute_command(command_args) def _update_mirrors_configs(self, mirrors_dir): """ Update all git mirrors present in the repo mirror tree. """ self._walk_mirrors_repos(mirrors_dir, self._update_mirror_config) def _fixup_mirror_repo(self, gitdir): """ Fixup a git mirror dir w.r.t. some know invalid state. The HEAD of the mirrored repo may be corrupted for some reason, including an invalid sha1 in a manifest file for instance. In this case, repo will fail to update the mirror, hence we first arrange to have well-formed HEAD reference, the actual reference will be set on the following repo sync command. """ self.logger.debug("init repo: fixup mirror repo: at %s" % gitdir) headfile = os.path.join(gitdir, 'HEAD') headref = '' wellformed_headref = 'ref: refs/heads/master' try: with open(headfile, "r") as inf: headref = inf.read().rstrip() except (OSError, IOError) as e: return # don't try to fixup if not existing/readable if ((re.match(r'^ref: +refs/', headref) or re.match('^[0-9a-f]{40}', headref)) and (headref == wellformed_headref or self._execute_command(['git', 'rev-parse', '--verify', 'HEAD'], no_input=True, no_output=True, cwd=gitdir) == 0)): return self.logger.debug("init repo: fixup mirror repo: fix corrupted HEAD reference '%s' to '%s': %s" % (headref, wellformed_headref, headfile)) with open(headfile, "w", 1) as outf: outf.write('%s\n' % wellformed_headref) def _fixup_mirrors_repos(self, mirrors_dir): """ Fixup all git mirrors present in the repo mirror tree. """ self._walk_mirrors_repos(mirrors_dir, self._fixup_mirror_repo) def _walk_mirrors_repos(self, mirrors_dir, action): """ Walk all mirrored repos and apply action() to each. """ dirs = [] for root, dirs, files in os.walk(mirrors_dir): break for gitdir in filter(lambda x: x.endswith(".git"), dirs): action(os.path.join(mirrors_dir, gitdir)) def _append_options(self, args, append_opts): opts = [] new_args = [] i = 0 in_opt = True while i < len(args): if in_opt and args[i] == "--": in_opt = False if in_opt: opts.append(args[i]) else: new_args.append(args[i]) i += 1 return opts + append_opts + new_args def _fixup_repo_init_arguments(self, repo_init_args): def fixup_ssh_url(arg): new_arg = arg m = re.match(r"^([^:]*@[^:]*):(.*)", new_arg) if m != None: new_arg = "ssh://" + m.group(1) + "/" + m.group(2) return new_arg def fixup_args(args): new_args = [] i = 0 in_opt = True while i < len(args): if in_opt and args[i].startswith("-"): for opt in ("-u", "--manifest-url"): iopt = opt + "=" if opt.startswith("--") else opt if args[i] == opt and i + 1 < len(args): new_args.append(args[i]) new_args.append(fixup_ssh_url(args[i+1])) i += 2 break elif args[i].startswith(iopt) and len(args[i]) > len(iopt): new_args.append(iopt + fixup_ssh_url(args[i][len(iopt):])) i += 1 break if i < len(args): if args[i] == "--": in_opt = False new_args.append(args[i]) i += 1 return new_args return fixup_args(repo_init_args) def _must_execute_command_as_is(self): def _is_mirror_or_ref_arg(x): return x.find("--mirror") == 0 or x.find("--reference") == 0 if self.args.command != "init": return True mirror_or_ref = filter(_is_mirror_or_ref_arg, self.args.arguments) if len(list(mirror_or_ref)) > 0: return True return False def _cleanup_on_sync_error(self, errfile): """ Check error file generated by repo sync command and try to recover some failure conditions. Actually does: - clean reported dangling git locks if any Returns 1 if previous repo sync command should be re-executed. """ if self.args.dry_run: return 0 if not os.path.exists(errfile): return 0 retry = 0 with open(errfile, "r") as f: for line in f.readlines(): m = re.search(r"unable to create '(.*\.lock)'", line, re.IGNORECASE) if m != None and m.group(1) != None: try: os.unlink(m.group(1)) self.logger.debug("removed lock file: '%s'" % m.group(1)) retry = 1 except: pass return retry def execute_repo(self): """ Actually execute wrapped repo commands with optional mirroring. """ def lang_c_env(): """ Returns a copy of the environment with LANG/LC_ALL=C. """ env = os.environ.copy() env.update({'LANG': 'C', 'LC_ALL': 'C'}) return env if self.args.mirror_dir == None or self._must_execute_command_as_is(): command_args = ([self.args.repo, self.args.command] + self.args.arguments) self.logger.debug("executing %s..." % " ".join(command_args)) code = self._execute_command(command_args) return code # Fixup init arguments to be passed to repo init --mirror/--reference init_arguments = self._fixup_repo_init_arguments(self.args.arguments) # Create mirror dir if non existent and set user only permission try: self.logger.debug("creating mirror dir '%s'" % self.args.mirror_dir) mkdir_p(self.args.mirror_dir, self.args.dry_run, self.logger, mode=stat.S_IRWXU) except OSError as e: print("repo-mirror: error:: can't create mirror dir '%s': %s" % (self.args.mirror_dir, e.strerror), file=sys.stderr) return ExitCodes.USER # Case or repo init with mirror dir... lock = self.lock_class(self.args.mirror_dir, os.path.join("/", self.args.id), logger=self.logger) # Take lock and init mirrored repo mirror_dir_id = os.path.join(self.args.mirror_dir, self.args.id) try: lock.acquire_dir() # Prepare repo mirrors_dir = os.path.join(mirror_dir_id, "repos") mkdir_p(mirrors_dir, self.args.dry_run, self.logger, mode=stat.S_IRWXU) # Fixup some known errors self._fixup_mirrors_repos(mirrors_dir) # Update mirrors configuration self._update_mirrors_configs(mirrors_dir) # Init mirrored repo out_file = os.path.join(mirror_dir_id, "repo-init.out") err_file = os.path.join(mirror_dir_id, "repo-init.err") command_args = ([self.args.repo, "init"] + self._append_options(init_arguments, ["--mirror"] + (["--quiet"] if self.args.quiet else []))) clean_rec(os.path.join(mirrors_dir, ".repo"), self.args.dry_run, self.logger) code = self._execute_command(command_args, cwd=mirrors_dir, no_input=True, stdout=out_file, stderr=err_file, env=lang_c_env()) if code != 0: return code # Sync mirrored repo out_file = os.path.join(mirror_dir_id, "repo-sync.out") err_file = os.path.join(mirror_dir_id, "repo-sync.err") command_args = ([self.args.repo, "sync"] + (["--jobs=%d" % self.args.jobs] if self.args.jobs else []) + (["--quiet"] if self.args.quiet else [])) while True: code = self._execute_command(command_args, cwd=mirrors_dir, no_input=True, stdout=out_file, stderr=err_file, env=lang_c_env()) if code != 0 and self._cleanup_on_sync_error(err_file) == 1: self.logger.debug("attempted to fix last command, re-executing") continue break if code != 0: return code # Clean local .repo in mirror dir as we do not depend on it anymore clean_rec(os.path.join(mirrors_dir, ".repo"), self.args.dry_run, self.logger) finally: lock.release_dir() # Init requested repo with alternate mirror command_args = ([self.args.repo, "init"] + self._append_options(init_arguments, ["--reference=%s" % mirrors_dir])) code = self._execute_command(command_args) return code def run(self): # Setup logger log_fmt = "%(levelname)s: %(name)s: %(process)d: %(message)s" log_lvl = logging.DEBUG if args.debug else logging.INFO if self.args.log_file == "&stderr": log_stream = sys.stderr elif self.args.log_file == "&stdout": log_stream = sys.stdout else: try: log_stream = open(args.log_file, "a", 1) except IOError as e: print("repo-mirror: error:: can't open log file: '%s'" % str(e), file=sys.stderr) return ExitCodes.USER logging.basicConfig(stream = log_stream, level = log_lvl, format = log_fmt) self.logger = logging.getLogger("repo-mirror") # Select locking scheme if self.args.internal_locking == "none": self.lock_class = CacheNoneLock elif self.args.internal_locking == "path": self.lock_class = CachePathLock # Setup for dry run mode if self.args.dry_run: self.lock_class = CacheNoneLock # Dispatcher if self.args.clean_all: return self.clean_all() elif self.args.clean: return self.clean() elif self.args.list: return self.list() else: return self.execute_repo() args = parser.parse_args() sys.exit(RepoMirror(args).run())