"""Utility functions for Raptor""" # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import bz2 import gzip import os import signal import socket import sys import time from subprocess import PIPE, Popen from urllib.request import urlretrieve from redo import retriable, retry try: import zstandard except ImportError: zstandard = None try: import lzma except ImportError: lzma = None from mozlog import get_proxy_logger from mozproxy import mozbase_dir, mozharness_dir LOG = get_proxy_logger(component="mozproxy") # running locally via mach TOOLTOOL_PATHS = [ os.path.join(mozharness_dir, "external_tools", "tooltool.py"), os.path.join( mozbase_dir, "..", "..", "python", "mozbuild", "mozbuild", "action", "tooltool.py", ), ] if "MOZ_UPLOAD_DIR" in os.environ: TOOLTOOL_PATHS.append( os.path.join( os.environ["MOZ_UPLOAD_DIR"], "..", "..", "mozharness", "external_tools", "tooltool.py", ) ) def transform_platform( str_to_transform, config_platform, config_processor=None, mitmproxy_version=None ): """Transform platform name i.e. 'mitmproxy-rel-bin-{platform}.manifest' transforms to 'mitmproxy-rel-bin-osx.manifest'. Also transform '{x64}' if needed for 64 bit / win 10 """ if "{platform}" not in str_to_transform and "{x64}" not in str_to_transform: return str_to_transform if "win" in config_platform: platform_id = "win" elif config_platform == "mac": # Bug 1920821 # If we are using mitmproxy 12 we need to ensure platform_id is configured # correctly for the folder structure. Having this check also keeps the ability to # playback on older versions which don't have ARM support if config_processor == "arm" and mitmproxy_version == "12.2.1": platform_id = "osx-arm64" else: platform_id = "osx" else: platform_id = "linux64" if "{platform}" in str_to_transform: str_to_transform = str_to_transform.replace("{platform}", platform_id) if "{x64}" in str_to_transform and config_processor is not None: if "x86_64" in config_processor: str_to_transform = str_to_transform.replace("{x64}", "_x64") else: str_to_transform = str_to_transform.replace("{x64}", "") return str_to_transform @retriable(sleeptime=2) def tooltool_download(manifest, run_local, raptor_dir): """Download a file from tooltool using the provided tooltool manifest""" tooltool_path = None for path in TOOLTOOL_PATHS: if os.path.exists(path): tooltool_path = path break if tooltool_path is None: raise Exception("Could not find tooltool path!") if run_local: command = [sys.executable, tooltool_path, "fetch", "-o", "-m", manifest] else: # Attempt to determine the tooltool cache path: # - TOOLTOOLCACHE is used by Raptor tests # - TOOLTOOL_CACHE is automatically set for taskcluster jobs # - fallback to a hardcoded path _cache = next( x for x in ( os.environ.get("TOOLTOOLCACHE"), os.environ.get("TOOLTOOL_CACHE"), "/builds/tooltool_cache", ) if x is not None ) command = [ sys.executable, tooltool_path, "fetch", "-o", "-m", manifest, "-c", _cache, ] try: proc = Popen(command, cwd=raptor_dir, text=True) if proc.wait() != 0: raise Exception("Command failed") except Exception as e: LOG.critical(f"Error while downloading {manifest} from tooltool:{str(e)}") if proc.poll() is None: proc.kill(signal.SIGTERM) raise def archive_type(path): filename, extension = os.path.splitext(path) filename, extension2 = os.path.splitext(filename) if extension2 != "": extension = extension2 if extension == ".tar": return "tar" elif extension == ".zip": return "zip" return None def extract_archive(path, dest_dir, typ): """Extract an archive to a destination directory.""" # Resolve paths to absolute variants. path = os.path.abspath(path) dest_dir = os.path.abspath(dest_dir) suffix = os.path.splitext(path)[-1] # We pipe input to the decompressor program so that we can apply # custom decompressors that the program may not know about. if typ == "tar": if suffix == ".bz2": ifh = bz2.open(str(path), "rb") elif suffix == ".gz": ifh = gzip.open(str(path), "rb") elif suffix == ".xz": if not lzma: raise ValueError("lzma Python package not available") ifh = lzma.open(str(path), "rb") elif suffix == ".zst": if not zstandard: raise ValueError("zstandard Python package not available") dctx = zstandard.ZstdDecompressor() ifh = dctx.stream_reader(path.open("rb")) elif suffix == ".tar": ifh = path.open("rb") else: raise ValueError("unknown archive format for tar file: %s" % path) args = ["tar", "xf", "-"] pipe_stdin = True elif typ == "zip": # unzip from stdin has wonky behavior. We don't use a pipe for it. ifh = open(os.devnull, "rb") args = ["unzip", "-o", str(path)] pipe_stdin = False else: raise ValueError("unknown archive format: %s" % path) LOG.info("Extracting %s to %s using %r" % (path, dest_dir, args)) t0 = time.time() with ifh: p = Popen(args, cwd=str(dest_dir), bufsize=0, stdin=PIPE) while True: if not pipe_stdin: break chunk = ifh.read(131072) if not chunk: break p.stdin.write(chunk) # make sure we wait for the command to finish p.communicate() if p.returncode: raise Exception("%r exited %d" % (args, p.returncode)) LOG.info("%s extracted in %.3fs" % (path, time.time() - t0)) def download_file_from_url(url, local_dest, extract=False): """Receive a file in a URL and download it, i.e. for the hostutils tooltool manifest the url received would be formatted like this: config/tooltool-manifests/linux64/hostutils.manifest""" if os.path.exists(local_dest): LOG.info("file already exists at: %s" % local_dest) if not extract: return True else: LOG.info("downloading: %s to %s" % (url, local_dest)) try: retry(urlretrieve, args=(url, local_dest), attempts=3, sleeptime=5) except Exception: LOG.error("Failed to download file: %s" % local_dest, exc_info=True) if os.path.exists(local_dest): # delete partial downloaded file os.remove(local_dest) return False if not extract: return os.path.exists(local_dest) typ = archive_type(local_dest) if typ is None: LOG.info("Not able to determine archive type for: %s" % local_dest) return False extract_archive(local_dest, os.path.dirname(local_dest), typ) return True def get_available_port(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("", 0)) s.listen(1) port = s.getsockname()[1] s.close() return port