#!/bin/sh
# vim: syntax=python
''':'
# First try to run this script with python3, else run with python then python2
if command -v python3 >/dev/null 2>/dev/null; then
  exec python3 "$0" "$@"
elif command -v python >/dev/null 2>/dev/null; then
  exec python "$0" "$@"
else
  exec python2 "$0" "$@"
fi
'''

# Package module driving dpkg / apt-get.
#
# The operation to perform is given as the first command line argument (see
# main()); the request attributes ("File=", "Name=", "Version=",
# "Architecture=", "options=") are read from stdin, one per line, and results
# are written to stdout as "Key=value" lines.
#
# The code deliberately sticks to constructs that work on both Python 2 and
# Python 3, because the shell prologue above may start either interpreter.

import sys
import os
import subprocess
import re

PY3 = sys.version_info > (3,)

dpkg_options = ["--force-confold", "--force-confdef"]

# Every external command can be overridden through the environment.
dpkg_cmd = os.environ.get('CFENGINE_TEST_DPKG_CMD', "/usr/bin/dpkg")
dpkg_deb_cmd = os.environ.get('CFENGINE_TEST_DPKG_DEB_CMD', "/usr/bin/dpkg-deb")
dpkg_query_cmd = os.environ.get('CFENGINE_TEST_DPKG_QUERY_CMD', "/usr/bin/dpkg-query")

dpkg_output_format = "Name=${Package}\nVersion=${Version}\nArchitecture=${Architecture}\n"
dpkg_status_format = "Status=${Status}\n" + dpkg_output_format

apt_get_cmd = os.environ.get('CFENGINE_TEST_APT_GET_CMD', "/usr/bin/apt-get")


def detect_apt_version():
    """Return the version string reported by `apt-get -v`.

    The version is the second space-separated word of the first output line.
    Returns "" when apt-get cannot be executed or prints nothing usable, so
    that operations which never call apt-get are not broken at load time; an
    operation that does need apt-get will still fail when it tries to run it.
    """
    try:
        output = subprocess.Popen([apt_get_cmd, '-v'],
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True).communicate()[0]
        return output.splitlines()[0].split(' ')[1]
    except (OSError, IndexError):
        return ""


def apt_version_tuple(version):
    """Return [major, minor] parsed from the start of `version`, or None.

    Only the leading digits of the first two components are used, so strings
    such as "1.1.1ubuntu2" and "1.1~exp8" both give [1, 1]. A missing minor
    component counts as 0. None is returned when `version` does not start
    with a number.
    """
    found = re.match(r"\s*(\d+)(?:\.(\d+))?", version or "")
    if found is None:
        return None
    minor = found.group(2)
    if minor is None:
        minor = "0"
    return [int(found.group(1)), int(minor)]


def apt_version_at_least(version, major, minor):
    """Tell whether `version` is >= major.minor.

    An unparsable version is treated as a current apt (returns True).
    """
    parsed = apt_version_tuple(version)
    if parsed is None:
        return True
    return parsed >= [major, minor]


def build_apt_get_options(version):
    """Return the base apt-get option list suited to the given apt version."""
    options = ["-o", "Dpkg::Options::=--force-confold",
               "-o", "Dpkg::Options::=--force-confdef",
               "-y"]
    if apt_version_at_least(version, 1, 1):
        # The --force-yes option was deprecated in apt-get 1.1
        options.extend(["--allow-downgrades",
                        "--allow-remove-essential",
                        "--allow-change-held-packages"])
    else:
        options.append("--force-yes")
    return options


def build_list_update_options(version):
    """Return the extra options used when simulating an upgrade.

    --with-new-pkgs is only passed to apt >= 1.0; it is assumed here that
    older releases do not understand the option.
    """
    if apt_version_at_least(version, 1, 0):
        return ["--with-new-pkgs"]
    return []


# Some options only work with specific versions of apt, so we must know the
# current version in order to do the right thing.
apt_version = detect_apt_version()
apt_get_options = build_apt_get_options(apt_version)
apt_get_list_update_options = build_list_update_options(apt_version)

os.environ['DEBIAN_FRONTEND'] = "noninteractive"
os.environ['LC_ALL'] = "C"

NULLFILE = open(os.devnull, 'w')

# Matches the "Inst" lines printed by `apt-get --simulate upgrade`.
#
# Examples of lines that we try to match:
#
#   Inst php5-cli [5.3.10-1ubuntu3.17] (5.3.10-1ubuntu3.18 Ubuntu:12.04/precise-updates [amd64]) []
#
# Note architecture included in the name on this one:
#   Inst php5-cli:i386 [5.3.10-1ubuntu3.17] (5.3.10-1ubuntu3.18 Ubuntu:12.04/precise-updates [i386]) []
#
# Note multiple repositories in this one:
#   Inst linux-libc-dev [2.6.32-48squeeze4] (2.6.32-48squeeze6 Debian:6.0.10/oldstable, Debian-Security:6.0/oldoldstable [amd64])
#
# Another example (note the addition of jessie:jessie without a comma):
#   Inst rudder-agent [4.1.0~rc1-jessie0] (4.1.0-jessie0 release/4.1.0-2 jessie:jessie [amd64])
upgrade_line_re = re.compile(
    # "Inst", then the package name (an optional ":arch" suffix is skipped)
    r"^Inst\s+(?P<name>[^\s:]+)(?::\S+)?"
    # old version in square brackets (ignored)
    r"\s+\[[^]\s]+\]"
    # opening parenthesis followed by the new version
    r"\s+\((?P<version>\S+)"
    # repository(ies) (ignored), then the architecture, which might be absent
    r"(?:\s+\S+)*?(\s+\[(?P<arch>[^]\s]+)\])?\).*")


def parse_upgrade_line(line):
    """Parse one line of `apt-get --simulate upgrade` output.

    Returns a (name, version, arch) tuple for an "Inst" line describing an
    upgrade of an installed package, where arch is None if the line carries
    no architecture. Returns None for any other line.
    """
    found = upgrade_line_re.match(line)
    if found is None:
        return None
    return (found.group("name"), found.group("version"), found.group("arch"))


redirection_is_broken_cached = -1


def redirection_is_broken():
    """Return 1 if subprocess redirection closes stderr, else 0 (cached)."""
    # Older versions of Python have a bug where it is impossible to redirect
    # stderr using subprocess, and any attempt at redirecting *anything*, not
    # necessarily stderr, will result in it being closed instead. This is very
    # bad, because RPM may then open its RPM database on file descriptor 2
    # (stderr), and will cause it to output error messages directly into the
    # database file. Fortunately "stdout=subprocess.PIPE" doesn't have the bug,
    # and that's good, because it would have been much more tricky to solve.
    global redirection_is_broken_cached
    if redirection_is_broken_cached == -1:
        # Re-run this script in its self-test mode with stdout redirected and
        # see whether the child still has a usable stderr.
        cmd_line = [sys.executable, sys.argv[0], "internal-test-stderr"]
        if subprocess.call(cmd_line, stdout=sys.stderr) == 0:
            redirection_is_broken_cached = 0
        else:
            redirection_is_broken_cached = 1
    return redirection_is_broken_cached


def subprocess_Popen(cmd, stdout=None, stderr=None):
    """Start `cmd` like subprocess.Popen, working around broken redirection.

    When redirection works, nothing is redirected, or a pipe is requested,
    this is a plain subprocess.Popen call. Otherwise file descriptors 1 and 2
    of this process are temporarily pointed at the requested files so the
    child inherits them, and are restored once the child has been started.
    """
    if ((not redirection_is_broken())
            or (stdout is None and stderr is None)
            or (stdout == subprocess.PIPE)
            or (stderr == subprocess.PIPE)):
        return subprocess.Popen(cmd, stdout=stdout, stderr=stderr)

    old_stdout_fd = -1
    old_stderr_fd = -1

    if stdout is not None:
        old_stdout_fd = os.dup(1)
        os.dup2(stdout.fileno(), 1)

    if stderr is not None:
        old_stderr_fd = os.dup(2)
        os.dup2(stderr.fileno(), 2)

    result = subprocess.Popen(cmd)

    if old_stdout_fd >= 0:
        os.dup2(old_stdout_fd, 1)
        os.close(old_stdout_fd)

    if old_stderr_fd >= 0:
        os.dup2(old_stderr_fd, 2)
        os.close(old_stderr_fd)

    return result


def subprocess_call(cmd, stdout=None, stderr=None):
    """Run `cmd` through subprocess_Popen and return its exit status."""
    process = subprocess_Popen(cmd, stdout, stderr)
    return process.wait()


def get_package_data():
    """Classify the package named by the last "File=" line on stdin.

    Absolute paths are reported as PackageType=file and handed to dpkg-deb to
    print name, version and architecture; plain names are reported as
    PackageType=repo. Returns 1 when no "File=" value was given or when the
    value looks like it carries a version or an illegal character.
    """
    pkg_string = ""
    for line in sys.stdin:
        if line.startswith("File="):
            pkg_string = line.split("=", 1)[1].rstrip()
            # Don't break, we need to exhaust stdin.

    if not pkg_string:
        return 1

    if (pkg_string.startswith("/")):
        # Absolute file.
        sys.stdout.write("PackageType=file\n")
        # Flush before the child process writes to the same stdout.
        sys.stdout.flush()
        return subprocess_call([dpkg_deb_cmd, "--showformat", dpkg_output_format, "-W", pkg_string])
    elif (re.search("([:,]|_[0-9])", pkg_string)):
        # Contains either a version number or an illegal symbol.
        # NOTE(review): this echoes the last line read from stdin and uses
        # "ErrorMessage: " rather than a "Key=value" form - confirm against
        # what the caller expects before changing it.
        sys.stdout.write(line + "ErrorMessage: Package string with illegal format\n")
        return 1
    else:
        sys.stdout.write("PackageType=repo\n")
        sys.stdout.write("Name=" + pkg_string + "\n")
        return 0


def list_installed():
    """Print Name/Version/Architecture of every installed package."""
    # Ignore everything.
    sys.stdin.readlines()

    process = subprocess_Popen([dpkg_query_cmd, "--showformat", dpkg_status_format, "-W"], stdout=subprocess.PIPE)
    installed_package = False
    for line in process.stdout:
        if PY3:
            line = line.decode("utf-8")
        line = line.rstrip("\n")
        # 'Status=install ok <state>' or 'Status=hold ok <state>'
        if line.startswith("Status=install") or line.startswith("Status=hold"):
            state = line.split()[2]
            if state in ["installed", "half-configured", "half-installed"]:
                installed_package = True
            else:
                installed_package = False
        elif line.startswith("Status="):
            installed_package = False
        elif installed_package:
            # Attribute line belonging to the most recent Status= line.
            sys.stdout.write(line + "\n")

    return 0


def list_updates(online):
    """Print Name/Version/Architecture of every available package update.

    When `online` is true, `apt-get update` is run first and its non-zero
    exit status, if any, is returned. Otherwise returns 0.
    """
    # Ignore everything.
    sys.stdin.readlines()

    if online:
        result = subprocess_call([apt_get_cmd] + apt_get_options + ["update"], stdout=NULLFILE)
        if result != 0:
            return result

    # We ignore held packages (--ignore-hold) so that all package updates
    # available are listed. This makes package update listing compatible with
    # `apt list --upgradeable` on Debian 8 and higher.
    process = subprocess_Popen([apt_get_cmd] + apt_get_options + apt_get_list_update_options +
                               ["--simulate", "--ignore-hold", "upgrade"],
                               stdout=subprocess.PIPE)
    for line in process.stdout:
        if PY3:
            line = line.decode("utf-8")
        parsed = parse_upgrade_line(line)
        if parsed is None:
            continue
        sys.stdout.write("Name=" + parsed[0] + "\n")
        sys.stdout.write("Version=" + parsed[1] + "\n")
        arch = parsed[2]
        if not arch:
            # The line carried no architecture; use the platform's.
            arch = get_platform_arch()
        sys.stdout.write("Architecture=" + arch + "\n")

    return 0


def get_platform_arch():
    """Return the first line of `dpkg --print-architecture`, or None."""
    process = subprocess_Popen([dpkg_cmd, "--print-architecture"], stdout=subprocess.PIPE)
    for line in process.stdout:
        if PY3:
            line = line.decode("utf-8")
        return line.rstrip()
    return None


def one_package_argument(name, arch, version, is_apt_install):
    """Build the apt-get package arguments for a single package.

    Returns a list of (plain, arch_qualified) tuples, one per architecture
    to operate on. When `arch` is empty, the architectures for which
    dpkg-query reports the package as installed are used.
    """
    args = []
    archs = []
    platform_arch = get_platform_arch()

    if arch:
        archs.append(arch)
    else:
        # If we have existing architectures, operate on those, instead
        # of the platform default. stderr is suppressed to avoid
        # message pollution if the package is not installed.
        process = subprocess_Popen([dpkg_query_cmd, "--showformat", "${Architecture}=${Status}\n",
                                    "-W", name + ":*"],
                                   stdout=subprocess.PIPE, stderr=NULLFILE)
        for line in process.stdout:
            if PY3:
                line = line.decode("utf-8")
            if "=" in line:
                arch, stat = line.split("=", 1)
                # The space before "installed" is important, because it can
                # be "not-installed".
                if stat.find(" installed") >= 0:
                    archs.append(arch)

    version_suffix = ""
    if version != "":
        version_suffix = "=" + version

    if archs:
        for cur_arch in archs:
            if cur_arch == platform_arch:
                if is_apt_install:
                    # Store duplicated entry in tuple for simplicity of use in
                    # repo_install and remove functions.
                    args.append((name + version_suffix, name + version_suffix))
                else:
                    # For some distributions with multi arch support we must provide package name with
                    # ':architecture' postfix to remove package which architecture matches architecture
                    # of OS (Debian 7).
                    # This is not consistent behavior for all dpkg implementations. On Ubuntu 12 we have to
                    # use a package name only and adding ':architecture' postfix results in an error.
                    args.append((name + version_suffix, name + ':' + cur_arch + version_suffix))
            else:
                # For managing packages which architecture doesn't match native OS architecture we always
                # are using ':architecture' postfix added to package name.
                args.append((name + ':' + cur_arch + version_suffix,
                             name + ':' + cur_arch + version_suffix))
    else:
        args.append((name + version_suffix, name + version_suffix))

    return args


def package_arguments_builder(is_apt_install):
    """Read package requests from stdin and return their apt-get arguments.

    Each "Name=" line starts a new package; "Version=" and "Architecture="
    lines refine the current one. Every non-empty "options=" value is
    appended to the module-level apt_get_options list. Returns the
    concatenation of one_package_argument() results for all packages.
    """
    name = ""
    version = ""
    arch = ""
    args = []

    for line in sys.stdin:
        if line.startswith("Name="):
            if name:
                # Each new "Name=" triggers a new entry.
                args.extend(one_package_argument(name, arch, version, is_apt_install))
                version = ""
                arch = ""
            name = line.split("=", 1)[1].rstrip()
        elif line.startswith("Version="):
            version = line.split("=", 1)[1].rstrip()
        elif line.startswith("Architecture="):
            arch = line.split("=", 1)[1].rstrip()
        elif line.startswith("options="):
            option = line.split("=", 1)[1].rstrip()
            if option:
                # Mutates the shared list in place, so callers that build
                # their command line afterwards pick the option up.
                apt_get_options.append(option)

    if name:
        args.extend(one_package_argument(name, arch, version, is_apt_install))

    return args


def repo_install():
    """Install the packages requested on stdin with `apt-get install`."""
    args = package_arguments_builder(True)

    # Built after reading stdin so that "options=" lines are included.
    cmd_line = [apt_get_cmd] + apt_get_options + ["install"]

    if (not args):
        return 0

    # Convert list of tuples into two lists so that first element of each
    # tuple belongs to list 'a1' and the second one to list 'a2'.
    a1, a2 = map(list, zip(*args))

    # For 'repo_install' both 'a1' and 'a2' should be equal so we can operate
    # on 'a1' elements only.
    if a1:
        return subprocess_call(cmd_line + a1, stdout=NULLFILE)
    return 0


def remove():
    """Remove the packages requested on stdin with `apt-get remove`."""
    args = package_arguments_builder(False)

    # Built after reading stdin so that "options=" lines are included.
    cmd_line = [apt_get_cmd] + apt_get_options + ["remove"]

    if (not args):
        return 0

    # Convert list of tuples into two lists so that first element of each
    # tuple belongs to list 'a1' and the second one to list 'a2'.
    #
    # In case of multi arch support elements of 'a1' list should not contain
    # packages names with ':architecture' suffix for all packages matching native
    # OS architecture.
    a1, a2 = map(list, zip(*args))

    # As there seems to be no unified method to remove packages matching
    # native OS architecture we are trying first to remove packages providing
    # just a package name and if this call is failing we are trying
    # 'package_name:architecture' approach.
    ret = subprocess_call(cmd_line + a1, stdout=NULLFILE)
    if ret != 0 and a1 != a2:
        ret = subprocess_call(cmd_line + a2, stdout=NULLFILE)
    return ret


def file_install():
    """Install every package file named by a "File=" line with `dpkg -i`."""
    cmd_line = [dpkg_cmd] + dpkg_options + ["-i"]

    found = False
    for line in sys.stdin:
        if line.startswith("File="):
            found = True
            cmd_line.append(line.split("=", 1)[1].rstrip())

    if (not found):
        return 0

    return subprocess_call(cmd_line, stdout=NULLFILE)


def main():
    """Dispatch on the operation in sys.argv[1]; return the exit status.

    Returns 2 when the operation is missing or unknown.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("Need to provide argument\n")
        return 2

    if sys.argv[1] == "internal-test-stderr":
        # This will cause an exception if stderr is closed.
        try:
            os.fstat(2)
        except OSError:
            return 1
        return 0

    elif sys.argv[1] == "supports-api-version":
        sys.stdout.write("1\n")
        return 0

    elif sys.argv[1] == "get-package-data":
        return get_package_data()

    elif sys.argv[1] == "list-installed":
        return list_installed()

    elif sys.argv[1] == "list-updates":
        return list_updates(True)

    elif sys.argv[1] == "list-updates-local":
        return list_updates(False)

    elif sys.argv[1] == "repo-install":
        return repo_install()

    elif sys.argv[1] == "remove":
        return remove()

    elif sys.argv[1] == "file-install":
        return file_install()

    else:
        sys.stderr.write("Invalid operation\n")
        return 2


if __name__ == "__main__":
    sys.exit(main())